diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3b220e7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +*~ +/.gradle +/build +.java-version +.digdag + +# git +!.gitkeep +.git + +# intellij idea +*.iml +*.ipr +*.iws +*.idea + +# eclipse +.classpath +.project +.settings +/bin/ +.meghanada/ +.DS_Store diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..f433b1a --- /dev/null +++ b/LICENSE @@ -0,0 +1,177 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS diff --git a/README.md b/README.md new file mode 100644 index 0000000..89f0aa0 --- /dev/null +++ b/README.md @@ -0,0 +1,43 @@ +# digdag-plugin-livy + +## Description +digdag-plugin-livy is a plugin for submitting [Apache Spark](http://spark.apache.org/) jobs to +[Apache Livy](https://livy.incubator.apache.org/). 
+ +## Requirements + +- [Digdag](https://www.digdag.io/) +- [Apache Livy](https://livy.incubator.apache.org/) + +## Usage + +```yaml +_export: + plugin: + repositories: + - https://jitpack.io + dependencies: + - com.github.platform-lunar:digdag-plugin-livy:0.1.0 + # Set livy host (host name only; the scheme is chosen with the `https` option) + livy: + host: livy.cluster.internal + port: 8998 + ++livy_step: + livy>: Spark application + name: Spark application name + file: s3://BUCKET/PATH.jar + class_name: com.example.some.ClassName + driver_memory: 1024mb + conf: + spark.yarn.appMasterEnv.PARAM: example +``` + +Submission example: + +``` +digdag run --project sample plugin.dig -p spark_file=s3://BUCKET/PATH.jar -p spark_class=some.class.Class -p livy_host=LIVY_HOST -p repos=`pwd`/build/repo --rerun +``` + +## License + +[Apache License 2.0](LICENSE) diff --git a/build.gradle b/build.gradle new file mode 100644 index 0000000..e0d2c23 --- /dev/null +++ b/build.gradle @@ -0,0 +1,79 @@ +plugins { + id 'net.ltgt.apt' version '0.9' +} + +apply plugin: 'java' +apply plugin: 'maven' +apply plugin: 'maven-publish' +apply plugin: 'net.ltgt.apt' + +group = 'com.github.platform-lunar' +version = '0.1.0' + +def digdagVersion = '0.9.13' + +repositories { + mavenCentral() + jcenter() + maven { + url "http://dl.bintray.com/digdag/maven" + } + maven { + url 'https://jitpack.io' + } +} + +configurations { + provided +} + +dependencies { + provided 'io.digdag:digdag-spi:' + digdagVersion + provided 'io.digdag:digdag-standards:' + digdagVersion + provided 'io.digdag:digdag-plugin-utils:' + digdagVersion // this should be 'compile' once digdag 0.8.2 is released to a built-in repository + compile 'com.squareup.okhttp3:okhttp:3.8.1' + compileOnly 'com.fasterxml.jackson.core:jackson-annotations:2.6.7' + compileOnly 'com.fasterxml.jackson.datatype:jackson-datatype-guava:2.6.7' + + compileOnly 'org.immutables:value:2.5.5:annotations' + apt 'org.immutables:value:2.5.5' + apt 'com.fasterxml.jackson.core:jackson-annotations:2.6.7' +} + +sourceSets { + main { + compileClasspath += 
configurations.provided + test.compileClasspath += configurations.provided + test.runtimeClasspath += configurations.provided + } +} + +publishing { + publications { + mavenJava(MavenPublication) { + from components.java + } + } + repositories { + maven { + url "$buildDir/repo" + } + } +} + +sourceCompatibility = 1.8 +targetCompatibility = 1.8 + +compileJava.options.encoding = 'UTF-8' +compileTestJava.options.encoding = 'UTF-8' + +tasks.withType(JavaCompile) { + options.compilerArgs << "-Xlint:unchecked" << "-Xlint:deprecation" +} + +javadoc { + options { + locale = 'en_US' + encoding = 'UTF-8' + } +} diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..0ac710e Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..1087e39 --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Sat Jul 29 20:45:22 JST 2017 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-4.0.1-bin.zip diff --git a/gradlew b/gradlew new file mode 100755 index 0000000..cccdd3d --- /dev/null +++ b/gradlew @@ -0,0 +1,172 @@ +#!/usr/bin/env sh + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. 
+while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? 
-eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" 
"$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 0000000..e95643d --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. 
+ +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! 
+if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/sample/.gitignore b/sample/.gitignore new file mode 100644 index 0000000..10476a6 --- /dev/null +++ b/sample/.gitignore @@ -0,0 +1,5 @@ +/.digdag-wrapper +.digdag +*.pyc + +example.out diff --git a/sample/plugin.dig b/sample/plugin.dig new file mode 100644 index 0000000..21118e9 --- /dev/null +++ b/sample/plugin.dig @@ -0,0 +1,25 @@ +_export: + spark_defaults: &spark_defaults + executor_cores: 2 + num_executors: 3 + + plugin: + repositories: + # - file://${repos} + - https://jitpack.io + dependencies: + - com.github.platform-lunar:digdag-plugin-livy:0.1.0 + ++livy_action: + <<: *spark_defaults + livy>: livy application test + host: ${livy_host} + port: 8998 + file: ${spark_file} + class_name: ${spark_class} + driver_memory: 1024mb + name: Test livy application + jars: + - ${spark_file} + conf: + spark.yarn.appMasterEnv.TEST: testing diff --git a/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyBatchRequest.java b/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyBatchRequest.java new file mode 100644 index 0000000..e4a6218 --- /dev/null +++ b/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyBatchRequest.java @@ -0,0 +1,67 @@ +package com.github.platformlunar.digdag.plugin.livy; + +import com.google.common.base.Optional; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; + +import org.immutables.value.Value; + +import java.util.Map; +import java.util.List; + +/** + * Immutable model of the JSON body posted to Livy when submitting a batch. + * The interface is annotated with {@code Include.NON_EMPTY}, so empty properties are meant to be left out of the JSON. + */ +@Value.Immutable +@JsonDeserialize(as = ImmutableLivyBatchRequest.class) +@JsonInclude(Include.NON_EMPTY) +interface LivyBatchRequest +{ + // The only attribute that is not Optional + @JsonProperty("file") + String file(); + + @JsonProperty("proxyUser") + Optional<String> proxyUser(); + + @JsonProperty("className") + 
Optional<String> className(); + + @JsonProperty("args") + Optional<List<String>> args(); + + @JsonProperty("jars") + Optional<List<String>> jars(); + + @JsonProperty("pyFiles") + Optional<List<String>> pyFiles(); + + @JsonProperty("files") + Optional<List<String>> files(); + + @JsonProperty("driverMemory") + Optional<String> driverMemory(); + + @JsonProperty("driverCores") + Optional<Integer> driverCores(); + + @JsonProperty("executorMemory") + Optional<String> executorMemory(); + + @JsonProperty("executorCores") + Optional<Integer> executorCores(); + + @JsonProperty("numExecutors") + Optional<Integer> numExecutors(); + + @JsonProperty("archives") + Optional<List<String>> archives(); + + @JsonProperty("queue") + Optional<String> queue(); + + @JsonProperty("name") + Optional<String> name(); + + @JsonProperty("conf") + Optional<Map<String, String>> conf(); +} diff --git a/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyHttpConfig.java b/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyHttpConfig.java new file mode 100644 index 0000000..0893179 --- /dev/null +++ b/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyHttpConfig.java @@ -0,0 +1,16 @@ +package com.github.platformlunar.digdag.plugin.livy; + +import com.google.common.base.Optional; + +import org.immutables.value.Value; + +/** + * Immutable connection settings from which the operator assembles the Livy endpoint URL. + */ +@Value.Immutable +interface LivyHttpConfig +{ + // HTTP connection essentials + String host(); + Optional<Integer> port(); + Optional<Boolean> https(); + Optional<String> username(); + Optional<String> password(); +} diff --git a/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyOperator.java b/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyOperator.java new file mode 100644 index 0000000..304b70d --- /dev/null +++ b/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyOperator.java @@ -0,0 +1,243 @@ +package com.github.platformlunar.digdag.plugin.livy; + +import com.google.common.base.Optional; +import com.google.common.base.Throwables; + +import io.digdag.client.config.Config; +import io.digdag.spi.TaskResult; +import io.digdag.spi.OperatorContext; +import io.digdag.spi.SecretProvider; +import 
io.digdag.spi.TaskExecutionException; +import io.digdag.util.BaseOperator; +import io.digdag.standards.operator.state.TaskState; +import io.digdag.standards.operator.DurationInterval; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.datatype.guava.GuavaModule; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.core.type.TypeReference; + + +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.List; + +import static io.digdag.standards.operator.state.PollingRetryExecutor.pollingRetryExecutor; +import static io.digdag.standards.operator.state.PollingWaiter.pollingWaiter; + +/** + * Digdag operator that posts a batch request to a Livy server's "/batches" endpoint + * and then polls that batch until Livy reports a finished state. + */ +public class LivyOperator extends BaseOperator +{ + private final TaskState state; + + private static final Logger logger = LoggerFactory.getLogger(LivyOperator.class); + + private final OkHttpClient httpClient = new OkHttpClient(); + + // Fallback endpoint built from the "config.livy.*" keys; absent when no host is set there + private final Optional<LivyHttpConfig> systemLivyHttpConfig; + + private static final ObjectMapper objectMapper = new ObjectMapper(); + static { + objectMapper.registerModule(new GuavaModule()); + } + + private static final MediaType JSON_MEDIA_TYPE = MediaType.parse("application/json"); + private static final String JOB_ID = "jobId"; + private static final String STATE_START = "start"; + private static final String STATE_RUNNING = "running"; + private static final String STATE_CHECK = "check"; + + public LivyOperator(OperatorContext context) + { + super(context); + this.state = TaskState.of(request); + this.systemLivyHttpConfig = systemLivyHttpConfig(request.getConfig()); + } + + /** + * Resolves the Livy endpoint (secrets and task parameters first, then the "config.livy.*" fallback), + * builds the batch request from the task parameters and runs it to completion. + * + * @return the result returned by {@code run} + * @throws TaskExecutionException if no Livy host is configured in either place + */ + @Override + public TaskResult runTask() + { + SecretProvider secrets = context.getSecrets().getSecrets("livy"); + + Config params = request.getConfig().mergeDefault( + 
request.getConfig().getNestedOrGetEmpty("livy")); + + // Set Livy HTTP endpoint config + LivyHttpConfig livyConf = userLivyHttpConfig(secrets, params) + .or(systemLivyHttpConfig) + .orNull(); + + // Fail with a clear message instead of a NullPointerException below when no host was found + if (livyConf == null) { + throw new TaskExecutionException("Livy host is not configured: set 'host' (task parameter or livy secret) or 'config.livy.host'"); + } + + // Set Livy batch session request (task) config + LivyBatchRequest batchSubmission = userBatchRequestConfig(secrets, params); + + String scheme = livyConf.https().isPresent() && livyConf.https().get() ? "https" : "http"; + String userinfo = ""; + + // NOTE(review): credentials are only embedded in the URL as userinfo; confirm the HTTP client actually sends them to Livy + if (livyConf.username().isPresent() && livyConf.password().isPresent()) { + userinfo = livyConf.username().get() + ":" + livyConf.password().get() + "@"; + } + + String endpoint = scheme + "://" + userinfo + livyConf.host() + ":" + livyConf.port().get() + "/batches"; + + try { + return run(endpoint, batchSubmission); + } catch (Throwable t) { + throw Throwables.propagate(t); + } + } + + /** + * Submits the batch, then waits until polling reports a finished batch. + * + * @param endpoint Livy batches URL + * @param submission batch request, serialized as the JSON body of the POST + * @return the default task result for this request + */ + private TaskResult run(String endpoint, LivyBatchRequest submission) throws IOException + { + String applicationName = submission.name().or("unknown"); + + LivyTaskState submissionState = pollingRetryExecutor(state, STATE_START) + .withErrorMessage("Livy job submission failed: %s", applicationName) + .runOnce(LivyTaskState.class, (TaskState state) -> { + logger.info("Submitting Livy job: {}", applicationName); + + try { + String requestBody = objectMapper.writeValueAsString(submission); + + RequestBody jsonRequestBody = RequestBody.create(JSON_MEDIA_TYPE, requestBody); + + Request submissionRequest = new Request.Builder() + .url(endpoint) + .post(jsonRequestBody) + .build(); + + Response response = httpClient.newCall(submissionRequest).execute(); + + String responseBody = response.body().string(); + + // Report HTTP-level failures as such rather than as a confusing JSON mapping error + if (!response.isSuccessful()) { + throw new IOException("Livy rejected the batch submission with HTTP " + response.code() + ": " + responseBody); + } + + LivyTaskState currentState = objectMapper.readValue(responseBody, ImmutableLivyTaskState.class); + + logger.info("Successfully submitted Livy application id: {}", currentState.id()); + + return currentState; + } catch 
(JsonProcessingException ex) { + throw new TaskExecutionException(ex); + } catch (IOException ex) { + throw new TaskExecutionException(ex); + } + } + ); + + LivyTaskState executionState = pollingWaiter(state, STATE_RUNNING) + .withPollInterval(DurationInterval.of(Duration.ofSeconds(1), Duration.ofSeconds(10))) + .withWaitMessage("Livy task id %d is still running", submissionState.id()) + .awaitOnce(LivyTaskState.class, pollState -> checkTaskCompletion(submissionState.id(), endpoint, pollState)); + + logger.info("Livy application id {} ended with status {}", executionState.id(), executionState.state()); + + return TaskResult.defaultBuilder(request).build(); + } + + /** + * Fetches the batch status and maps Livy's state string to a polling result. + * + * @param jobId Livy batch id appended to the endpoint + * @param endpoint Livy batches URL + * @param pollState task state handed to the retry executor + * @return absent while the batch is still in progress, the final state on "success" + * @throws TaskExecutionException when Livy reports "error" or "dead" + */ + private Optional<LivyTaskState> checkTaskCompletion(Integer jobId, String endpoint, TaskState pollState) throws IOException + { + return pollingRetryExecutor(pollState, STATE_CHECK) + .withRetryInterval(DurationInterval.of(Duration.ofSeconds(15), Duration.ofSeconds(15))) + .run(s -> { + Request request = new Request.Builder() + .url(endpoint + "/" + jobId) + .build(); + + Response response = httpClient.newCall(request).execute(); + String responseBody = response.body().string(); + + // A non-2xx reply carries no batch state; surface it as an I/O failure instead of a JSON mapping error + if (!response.isSuccessful()) { + throw new IOException("Livy returned HTTP " + response.code() + " for batch " + jobId + ": " + responseBody); + } + + ImmutableLivyTaskState currentTaskState = objectMapper.readValue(responseBody, ImmutableLivyTaskState.class); + String currentState = currentTaskState.state(); + + if (currentTaskState.appId().isPresent()) { + logger.info("Livy task id {} ({}) is currently {}", currentTaskState.id(), currentTaskState.appId().get(), currentTaskState.state()); + } else { + logger.info("Livy task id {} is currently {}", currentTaskState.id(), currentTaskState.state()); + } + + switch (currentState) { + case "not_started": + case "starting": + case "recovering": + case "idle": + case "running": + case "busy": + case "shutting_down": + return Optional.absent(); + case "success": + return Optional.of(currentTaskState); + case "error": + case "dead": + throw new TaskExecutionException("Livy task id " + currentTaskState.id() + " finished with status " + currentState); 
+ default: + throw new RuntimeException("Unknown Livy task state: " + currentTaskState); + } + }); + } + + /** + * Builds the fallback endpoint config from the "config.livy.*" keys of the given config. + * + * @return absent when "config.livy.host" is not set; port defaults to 8998 and https to false + */ + private static Optional<LivyHttpConfig> systemLivyHttpConfig(Config systemConfig) + { + Optional<String> host = systemConfig.getOptional("config.livy.host", String.class); + if (!host.isPresent()) { + return Optional.absent(); + } + + LivyHttpConfig config = ImmutableLivyHttpConfig.builder() + .host(host.get()) + .port(systemConfig.get("config.livy.port", int.class, 8998)) + .https(systemConfig.get("config.livy.https", boolean.class, false)) + .username(systemConfig.getOptional("config.livy.username", String.class)) + .password(systemConfig.getOptional("config.livy.password", String.class)) + .build(); + + return Optional.of(config); + } + + /** + * Builds the endpoint config from the "livy" secrets, falling back to task parameters for + * host, port, https and username; the password is read from secrets only. + * + * @return absent when neither a "host" secret nor a "host" parameter is present + */ + private static Optional<LivyHttpConfig> userLivyHttpConfig(SecretProvider secrets, Config params) + { + Optional<String> userHost = secrets.getSecretOptional("host").or(params.getOptional("host", String.class)); + if (!userHost.isPresent()) { + return Optional.absent(); + } + + LivyHttpConfig config = ImmutableLivyHttpConfig.builder() + .host(userHost.get()) + // Default the port like the system-level config does, so a missing "port" parameter is not an error + .port(secrets.getSecretOptional("port").transform(Integer::parseInt).or(params.get("port", int.class, 8998))) + .https(secrets.getSecretOptional("https").transform(Boolean::parseBoolean).or(params.get("https", boolean.class, false))) + .username(secrets.getSecretOptional("username").or(params.getOptional("username", String.class))) + .password(secrets.getSecretOptional("password")) + .build(); + + return Optional.of(config); + } + + /** + * Maps the snake_case task parameters onto the Livy batch request; only "file" is read without a fallback. + */ + private static LivyBatchRequest userBatchRequestConfig(SecretProvider secrets, Config params) + { + return ImmutableLivyBatchRequest.builder() + .file(params.get("file", String.class)) + .proxyUser(params.getOptional("proxy_user", String.class)) + .className(params.getOptional("class_name", String.class)) + .args(params.getListOrEmpty("args", String.class)) + .jars(params.getListOrEmpty("jars", String.class)) + .pyFiles(params.getListOrEmpty("py_files", String.class)) + 
.files(params.getListOrEmpty("files", String.class)) + .driverMemory(params.getOptional("driver_memory", String.class)) + .driverCores(params.getOptional("driver_cores", Integer.class)) + .executorMemory(params.getOptional("executor_memory", String.class)) + .executorCores(params.getOptional("executor_cores", Integer.class)) + .numExecutors(params.getOptional("num_executors", Integer.class)) + .archives(params.getListOrEmpty("archives", String.class)) + .queue(params.getOptional("queue", String.class)) + .name(params.getOptional("name", String.class)) + .conf(params.getMapOrEmpty("conf", String.class, String.class)) + .build(); + } +} diff --git a/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyOperatorFactory.java b/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyOperatorFactory.java new file mode 100644 index 0000000..f42ba4e --- /dev/null +++ b/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyOperatorFactory.java @@ -0,0 +1,30 @@ +package com.github.platformlunar.digdag.plugin.livy; + +import io.digdag.client.config.Config; +import io.digdag.spi.Operator; +import io.digdag.spi.OperatorContext; +import io.digdag.spi.OperatorFactory; +import io.digdag.spi.TemplateEngine; + +/** + * Factory that exposes {@link LivyOperator} under the operator type name "livy". + */ +public class LivyOperatorFactory implements OperatorFactory +{ + + @SuppressWarnings("unused") + private final TemplateEngine templateEngine; + + // systemConfig is accepted but not stored or used by this factory + public LivyOperatorFactory(TemplateEngine templateEngine, Config systemConfig) + { + this.templateEngine = templateEngine; + } + + @Override + public String getType() + { + return "livy"; + } + + @Override + public Operator newOperator(OperatorContext context) + { + return new LivyOperator(context); + } +} diff --git a/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyPlugin.java b/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyPlugin.java new file mode 100644 index 0000000..507d9a4 --- /dev/null +++ b/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyPlugin.java @@ -0,0 +1,37 @@ 
+package com.github.platformlunar.digdag.plugin.livy; + +import io.digdag.client.config.Config; + +import io.digdag.spi.OperatorFactory; +import io.digdag.spi.OperatorProvider; +import io.digdag.spi.Plugin; +import io.digdag.spi.TemplateEngine; + +import java.util.Arrays; +import java.util.List; + +import javax.inject.Inject; + +/** Digdag Plugin implementation that exposes the Livy operator provider. */ public class LivyPlugin implements Plugin { + /** Returns LivyOperatorProvider (narrowed with asSubclass) when the requested type is OperatorProvider, otherwise null. */ @Override + public Class getServiceProvider(Class type) { + if (type == OperatorProvider.class) { + return LivyOperatorProvider.class.asSubclass(type); + } else { + return null; + } + } + + /** Operator provider whose template engine and system config fields are populated through Inject-annotated fields. */ public static class LivyOperatorProvider implements OperatorProvider { + @Inject + protected TemplateEngine templateEngine; + + @Inject + Config systemConfig; + + /** Returns a one-element list holding a LivyOperatorFactory built from the injected fields. */ @Override + public List get() { + return Arrays.asList(new LivyOperatorFactory(templateEngine, systemConfig)); + } + } +} diff --git a/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyTaskState.java b/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyTaskState.java new file mode 100644 index 0000000..5cc65c0 --- /dev/null +++ b/src/main/java/com/github/platformlunar/digdag/plugin/livy/LivyTaskState.java @@ -0,0 +1,31 @@ +package com.github.platformlunar.digdag.plugin.livy; + +import com.google.common.base.Optional; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.annotation.JsonProperty; + +import org.immutables.value.Value; + +import java.util.Map; +import java.util.List; + +/** Value interface mapping the JSON properties id, state, appId, appInfo and log; annotated for Immutables and deserialized by Jackson as ImmutableLivyTaskState. NOTE(review): generic type arguments appear to have been stripped from this text (Optional, Map and List carry none, and Map is followed by a stray closing bracket) - restore them from the original source. */ @Value.Immutable +@JsonDeserialize(as = ImmutableLivyTaskState.class) +interface LivyTaskState +{ + /** JSON property "id". */ @JsonProperty("id") + Integer id(); + + /** JSON property "state"; holds the state name as a plain string. */ @JsonProperty("state") + String state(); + + /** JSON property "appId"; optional. */ @JsonProperty("appId") + Optional appId(); + + /** JSON property "appInfo". */ @JsonProperty("appInfo") + Map> appInfo(); + + /** JSON property "log". */ @JsonProperty("log") + List log(); +} diff --git a/src/main/resources/META-INF/services/io.digdag.spi.Plugin b/src/main/resources/META-INF/services/io.digdag.spi.Plugin new file mode 100644 index 0000000..34a6b5c --- /dev/null +++ 
b/src/main/resources/META-INF/services/io.digdag.spi.Plugin @@ -0,0 +1,2 @@ +com.github.platformlunar.digdag.plugin.livy.LivyPlugin +