Skip to content

Commit

Permalink
Merge pull request #2 from platform-lunar/bugfix/properly_support_tas…
Browse files Browse the repository at this point in the history
…k_retries

Add support for proper task retries
  • Loading branch information
saulius authored Oct 27, 2017
2 parents 94730f6 + 553dafe commit 16df568
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 3 deletions.
1 change: 1 addition & 0 deletions sample/plugin.dig
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ _export:
- com.github.platform-lunar:digdag-plugin-livy:0.1.1

+livy_action:
_retry: 2
<<: *spark_defaults
livy>: livy application test
livy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,16 @@ public TaskResult runTask()
}
}

private Integer retryCount()
{
return this.state.params().get("retry_count", int.class, 0);
}

private TaskResult run(String endpoint, LivyBatchRequest submission) throws IOException
{
String applicationName = submission.name().or("unknown");

LivyTaskState submissionState = pollingRetryExecutor(state, STATE_START)
LivyTaskState submissionState = pollingRetryExecutor(state, STATE_START + "_" + retryCount())
.withErrorMessage("Livy job submission failed: %s", applicationName)
.runOnce(LivyTaskState.class, (TaskState state) -> {
logger.info("Submitting Livy job: {}", applicationName);
Expand Down Expand Up @@ -141,7 +146,7 @@ private TaskResult run(String endpoint, LivyBatchRequest submission) throws IOEx
}
);

LivyTaskState executionState = pollingWaiter(state, STATE_RUNNING)
LivyTaskState executionState = pollingWaiter(state, STATE_RUNNING + "_" + retryCount())
.withPollInterval(DurationInterval.of(Duration.ofSeconds(1), Duration.ofSeconds(10)))
.withWaitMessage("Livy task id %d is still running", submissionState.id())
.awaitOnce(LivyTaskState.class, pollState -> checkTaskCompletion(submissionState.id(), endpoint, pollState));
Expand All @@ -153,7 +158,7 @@ private TaskResult run(String endpoint, LivyBatchRequest submission) throws IOEx

private Optional<LivyTaskState> checkTaskCompletion(Integer jobId, String endpoint, TaskState pollState) throws IOException
{
return pollingRetryExecutor(pollState, STATE_CHECK)
return pollingRetryExecutor(pollState, STATE_CHECK + "_" + retryCount())
.withRetryInterval(DurationInterval.of(Duration.ofSeconds(15), Duration.ofSeconds(15)))
.run(s -> {
Request request = new Request.Builder()
Expand Down

0 comments on commit 16df568

Please sign in to comment.