From 5903f25b9af5709571eebd0c7f625322982092ed Mon Sep 17 00:00:00 2001 From: cbqiao Date: Thu, 13 Jul 2023 13:51:49 +0800 Subject: [PATCH] rebase master --- .../geaflow/env/EnvironmentFactory.java | 8 ++-- .../antgroup/geaflow/env/IEnvironment.java | 4 +- .../AbstractClusterManager.java | 4 +- ...tory.java => FailoverStrategyFactory.java} | 6 +-- .../failover/FoStrategyFactoryTest.java | 2 +- .../geaflow-deploy/geaflow-assembly/pom.xml | 9 ++-- .../KubernetesClusterManager.java | 6 +-- .../clustermanager/LocalClusterManager.java | 6 +-- .../pom.xml | 2 +- .../cluster/ray/client/RayClusterClient.java | 4 +- .../cluster/ray/client/RayEnvironment.java | 2 +- .../cluster/ray/clustermanager/RayClient.java | 4 ++ .../ray/clustermanager/RayClusterId.java | 0 .../ray/clustermanager/RayClusterManager.java | 40 ++++++++++++----- .../ray/context/RayContainerContext.java | 0 .../cluster/ray/context/RayDriverContext.java | 0 .../ray/entrypoint/RayContainerRunner.java | 0 .../ray/entrypoint/RayDriverRunner.java | 0 .../ray/entrypoint/RayMasterRunner.java | 2 +- .../failover/AbstractRayFailoverStrategy.java | 12 ++++- .../failover/RayClusterFailoverStrategy.java | 3 +- .../RayComponentFailoverStrategy.java} | 8 ++-- .../failover/RayDisableFailoverStrategy.java | 40 +++++++++++++++++ .../geaflow/cluster/ray/utils/RayConfig.java | 8 ++-- .../cluster/ray/utils/RaySystemFunc.java | 17 +++---- ...geaflow.cluster.failover.IFailoverStrategy | 1 + .../com.antgroup.geaflow.env.IEnvironment | 0 .../failover/RayClusterFoStrategyTest.java | 44 +++++++++++++++++++ .../failover/RayDisableFoStrategyTest.java | 40 +++++++++++++++++ geaflow/geaflow-deploy/pom.xml | 2 +- .../dsl/runtime/engine/GeaFlowGqlClient.java | 4 +- .../example/stream/UnBoundedStreamFoTest.java | 2 +- .../stream/UnBoundedStreamWordPrint.java | 2 +- .../example/stream/WindowStreamWordCount.java | 2 +- .../geaflow/example/util/EnvironmentUtil.java | 4 +- geaflow/pom.xml | 2 +- 36 files changed, 220 insertions(+), 70 deletions(-) rename geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/failover/{FoStrategyFactory.java => FailoverStrategyFactory.java} (90%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/pom.xml (96%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayClusterClient.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java (95%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java (95%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterId.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java (58%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayContainerContext.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayDriverContext.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayContainerRunner.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayDriverRunner.java (100%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayMasterRunner.java (96%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/failover/AbstractRayFailoverStrategy.java (68%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java (89%) rename geaflow/geaflow-deploy/{geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java => geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayComponentFailoverStrategy.java} (75%) create mode 100644 geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RayConfig.java (88%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java (92%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/resources/META-INF/services/com.antgroup.geaflow.cluster.failover.IFailoverStrategy (66%) rename geaflow/geaflow-deploy/{geaflow-on-ray => geaflow-on-ray-community}/src/main/resources/META-INF/services/com.antgroup.geaflow.env.IEnvironment (100%) create mode 100644 geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFoStrategyTest.java create mode 100644 geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFoStrategyTest.java diff --git a/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/EnvironmentFactory.java b/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/EnvironmentFactory.java index 7e587e33e..5f082eefc 100644 --- a/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/EnvironmentFactory.java +++ b/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/EnvironmentFactory.java @@ -53,12 +53,12 @@ public static Environment onLocalEnvironment(String[] args) { return environment; } - public static Environment onAntEnvironment() { - return (Environment) loadEnvironment(EnvType.RAY); + public static Environment onRayCommunityEnvironment() { + return (Environment) loadEnvironment(EnvType.RAY_COMMUNITY); } - public static Environment onAntEnvironment(String[] args) { - Environment environment = (Environment) loadEnvironment(EnvType.RAY); + public static Environment onRayCommunityEnvironment(String[] args) { + Environment environment = (Environment) loadEnvironment(EnvType.RAY_COMMUNITY); IEnvironmentArgsParser argsParser = loadEnvironmentArgsParser(); environment.getEnvironmentContext().withConfig(argsParser.parse(args)); return environment; diff --git a/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/IEnvironment.java b/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/IEnvironment.java index 9cadbc4ba..d4b1d979a 100644 --- a/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/IEnvironment.java +++ b/geaflow/geaflow-core/geaflow-api/src/main/java/com/antgroup/geaflow/env/IEnvironment.java @@ -42,9 +42,9 @@ public interface IEnvironment extends Serializable { enum EnvType { /** - * Ray cluster. + * Community ray cluster. */ - RAY, + RAY_COMMUNITY, /** * K8s cluster. diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java index abce1404e..632714c0a 100644 --- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java +++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/clustermanager/AbstractClusterManager.java @@ -74,7 +74,7 @@ public void init(ClusterContext clusterContext) { this.driverFuture = new CompletableFuture<>(); this.containerIds = new HashSet<>(); this.driverIds = new HashSet<>(); - this.foStrategy = buildFoStrategy(); + this.foStrategy = buildFailoverStrategy(); Preconditions.checkNotNull(masterId, "masterId is not set"); } @@ -82,7 +82,7 @@ public ClusterContext getClusterContext() { return clusterContext; } - protected abstract IFailoverStrategy buildFoStrategy(); + protected abstract IFailoverStrategy buildFailoverStrategy(); @Override public void allocateWorkers(int workerNum) { diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/failover/FoStrategyFactory.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/failover/FailoverStrategyFactory.java similarity index 90% rename from geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/failover/FoStrategyFactory.java rename to geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/failover/FailoverStrategyFactory.java index 15ec0c011..7eb8ac008 100644 --- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/failover/FoStrategyFactory.java +++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/main/java/com/antgroup/geaflow/cluster/failover/FailoverStrategyFactory.java @@ -22,11 +22,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FoStrategyFactory { +public class FailoverStrategyFactory { - private static final Logger LOGGER = LoggerFactory.getLogger(FoStrategyFactory.class); + private static final Logger LOGGER = LoggerFactory.getLogger(FailoverStrategyFactory.class); - public static IFailoverStrategy loadFoStrategy(EnvType envType, String foStrategyType) { + public static IFailoverStrategy loadFailoverStrategy(EnvType envType, String foStrategyType) { ServiceLoader contextLoader = ServiceLoader.load(IFailoverStrategy.class); Iterator contextIterable = contextLoader.iterator(); while (contextIterable.hasNext()) { diff --git a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/failover/FoStrategyFactoryTest.java b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/failover/FoStrategyFactoryTest.java index 1f4b0eac3..89333f08d 100644 --- a/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/failover/FoStrategyFactoryTest.java +++ b/geaflow/geaflow-core/geaflow-engine/geaflow-cluster/src/test/java/com/antgroup/geaflow/cluster/failover/FoStrategyFactoryTest.java @@ -22,7 +22,7 @@ public class FoStrategyFactoryTest { @Test(expectedExceptions = GeaflowRuntimeException.class) public void testLoad() { - FoStrategyFactory.loadFoStrategy(EnvType.RAY, ""); + FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, ""); } } diff --git a/geaflow/geaflow-deploy/geaflow-assembly/pom.xml b/geaflow/geaflow-deploy/geaflow-assembly/pom.xml index 58b96e74b..7f2ef546a 100644 --- a/geaflow/geaflow-deploy/geaflow-assembly/pom.xml +++ b/geaflow/geaflow-deploy/geaflow-assembly/pom.xml @@ -26,6 +26,7 @@ geaflow-assembly + org.apache.httpcomponents httpclient @@ -56,10 +57,6 @@ com.antgroup.tugraph geaflow-on-k8s - - com.antgroup.tugraph - geaflow-on-ray - com.antgroup.tugraph geaflow-examples @@ -72,6 +69,10 @@ com.antgroup.tugraph geaflow-dsl-runtime + + com.antgroup.tugraph + geaflow-on-ray-community + diff --git a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/clustermanager/KubernetesClusterManager.java b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/clustermanager/KubernetesClusterManager.java index e59ba9720..9b4fa227f 100644 --- a/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/clustermanager/KubernetesClusterManager.java +++ b/geaflow/geaflow-deploy/geaflow-on-k8s/src/main/java/com/antgroup/geaflow/cluster/k8s/clustermanager/KubernetesClusterManager.java @@ -30,7 +30,7 @@ import com.antgroup.geaflow.cluster.config.ClusterConfig; import com.antgroup.geaflow.cluster.container.ContainerInfo; import com.antgroup.geaflow.cluster.driver.DriverInfo; -import com.antgroup.geaflow.cluster.failover.FoStrategyFactory; +import com.antgroup.geaflow.cluster.failover.FailoverStrategyFactory; import com.antgroup.geaflow.cluster.failover.IFailoverStrategy; import com.antgroup.geaflow.cluster.k8s.config.AbstractKubernetesParam; import com.antgroup.geaflow.cluster.k8s.config.KubernetesConfig; @@ -118,8 +118,8 @@ public void init(ClusterContext context, GeaflowKubeClient kubernetesClient) { } @Override - protected IFailoverStrategy buildFoStrategy() { - IFailoverStrategy foStrategy = FoStrategyFactory.loadFoStrategy(EnvType.K8S, + protected IFailoverStrategy buildFailoverStrategy() { + IFailoverStrategy foStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.K8S, super.clusterConfig.getConfig().getString(FO_STRATEGY)); foStrategy.init(clusterContext); ((AbstractKubernetesFailoverStrategy) foStrategy).setClusterManager(this); diff --git a/geaflow/geaflow-deploy/geaflow-on-local/src/main/java/com/antgroup/geaflow/cluster/local/clustermanager/LocalClusterManager.java b/geaflow/geaflow-deploy/geaflow-on-local/src/main/java/com/antgroup/geaflow/cluster/local/clustermanager/LocalClusterManager.java index dbe1d54f2..14b54c192 100644 --- a/geaflow/geaflow-deploy/geaflow-on-local/src/main/java/com/antgroup/geaflow/cluster/local/clustermanager/LocalClusterManager.java +++ b/geaflow/geaflow-deploy/geaflow-on-local/src/main/java/com/antgroup/geaflow/cluster/local/clustermanager/LocalClusterManager.java @@ -18,8 +18,8 @@ import com.antgroup.geaflow.cluster.clustermanager.ClusterContext; import com.antgroup.geaflow.cluster.container.ContainerContext; import com.antgroup.geaflow.cluster.driver.DriverContext; +import com.antgroup.geaflow.cluster.failover.FailoverStrategyFactory; import com.antgroup.geaflow.cluster.failover.FailoverStrategyType; -import com.antgroup.geaflow.cluster.failover.FoStrategyFactory; import com.antgroup.geaflow.cluster.failover.IFailoverStrategy; import com.antgroup.geaflow.cluster.local.context.LocalContainerContext; import com.antgroup.geaflow.cluster.local.context.LocalDriverContext; @@ -43,8 +43,8 @@ public void init(ClusterContext clusterContext) { } @Override - protected IFailoverStrategy buildFoStrategy() { - return FoStrategyFactory.loadFoStrategy(IEnvironment.EnvType.LOCAL, FailoverStrategyType.disable_fo.name()); + protected IFailoverStrategy buildFailoverStrategy() { + return FailoverStrategyFactory.loadFailoverStrategy(IEnvironment.EnvType.LOCAL, FailoverStrategyType.disable_fo.name()); } @Override diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/pom.xml b/geaflow/geaflow-deploy/geaflow-on-ray-community/pom.xml similarity index 96% rename from geaflow/geaflow-deploy/geaflow-on-ray/pom.xml rename to geaflow/geaflow-deploy/geaflow-on-ray-community/pom.xml index 0b8feb833..623d4f0bf 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray/pom.xml +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/pom.xml @@ -24,7 +24,7 @@ 4.0.0 - geaflow-on-ray + geaflow-on-ray-community jar diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayClusterClient.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayClusterClient.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayClusterClient.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayClusterClient.java index 3cb7f012f..ae894204d 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayClusterClient.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayClusterClient.java @@ -42,10 +42,10 @@ public class RayClusterClient extends AbstractClusterClient { public void init(IEnvironmentContext environmentContext) { super.init(environmentContext); clusterContext = new ClusterContext(config); - RaySystemFunc.initRayEnv(clusterContext.getClusterConfig()); - config.put(JOB_WORK_PATH, RaySystemFunc.getWorkPath()); rayClusterManager = new RayClusterManager(); rayClusterManager.init(clusterContext); + RaySystemFunc.initRayEnv(clusterContext.getClusterConfig()); + config.put(JOB_WORK_PATH, RaySystemFunc.getWorkPath()); } @Override diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java similarity index 95% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java index 987f80421..5fea656e6 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/client/RayEnvironment.java @@ -26,6 +26,6 @@ protected IClusterClient getClusterClient() { @Override public EnvType getEnvType() { - return EnvType.RAY; + return EnvType.RAY_COMMUNITY; } } diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java similarity index 95% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java index 6574ebad6..2f120df5c 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClient.java @@ -24,6 +24,7 @@ import io.ray.api.ActorHandle; import io.ray.api.ObjectRef; import io.ray.api.Ray; +import io.ray.api.options.ActorLifetime; import java.io.Serializable; import java.util.List; import org.slf4j.Logger; @@ -41,6 +42,7 @@ public static ActorHandle createMaster(ClusterConfig clusterCon ActorHandle masterRayActor = Ray .actor(RayMasterRunner::new, clusterConfig.getConfig()) .setMaxRestarts(clusterConfig.getMaxRestarts()) + .setLifetime(ActorLifetime.DETACHED) .setJvmOptions(jvmOptions).remote(); LOGGER.info("master actor:{}, memoryMB:{}, jvmOptions:{}, isRestartAllFo:{}, foRestartTimes:{}", masterRayActor.getId().toString(), totalMemoryMb, jvmOptions, @@ -63,6 +65,7 @@ public static ActorHandle createDriver(ClusterConfig clusterCon ActorHandle driverRayActor = Ray .actor(RayDriverRunner::new, context) .setMaxRestarts(clusterConfig.getMaxRestarts()) + .setLifetime(ActorLifetime.DETACHED) .setJvmOptions(jvmOptions).remote(); LOGGER.info("driver actor:{}, memoryMB:{}, jvmOptions:{}, isRestartAllFo:{}, foRestartTimes:{}", driverRayActor.getId().toString(), totalMemoryMb, jvmOptions, @@ -75,6 +78,7 @@ public static ActorHandle createContainer(ClusterConfig clus ActorHandle rayContainer = Ray .actor(RayContainerRunner::new, containerContext) .setMaxRestarts(clusterConfig.getMaxRestarts()) + .setLifetime(ActorLifetime.DETACHED) .remote(); LOGGER.info("worker actor {} isDisableFo {}", rayContainer.getId().toString(), clusterConfig.isFoEnable()); diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterId.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterId.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterId.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterId.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java similarity index 58% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java index 5c67bbc98..948435e2b 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/clustermanager/RayClusterManager.java @@ -20,14 +20,22 @@ import com.antgroup.geaflow.cluster.clustermanager.ClusterContext; import com.antgroup.geaflow.cluster.container.ContainerContext; import com.antgroup.geaflow.cluster.driver.DriverContext; -import com.antgroup.geaflow.cluster.failover.FoStrategyFactory; +import com.antgroup.geaflow.cluster.failover.FailoverStrategyFactory; import com.antgroup.geaflow.cluster.failover.IFailoverStrategy; import com.antgroup.geaflow.cluster.ray.context.RayContainerContext; import com.antgroup.geaflow.cluster.ray.context.RayDriverContext; +import com.antgroup.geaflow.cluster.ray.entrypoint.RayContainerRunner; +import com.antgroup.geaflow.cluster.ray.entrypoint.RayDriverRunner; +import com.antgroup.geaflow.cluster.ray.entrypoint.RayMasterRunner; +import com.antgroup.geaflow.cluster.ray.failover.AbstractRayFailoverStrategy; import com.antgroup.geaflow.cluster.ray.utils.RaySystemFunc; +import com.antgroup.geaflow.common.exception.GeaflowRuntimeException; import com.antgroup.geaflow.env.IEnvironment.EnvType; import com.google.common.base.Preconditions; +import io.ray.api.ActorHandle; import java.io.File; +import java.util.HashMap; +import java.util.Map; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,50 +43,62 @@ public class RayClusterManager extends AbstractClusterManager { private static final Logger LOGGER = LoggerFactory.getLogger(RayClusterManager.class); - private String appPath; + private static final int MASTER_ACTOR_ID = 0; + private static Map actors = new HashMap<>(); @Override public void init(ClusterContext clusterContext) { super.init(clusterContext); - this.appPath = RaySystemFunc.getWorkPath(); } @Override - protected IFailoverStrategy buildFoStrategy() { - return FoStrategyFactory.loadFoStrategy(EnvType.RAY, clusterConfig.getConfig().getString(FO_STRATEGY)); + protected IFailoverStrategy buildFailoverStrategy() { + IFailoverStrategy foStrategy = FailoverStrategyFactory.loadFailoverStrategy( + EnvType.RAY_COMMUNITY, clusterConfig.getConfig().getString(FO_STRATEGY)); + foStrategy.init(clusterContext); + ((AbstractRayFailoverStrategy) foStrategy).setClusterManager(this); + return foStrategy; } @Override public RayClusterId startMaster() { Preconditions.checkArgument(clusterConfig != null, "clusterConfig is not initialized"); - clusterInfo = new RayClusterId(RayClient.createMaster(clusterConfig)); + ActorHandle master = RayClient.createMaster(clusterConfig); + clusterInfo = new RayClusterId(master); + actors.put(MASTER_ACTOR_ID, master); return (RayClusterId) clusterInfo; } @Override public void restartContainer(int containerId) { - // do nothing. + if (!actors.containsKey(containerId)) { + throw new GeaflowRuntimeException(String.format("invalid container id %s", containerId)); + } + actors.get(containerId).kill(); } @Override public void doStartContainer(int containerId, boolean isRecover) { ContainerContext containerContext = new RayContainerContext(containerId, clusterConfig.getConfig()); - RayClient.createContainer(clusterConfig, containerContext); + ActorHandle container = RayClient.createContainer(clusterConfig, containerContext); + actors.put(containerId, container); } @Override public void doStartDriver(int driverId) { DriverContext driverContext = new RayDriverContext(driverId, clusterConfig.getConfig()); - RayClient.createDriver(clusterConfig, driverContext); + ActorHandle driver = RayClient.createDriver(clusterConfig, driverContext); + actors.put(driverId, driver); LOGGER.info("call driver start"); } @Override public void close() { super.close(); + actors.clear(); if (RaySystemFunc.isLocalMode()) { - FileUtils.deleteQuietly(new File(appPath)); + FileUtils.deleteQuietly(new File(RaySystemFunc.getWorkPath())); } } diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayContainerContext.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayContainerContext.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayContainerContext.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayContainerContext.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayDriverContext.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayDriverContext.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayDriverContext.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/context/RayDriverContext.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayContainerRunner.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayContainerRunner.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayContainerRunner.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayContainerRunner.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayDriverRunner.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayDriverRunner.java similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayDriverRunner.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayDriverRunner.java diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayMasterRunner.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayMasterRunner.java similarity index 96% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayMasterRunner.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayMasterRunner.java index 3bc0fecc8..ebcf82d60 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayMasterRunner.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/entrypoint/RayMasterRunner.java @@ -31,7 +31,7 @@ public RayMasterRunner(Configuration configuration) { MasterContext context = new MasterContext(configuration); context.setRecover(RaySystemFunc.isRestarted()); context.setClusterManager(new RayClusterManager()); - context.setEnvType(EnvType.RAY); + context.setEnvType(EnvType.RAY_COMMUNITY); master.init(context); } diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/AbstractRayFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/AbstractRayFailoverStrategy.java similarity index 68% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/AbstractRayFailoverStrategy.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/AbstractRayFailoverStrategy.java index 6523a0c77..a45e4e6bc 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/AbstractRayFailoverStrategy.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/AbstractRayFailoverStrategy.java @@ -15,13 +15,23 @@ package com.antgroup.geaflow.cluster.ray.failover; import com.antgroup.geaflow.cluster.failover.IFailoverStrategy; +import com.antgroup.geaflow.cluster.ray.clustermanager.RayClusterManager; import com.antgroup.geaflow.env.IEnvironment.EnvType; public abstract class AbstractRayFailoverStrategy implements IFailoverStrategy { + protected RayClusterManager clusterManager; @Override public EnvType getEnv() { - return EnvType.RAY; + return EnvType.RAY_COMMUNITY; + } + + public RayClusterManager getClusterManager() { + return clusterManager; + } + + public void setClusterManager(RayClusterManager clusterManager) { + this.clusterManager = clusterManager; } } diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java similarity index 89% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java index 74434f03b..de62ae210 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFailoverStrategy.java @@ -22,8 +22,7 @@ public class RayClusterFailoverStrategy extends AbstractRayFailoverStrategy { @Override public void init(ClusterContext context) { - // TODO set before ray init. - System.setProperty(RayConfig.RAY_JOB_L1FO_ENABLE, String.valueOf(Boolean.TRUE)); + System.setProperty(RayConfig.RAY_TASK_RETURN_TASK_EXCEPTION, Boolean.FALSE.toString()); } @Override diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayComponentFailoverStrategy.java similarity index 75% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayComponentFailoverStrategy.java index 66b433d20..b6f3dfe8d 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayComponentFailoverStrategy.java @@ -16,21 +16,21 @@ import com.antgroup.geaflow.cluster.clustermanager.ClusterContext; import com.antgroup.geaflow.cluster.failover.FailoverStrategyType; +import com.antgroup.geaflow.cluster.ray.utils.RayConfig; -public class RayDisableFailoverStrategy extends AbstractRayFailoverStrategy { +public class RayComponentFailoverStrategy extends AbstractRayFailoverStrategy { @Override public void init(ClusterContext context) { - + System.setProperty(RayConfig.RAY_TASK_RETURN_TASK_EXCEPTION, Boolean.FALSE.toString()); } @Override public void doFailover(int componentId) { - } @Override public FailoverStrategyType getType() { - return FailoverStrategyType.disable_fo; + return FailoverStrategyType.component_fo; } } diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java new file mode 100644 index 000000000..aa4384ec2 --- /dev/null +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFailoverStrategy.java @@ -0,0 +1,40 @@ +/* + * Copyright 2023 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.antgroup.geaflow.cluster.ray.failover; + +import com.antgroup.geaflow.cluster.clustermanager.ClusterContext; +import com.antgroup.geaflow.cluster.failover.FailoverStrategyType; +import com.antgroup.geaflow.cluster.ray.utils.RayConfig; +import io.ray.api.options.ActorCreationOptions; + +public class RayDisableFailoverStrategy extends AbstractRayFailoverStrategy { + + @Override + public void init(ClusterContext context) { + System.setProperty(RayConfig.RAY_TASK_RETURN_TASK_EXCEPTION, Boolean.FALSE.toString()); + context.getClusterConfig().setFoEnable(Boolean.FALSE); + // Reset restart times to disable fo. + context.getClusterConfig().setMaxRestarts(ActorCreationOptions.NO_RESTART); + } + + @Override + public void doFailover(int componentId) { + } + + @Override + public FailoverStrategyType getType() { + return FailoverStrategyType.disable_fo; + } +} diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RayConfig.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RayConfig.java similarity index 88% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RayConfig.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RayConfig.java index 6e57fbef0..daf898b83 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RayConfig.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RayConfig.java @@ -21,16 +21,13 @@ public class RayConfig { public static final String RAY_JOB_JVM_OPTIONS_PREFIX = "ray.job.jvm-options"; public static final String RAY_TASK_RETURN_TASK_EXCEPTION = "ray.task.return_task_exception"; public static final String RAY_JOB_L1FO_ENABLE = "ray.job.enable-l1-fault-tolerance"; - public static final String RAY_JOB_LONG_RUNNING = "ray.job.long-running"; + public static final String RAY_JOB_RUNTIME_ENV = "ray.job.runtime-env"; public static final String RAY_JOB_WORKING_DIR = "working_dir"; public static final int CLUSTER_RESERVED_MEMORY_MB = 3 * 1024; public static final int WORKER_RESERVED_MEMORY_MB = 3 * 1024; - // Sets how many actor threads can be started by a jvm process - public static final String RAY_JOB_NUM_JAVA_WORKER_PER_PROCESS = - "ray.job.num-java-workers-per-process"; // Sets the amount of memory occupied by a jvm process, both in and out of the heap public static final String RAY_JOB_JAVA_WORKER_PROCESS_DEFAULT_MEMORY_MB = @@ -57,5 +54,6 @@ public class RayConfig { public static final String CUSTOM_LOGGER_FILE_NAME = "geaflow-user-%p.log"; - public static final String CUSTOM_LOGGER_PATTERN = "%d{yyyy-MM-dd HH:mm:ss,SSS} %p %c{1} [%t]: %m%n)"; + public static final String CUSTOM_LOGGER_PATTERN = "%d{yyyy-MM-dd HH:mm:ss,SSS} %p %c{1} [%t]: %m%n"; } + diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java similarity index 92% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java index 25ff1fd9f..c7b222f6a 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/java/com/antgroup/geaflow/cluster/ray/utils/RaySystemFunc.java @@ -36,7 +36,7 @@ public class RaySystemFunc implements Serializable { private static final Logger LOGGER = LoggerFactory.getLogger(RaySystemFunc.class); - private static final Object lock = new Object(); + private static final Object LOCK = new Object(); private static String appPath; public static boolean isRestarted() { @@ -51,7 +51,7 @@ public static String getWorkPath() { if (appPath != null) { return appPath; } - synchronized (lock) { + synchronized (LOCK) { if (Ray.getRuntimeContext().isLocalMode()) { appPath = "/tmp/" + System.currentTimeMillis(); try { @@ -79,15 +79,13 @@ public static void initRayEnv(ClusterConfig clusterConfig) { systemProperties.put(RayConfig.RAY_RUN_MODE, RunMode.CLUSTER.name()); } - // Sets how many actor threads can be started by a jvm process - systemProperties.put(RayConfig.RAY_JOB_NUM_JAVA_WORKER_PER_PROCESS, String.valueOf(1)); - - // The amount of memory used by a jvm process, both in and out of the heap, must be a factor of 50 + // Sets how many actor threads can be started by a jvm process. int containerMemoryMb = clusterConfig.getContainerMemoryMB(); + // The amount of memory used by a jvm process, both in and out of the heap, must be a factor of 50. systemProperties.put(RayConfig.RAY_JOB_JAVA_WORKER_PROCESS_DEFAULT_MEMORY_MB, String.valueOf(MathUtil.multiplesOf50(containerMemoryMb))); - // To set how many jvm processes are started there, geaflow defaults to 0 + // To set how many jvm processes are started there, geaflow defaults to 0. systemProperties.put(RayConfig.RAY_JOB_NUM_INITIAL_JAVA_WORKER_PROCESS, "0"); // Set all resources required for this job @@ -109,13 +107,8 @@ public static void initRayEnv(ClusterConfig clusterConfig) { // Otherwise, do not set this ratio, but set each jvm parameter separately in setting jvm parameters later systemProperties.put(RayConfig.RAY_JOB_JAVA_HEAP_FRACTION, "0.8"); - // default trigger L1 FO - boolean enableL1Fo = clusterConfig.isFoEnable(); - systemProperties.put(RayConfig.RAY_JOB_L1FO_ENABLE, String.valueOf(enableL1Fo)); systemProperties.put(RayConfig.RAY_TASK_RETURN_TASK_EXCEPTION, Boolean.FALSE.toString()); - // allow job long running - systemProperties.put(RayConfig.RAY_JOB_LONG_RUNNING, Boolean.TRUE.toString()); // Set the JVM parameters below int optionIndex = 0; diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/resources/META-INF/services/com.antgroup.geaflow.cluster.failover.IFailoverStrategy b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/resources/META-INF/services/com.antgroup.geaflow.cluster.failover.IFailoverStrategy similarity index 66% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/resources/META-INF/services/com.antgroup.geaflow.cluster.failover.IFailoverStrategy rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/resources/META-INF/services/com.antgroup.geaflow.cluster.failover.IFailoverStrategy index 6ea06d48f..89df792dc 100644 --- a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/resources/META-INF/services/com.antgroup.geaflow.cluster.failover.IFailoverStrategy +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/resources/META-INF/services/com.antgroup.geaflow.cluster.failover.IFailoverStrategy @@ -1,2 +1,3 @@ com.antgroup.geaflow.cluster.ray.failover.RayClusterFailoverStrategy +com.antgroup.geaflow.cluster.ray.failover.RayComponentFailoverStrategy com.antgroup.geaflow.cluster.ray.failover.RayDisableFailoverStrategy diff --git a/geaflow/geaflow-deploy/geaflow-on-ray/src/main/resources/META-INF/services/com.antgroup.geaflow.env.IEnvironment b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/resources/META-INF/services/com.antgroup.geaflow.env.IEnvironment similarity index 100% rename from geaflow/geaflow-deploy/geaflow-on-ray/src/main/resources/META-INF/services/com.antgroup.geaflow.env.IEnvironment rename to geaflow/geaflow-deploy/geaflow-on-ray-community/src/main/resources/META-INF/services/com.antgroup.geaflow.env.IEnvironment diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFoStrategyTest.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFoStrategyTest.java new file mode 100644 index 000000000..77cf306e2 --- /dev/null +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayClusterFoStrategyTest.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.antgroup.geaflow.cluster.ray.failover; + +import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.FO_STRATEGY; + +import com.antgroup.geaflow.cluster.failover.FailoverStrategyFactory; +import com.antgroup.geaflow.cluster.failover.IFailoverStrategy; +import com.antgroup.geaflow.common.config.Configuration; +import com.antgroup.geaflow.env.IEnvironment.EnvType; +import org.junit.Assert; +import org.testng.annotations.Test; + +public class RayClusterFoStrategyTest { + + @Test + public void testLoad() { + Configuration configuration = new Configuration(); + IFailoverStrategy foStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, configuration.getString(FO_STRATEGY)); + Assert.assertNotNull(foStrategy); + Assert.assertEquals(foStrategy.getType().name(), configuration.getString(FO_STRATEGY)); + + IFailoverStrategy rayFoStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, "cluster_fo"); + Assert.assertNotNull(rayFoStrategy); + Assert.assertEquals(rayFoStrategy.getClass(), RayClusterFailoverStrategy.class); + + rayFoStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, "component_fo"); + Assert.assertNotNull(rayFoStrategy); + Assert.assertEquals(rayFoStrategy.getClass(), RayComponentFailoverStrategy.class); + } + +} diff --git a/geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFoStrategyTest.java b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFoStrategyTest.java new file mode 100644 index 000000000..3b33bf5f1 --- /dev/null +++ b/geaflow/geaflow-deploy/geaflow-on-ray-community/src/test/java/com/antgroup/geaflow/cluster/ray/failover/RayDisableFoStrategyTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2023 AntGroup CO., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +package com.antgroup.geaflow.cluster.ray.failover; + +import static com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys.FO_STRATEGY; + +import com.antgroup.geaflow.cluster.failover.FailoverStrategyFactory; +import com.antgroup.geaflow.cluster.failover.IFailoverStrategy; +import com.antgroup.geaflow.common.config.Configuration; +import com.antgroup.geaflow.env.IEnvironment.EnvType; +import org.junit.Assert; +import org.testng.annotations.Test; + +public class RayDisableFoStrategyTest { + + @Test + public void testLoad() { + Configuration configuration = new Configuration(); + IFailoverStrategy foStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, configuration.getString(FO_STRATEGY)); + Assert.assertNotNull(foStrategy); + Assert.assertEquals(foStrategy.getType().name(), configuration.getString(FO_STRATEGY)); + + IFailoverStrategy rayDisableFoStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, "disable_fo"); + Assert.assertNotNull(rayDisableFoStrategy); + Assert.assertEquals(rayDisableFoStrategy.getClass(), RayDisableFailoverStrategy.class); + } + +} diff --git a/geaflow/geaflow-deploy/pom.xml b/geaflow/geaflow-deploy/pom.xml index 013ba2586..223af12ab 100644 --- a/geaflow/geaflow-deploy/pom.xml +++ b/geaflow/geaflow-deploy/pom.xml @@ -28,7 +28,7 @@ pom - geaflow-on-ray + geaflow-on-ray-community geaflow-on-k8s geaflow-on-local geaflow-assembly diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/com/antgroup/geaflow/dsl/runtime/engine/GeaFlowGqlClient.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/com/antgroup/geaflow/dsl/runtime/engine/GeaFlowGqlClient.java index 807e89b31..fa5348a6a 100644 --- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/com/antgroup/geaflow/dsl/runtime/engine/GeaFlowGqlClient.java +++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/main/java/com/antgroup/geaflow/dsl/runtime/engine/GeaFlowGqlClient.java @@ -86,8 +86,8 @@ public static Environment loadEnvironment(String[] args) { switch (clusterType) { case K8S: return EnvironmentFactory.onK8SEnvironment(args); - case RAY: - return EnvironmentFactory.onAntEnvironment(args); + case RAY_COMMUNITY: + return EnvironmentFactory.onRayCommunityEnvironment(args); default: return EnvironmentFactory.onLocalEnvironment(args); } diff --git a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamFoTest.java b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamFoTest.java index 12e8cce9b..cc3fcde69 100644 --- a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamFoTest.java +++ b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamFoTest.java @@ -32,7 +32,7 @@ public class UnBoundedStreamFoTest { private static final Logger LOGGER = LoggerFactory.getLogger(UnBoundedStreamFoTest.class); public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { - Environment environment = EnvironmentFactory.onAntEnvironment(args); + Environment environment = EnvironmentFactory.onRayCommunityEnvironment(args); Configuration configuration = ((EnvironmentContext) environment.getEnvironmentContext()).getConfig(); StreamWordCountPipeline pipeline = new StreamWordCountPipeline(); diff --git a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamWordPrint.java b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamWordPrint.java index f42cec89f..d574af778 100644 --- a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamWordPrint.java +++ b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/UnBoundedStreamWordPrint.java @@ -24,7 +24,7 @@ public class UnBoundedStreamWordPrint { private static final Logger LOGGER = LoggerFactory.getLogger(UnBoundedStreamWordPrint.class); public static void main(String[] args) { - Environment environment = EnvironmentFactory.onAntEnvironment(); + Environment environment = EnvironmentFactory.onRayCommunityEnvironment(); StreamWordPrintPipeline pipeline = new StreamWordPrintPipeline(); pipeline.submit(environment); diff --git a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/WindowStreamWordCount.java b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/WindowStreamWordCount.java index ec01c0882..fa7ca3aec 100644 --- a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/WindowStreamWordCount.java +++ b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/stream/WindowStreamWordCount.java @@ -38,7 +38,7 @@ public class WindowStreamWordCount { LoggerFactory.getLogger(WindowStreamWordCount.class); public static void main(String[] args) { - Environment environment = EnvironmentFactory.onAntEnvironment(args); + Environment environment = EnvironmentFactory.onRayCommunityEnvironment(args); Pipeline pipeline = PipelineFactory.buildPipeline(environment); pipeline.submit(new PipelineTask() { @Override diff --git a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/util/EnvironmentUtil.java b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/util/EnvironmentUtil.java index 52734f4e8..ac531cdfa 100644 --- a/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/util/EnvironmentUtil.java +++ b/geaflow/geaflow-examples/src/main/java/com/antgroup/geaflow/example/util/EnvironmentUtil.java @@ -32,8 +32,8 @@ public static Environment loadEnvironment(String[] args) { switch (clusterType) { case K8S: return EnvironmentFactory.onK8SEnvironment(args); - case RAY: - return EnvironmentFactory.onAntEnvironment(args); + case RAY_COMMUNITY: + return EnvironmentFactory.onRayCommunityEnvironment(args); default: return EnvironmentFactory.onLocalEnvironment(args); } diff --git a/geaflow/pom.xml b/geaflow/pom.xml index 83af8aa11..ae528c79d 100644 --- a/geaflow/pom.xml +++ b/geaflow/pom.xml @@ -443,7 +443,7 @@ com.antgroup.tugraph - geaflow-on-ray + geaflow-on-ray-community ${project.version}