Skip to content

Commit

Permalink
[ISSUE-47] support run on ray community version
Browse files Browse the repository at this point in the history
  • Loading branch information
cbqiao authored Oct 27, 2023
1 parent 68d227f commit 40b1f94
Show file tree
Hide file tree
Showing 36 changed files with 220 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ public static Environment onLocalEnvironment(String[] args) {
return environment;
}

public static Environment onAntEnvironment() {
return (Environment) loadEnvironment(EnvType.RAY);
public static Environment onRayCommunityEnvironment() {
return (Environment) loadEnvironment(EnvType.RAY_COMMUNITY);
}

public static Environment onAntEnvironment(String[] args) {
Environment environment = (Environment) loadEnvironment(EnvType.RAY);
public static Environment onRayCommunityEnvironment(String[] args) {
Environment environment = (Environment) loadEnvironment(EnvType.RAY_COMMUNITY);
IEnvironmentArgsParser argsParser = loadEnvironmentArgsParser();
environment.getEnvironmentContext().withConfig(argsParser.parse(args));
return environment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public interface IEnvironment extends Serializable {
enum EnvType {

/**
* Ray cluster.
* Community ray cluster.
*/
RAY,
RAY_COMMUNITY,

/**
* K8s cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ public void init(ClusterContext clusterContext) {
this.driverFuture = new CompletableFuture<>();
this.containerIds = new HashSet<>();
this.driverIds = new HashSet<>();
this.foStrategy = buildFoStrategy();
this.foStrategy = buildFailoverStrategy();
Preconditions.checkNotNull(masterId, "masterId is not set");
}

public ClusterContext getClusterContext() {
return clusterContext;
}

protected abstract IFailoverStrategy buildFoStrategy();
protected abstract IFailoverStrategy buildFailoverStrategy();

@Override
public void allocateWorkers(int workerNum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FoStrategyFactory {
public class FailoverStrategyFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(FoStrategyFactory.class);
private static final Logger LOGGER = LoggerFactory.getLogger(FailoverStrategyFactory.class);

public static IFailoverStrategy loadFoStrategy(EnvType envType, String foStrategyType) {
public static IFailoverStrategy loadFailoverStrategy(EnvType envType, String foStrategyType) {
ServiceLoader<IFailoverStrategy> contextLoader = ServiceLoader.load(IFailoverStrategy.class);
Iterator<IFailoverStrategy> contextIterable = contextLoader.iterator();
while (contextIterable.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class FoStrategyFactoryTest {

@Test(expectedExceptions = GeaflowRuntimeException.class)
public void testLoad() {
FoStrategyFactory.loadFoStrategy(EnvType.RAY, "");
FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, "");
}

}
9 changes: 5 additions & 4 deletions geaflow/geaflow-deploy/geaflow-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<artifactId>geaflow-assembly</artifactId>

<dependencies>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Expand Down Expand Up @@ -56,10 +57,6 @@
<groupId>com.antgroup.tugraph</groupId>
<artifactId>geaflow-on-k8s</artifactId>
</dependency>
<dependency>
<groupId>com.antgroup.tugraph</groupId>
<artifactId>geaflow-on-ray</artifactId>
</dependency>
<dependency>
<groupId>com.antgroup.tugraph</groupId>
<artifactId>geaflow-examples</artifactId>
Expand All @@ -72,6 +69,10 @@
<groupId>com.antgroup.tugraph</groupId>
<artifactId>geaflow-dsl-runtime</artifactId>
</dependency>
<dependency>
<groupId>com.antgroup.tugraph</groupId>
<artifactId>geaflow-on-ray-community</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.antgroup.geaflow.cluster.config.ClusterConfig;
import com.antgroup.geaflow.cluster.container.ContainerInfo;
import com.antgroup.geaflow.cluster.driver.DriverInfo;
import com.antgroup.geaflow.cluster.failover.FoStrategyFactory;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyFactory;
import com.antgroup.geaflow.cluster.failover.IFailoverStrategy;
import com.antgroup.geaflow.cluster.k8s.config.AbstractKubernetesParam;
import com.antgroup.geaflow.cluster.k8s.config.KubernetesConfig;
Expand Down Expand Up @@ -118,8 +118,8 @@ public void init(ClusterContext context, GeaflowKubeClient kubernetesClient) {
}

@Override
protected IFailoverStrategy buildFoStrategy() {
IFailoverStrategy foStrategy = FoStrategyFactory.loadFoStrategy(EnvType.K8S,
protected IFailoverStrategy buildFailoverStrategy() {
IFailoverStrategy foStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.K8S,
super.clusterConfig.getConfig().getString(FO_STRATEGY));
foStrategy.init(clusterContext);
((AbstractKubernetesFailoverStrategy) foStrategy).setClusterManager(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import com.antgroup.geaflow.cluster.clustermanager.ClusterContext;
import com.antgroup.geaflow.cluster.container.ContainerContext;
import com.antgroup.geaflow.cluster.driver.DriverContext;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyFactory;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyType;
import com.antgroup.geaflow.cluster.failover.FoStrategyFactory;
import com.antgroup.geaflow.cluster.failover.IFailoverStrategy;
import com.antgroup.geaflow.cluster.local.context.LocalContainerContext;
import com.antgroup.geaflow.cluster.local.context.LocalDriverContext;
Expand All @@ -43,8 +43,8 @@ public void init(ClusterContext clusterContext) {
}

@Override
protected IFailoverStrategy buildFoStrategy() {
return FoStrategyFactory.loadFoStrategy(IEnvironment.EnvType.LOCAL, FailoverStrategyType.disable_fo.name());
protected IFailoverStrategy buildFailoverStrategy() {
return FailoverStrategyFactory.loadFailoverStrategy(IEnvironment.EnvType.LOCAL, FailoverStrategyType.disable_fo.name());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>geaflow-on-ray</artifactId>
<artifactId>geaflow-on-ray-community</artifactId>
<packaging>jar</packaging>

<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public class RayClusterClient extends AbstractClusterClient {
public void init(IEnvironmentContext environmentContext) {
super.init(environmentContext);
clusterContext = new ClusterContext(config);
RaySystemFunc.initRayEnv(clusterContext.getClusterConfig());
config.put(JOB_WORK_PATH, RaySystemFunc.getWorkPath());
rayClusterManager = new RayClusterManager();
rayClusterManager.init(clusterContext);
RaySystemFunc.initRayEnv(clusterContext.getClusterConfig());
config.put(JOB_WORK_PATH, RaySystemFunc.getWorkPath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ protected IClusterClient getClusterClient() {

@Override
public EnvType getEnvType() {
return EnvType.RAY;
return EnvType.RAY_COMMUNITY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.options.ActorLifetime;
import java.io.Serializable;
import java.util.List;
import org.slf4j.Logger;
Expand All @@ -41,6 +42,7 @@ public static ActorHandle<RayMasterRunner> createMaster(ClusterConfig clusterCon
ActorHandle<RayMasterRunner> masterRayActor = Ray
.actor(RayMasterRunner::new, clusterConfig.getConfig())
.setMaxRestarts(clusterConfig.getMaxRestarts())
.setLifetime(ActorLifetime.DETACHED)
.setJvmOptions(jvmOptions).remote();
LOGGER.info("master actor:{}, memoryMB:{}, jvmOptions:{}, isRestartAllFo:{}, foRestartTimes:{}",
masterRayActor.getId().toString(), totalMemoryMb, jvmOptions,
Expand All @@ -63,6 +65,7 @@ public static ActorHandle<RayDriverRunner> createDriver(ClusterConfig clusterCon
ActorHandle<RayDriverRunner> driverRayActor = Ray
.actor(RayDriverRunner::new, context)
.setMaxRestarts(clusterConfig.getMaxRestarts())
.setLifetime(ActorLifetime.DETACHED)
.setJvmOptions(jvmOptions).remote();
LOGGER.info("driver actor:{}, memoryMB:{}, jvmOptions:{}, isRestartAllFo:{}, foRestartTimes:{}",
driverRayActor.getId().toString(), totalMemoryMb, jvmOptions,
Expand All @@ -75,6 +78,7 @@ public static ActorHandle<RayContainerRunner> createContainer(ClusterConfig clus
ActorHandle<RayContainerRunner> rayContainer = Ray
.actor(RayContainerRunner::new, containerContext)
.setMaxRestarts(clusterConfig.getMaxRestarts())
.setLifetime(ActorLifetime.DETACHED)
.remote();
LOGGER.info("worker actor {} isDisableFo {}", rayContainer.getId().toString(),
clusterConfig.isFoEnable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,65 +20,85 @@
import com.antgroup.geaflow.cluster.clustermanager.ClusterContext;
import com.antgroup.geaflow.cluster.container.ContainerContext;
import com.antgroup.geaflow.cluster.driver.DriverContext;
import com.antgroup.geaflow.cluster.failover.FoStrategyFactory;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyFactory;
import com.antgroup.geaflow.cluster.failover.IFailoverStrategy;
import com.antgroup.geaflow.cluster.ray.context.RayContainerContext;
import com.antgroup.geaflow.cluster.ray.context.RayDriverContext;
import com.antgroup.geaflow.cluster.ray.entrypoint.RayContainerRunner;
import com.antgroup.geaflow.cluster.ray.entrypoint.RayDriverRunner;
import com.antgroup.geaflow.cluster.ray.entrypoint.RayMasterRunner;
import com.antgroup.geaflow.cluster.ray.failover.AbstractRayFailoverStrategy;
import com.antgroup.geaflow.cluster.ray.utils.RaySystemFunc;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.env.IEnvironment.EnvType;
import com.google.common.base.Preconditions;
import io.ray.api.ActorHandle;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RayClusterManager extends AbstractClusterManager {

private static final Logger LOGGER = LoggerFactory.getLogger(RayClusterManager.class);
private String appPath;
private static final int MASTER_ACTOR_ID = 0;
private static Map<Integer, ActorHandle> actors = new HashMap<>();

@Override
public void init(ClusterContext clusterContext) {
super.init(clusterContext);
this.appPath = RaySystemFunc.getWorkPath();
}

@Override
protected IFailoverStrategy buildFoStrategy() {
return FoStrategyFactory.loadFoStrategy(EnvType.RAY, clusterConfig.getConfig().getString(FO_STRATEGY));
protected IFailoverStrategy buildFailoverStrategy() {
IFailoverStrategy foStrategy = FailoverStrategyFactory.loadFailoverStrategy(
EnvType.RAY_COMMUNITY, clusterConfig.getConfig().getString(FO_STRATEGY));
foStrategy.init(clusterContext);
((AbstractRayFailoverStrategy) foStrategy).setClusterManager(this);
return foStrategy;
}

@Override
public RayClusterId startMaster() {
Preconditions.checkArgument(clusterConfig != null, "clusterConfig is not initialized");
clusterInfo = new RayClusterId(RayClient.createMaster(clusterConfig));
ActorHandle<RayMasterRunner> master = RayClient.createMaster(clusterConfig);
clusterInfo = new RayClusterId(master);
actors.put(MASTER_ACTOR_ID, master);
return (RayClusterId) clusterInfo;
}

@Override
public void restartContainer(int containerId) {
// do nothing.
if (!actors.containsKey(containerId)) {
throw new GeaflowRuntimeException(String.format("invalid container id %s", containerId));
}
actors.get(containerId).kill();
}

@Override
public void doStartContainer(int containerId, boolean isRecover) {
ContainerContext containerContext = new RayContainerContext(containerId,
clusterConfig.getConfig());
RayClient.createContainer(clusterConfig, containerContext);
ActorHandle<RayContainerRunner> container = RayClient.createContainer(clusterConfig, containerContext);
actors.put(containerId, container);
}

@Override
public void doStartDriver(int driverId) {
DriverContext driverContext = new RayDriverContext(driverId, clusterConfig.getConfig());
RayClient.createDriver(clusterConfig, driverContext);
ActorHandle<RayDriverRunner> driver = RayClient.createDriver(clusterConfig, driverContext);
actors.put(driverId, driver);
LOGGER.info("call driver start");
}

@Override
public void close() {
super.close();
actors.clear();
if (RaySystemFunc.isLocalMode()) {
FileUtils.deleteQuietly(new File(appPath));
FileUtils.deleteQuietly(new File(RaySystemFunc.getWorkPath()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public RayMasterRunner(Configuration configuration) {
MasterContext context = new MasterContext(configuration);
context.setRecover(RaySystemFunc.isRestarted());
context.setClusterManager(new RayClusterManager());
context.setEnvType(EnvType.RAY);
context.setEnvType(EnvType.RAY_COMMUNITY);
master.init(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,23 @@
package com.antgroup.geaflow.cluster.ray.failover;

import com.antgroup.geaflow.cluster.failover.IFailoverStrategy;
import com.antgroup.geaflow.cluster.ray.clustermanager.RayClusterManager;
import com.antgroup.geaflow.env.IEnvironment.EnvType;

public abstract class AbstractRayFailoverStrategy implements IFailoverStrategy {

protected RayClusterManager clusterManager;

@Override
public EnvType getEnv() {
return EnvType.RAY;
return EnvType.RAY_COMMUNITY;
}

public RayClusterManager getClusterManager() {
return clusterManager;
}

public void setClusterManager(RayClusterManager clusterManager) {
this.clusterManager = clusterManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ public class RayClusterFailoverStrategy extends AbstractRayFailoverStrategy {

@Override
public void init(ClusterContext context) {
// TODO set before ray init.
System.setProperty(RayConfig.RAY_JOB_L1FO_ENABLE, String.valueOf(Boolean.TRUE));
System.setProperty(RayConfig.RAY_TASK_RETURN_TASK_EXCEPTION, Boolean.FALSE.toString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@

import com.antgroup.geaflow.cluster.clustermanager.ClusterContext;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyType;
import com.antgroup.geaflow.cluster.ray.utils.RayConfig;

public class RayDisableFailoverStrategy extends AbstractRayFailoverStrategy {
public class RayComponentFailoverStrategy extends AbstractRayFailoverStrategy {

@Override
public void init(ClusterContext context) {

System.setProperty(RayConfig.RAY_TASK_RETURN_TASK_EXCEPTION, Boolean.FALSE.toString());
}

@Override
public void doFailover(int componentId) {

}

@Override
public FailoverStrategyType getType() {
return FailoverStrategyType.disable_fo;
return FailoverStrategyType.component_fo;
}
}
Loading

0 comments on commit 40b1f94

Please sign in to comment.