Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-47] support run on ray community version #89

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ public static Environment onLocalEnvironment(String[] args) {
return environment;
}

public static Environment onAntEnvironment() {
return (Environment) loadEnvironment(EnvType.RAY);
public static Environment onRayCommunityEnvironment() {
return (Environment) loadEnvironment(EnvType.RAY_COMMUNITY);
}

public static Environment onAntEnvironment(String[] args) {
Environment environment = (Environment) loadEnvironment(EnvType.RAY);
public static Environment onRayCommunityEnvironment(String[] args) {
Environment environment = (Environment) loadEnvironment(EnvType.RAY_COMMUNITY);
IEnvironmentArgsParser argsParser = loadEnvironmentArgsParser();
environment.getEnvironmentContext().withConfig(argsParser.parse(args));
return environment;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ public interface IEnvironment extends Serializable {
enum EnvType {

/**
* Ray cluster.
* Community ray cluster.
*/
RAY,
RAY_COMMUNITY,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerning about this. Why we rename ray to ray community? For the open source software, the meaning of "ray" is "open source ray" by default. I think we don't need to rename it because there is only one ray project in community currently. "RAY_COMMUNITY" is confusing for open source users.


/**
* K8s cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ public void init(ClusterContext clusterContext) {
this.driverFuture = new CompletableFuture<>();
this.containerIds = new HashSet<>();
this.driverIds = new HashSet<>();
this.foStrategy = buildFoStrategy();
this.foStrategy = buildFailoverStrategy();
Preconditions.checkNotNull(masterId, "masterId is not set");
}

public ClusterContext getClusterContext() {
return clusterContext;
}

protected abstract IFailoverStrategy buildFoStrategy();
protected abstract IFailoverStrategy buildFailoverStrategy();

@Override
public void allocateWorkers(int workerNum) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FoStrategyFactory {
public class FailoverStrategyFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(FoStrategyFactory.class);
private static final Logger LOGGER = LoggerFactory.getLogger(FailoverStrategyFactory.class);

public static IFailoverStrategy loadFoStrategy(EnvType envType, String foStrategyType) {
public static IFailoverStrategy loadFailoverStrategy(EnvType envType, String foStrategyType) {
ServiceLoader<IFailoverStrategy> contextLoader = ServiceLoader.load(IFailoverStrategy.class);
Iterator<IFailoverStrategy> contextIterable = contextLoader.iterator();
while (contextIterable.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class FoStrategyFactoryTest {

@Test(expectedExceptions = GeaflowRuntimeException.class)
public void testLoad() {
FoStrategyFactory.loadFoStrategy(EnvType.RAY, "");
FailoverStrategyFactory.loadFailoverStrategy(EnvType.RAY_COMMUNITY, "");
}

}
9 changes: 5 additions & 4 deletions geaflow/geaflow-deploy/geaflow-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<artifactId>geaflow-assembly</artifactId>

<dependencies>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
Expand Down Expand Up @@ -56,10 +57,6 @@
<groupId>com.antgroup.tugraph</groupId>
<artifactId>geaflow-on-k8s</artifactId>
</dependency>
<dependency>
<groupId>com.antgroup.tugraph</groupId>
<artifactId>geaflow-on-ray</artifactId>
</dependency>
<dependency>
<groupId>com.antgroup.tugraph</groupId>
<artifactId>geaflow-examples</artifactId>
Expand All @@ -72,6 +69,10 @@
<groupId>com.antgroup.tugraph</groupId>
<artifactId>geaflow-dsl-runtime</artifactId>
</dependency>
<dependency>
<groupId>com.antgroup.tugraph</groupId>
<artifactId>geaflow-on-ray-community</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import com.antgroup.geaflow.cluster.config.ClusterConfig;
import com.antgroup.geaflow.cluster.container.ContainerInfo;
import com.antgroup.geaflow.cluster.driver.DriverInfo;
import com.antgroup.geaflow.cluster.failover.FoStrategyFactory;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyFactory;
import com.antgroup.geaflow.cluster.failover.IFailoverStrategy;
import com.antgroup.geaflow.cluster.k8s.config.AbstractKubernetesParam;
import com.antgroup.geaflow.cluster.k8s.config.KubernetesConfig;
Expand Down Expand Up @@ -118,8 +118,8 @@ public void init(ClusterContext context, GeaflowKubeClient kubernetesClient) {
}

@Override
protected IFailoverStrategy buildFoStrategy() {
IFailoverStrategy foStrategy = FoStrategyFactory.loadFoStrategy(EnvType.K8S,
protected IFailoverStrategy buildFailoverStrategy() {
IFailoverStrategy foStrategy = FailoverStrategyFactory.loadFailoverStrategy(EnvType.K8S,
super.clusterConfig.getConfig().getString(FO_STRATEGY));
foStrategy.init(clusterContext);
((AbstractKubernetesFailoverStrategy) foStrategy).setClusterManager(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import com.antgroup.geaflow.cluster.clustermanager.ClusterContext;
import com.antgroup.geaflow.cluster.container.ContainerContext;
import com.antgroup.geaflow.cluster.driver.DriverContext;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyFactory;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyType;
import com.antgroup.geaflow.cluster.failover.FoStrategyFactory;
import com.antgroup.geaflow.cluster.failover.IFailoverStrategy;
import com.antgroup.geaflow.cluster.local.context.LocalContainerContext;
import com.antgroup.geaflow.cluster.local.context.LocalDriverContext;
Expand All @@ -43,8 +43,8 @@ public void init(ClusterContext clusterContext) {
}

@Override
protected IFailoverStrategy buildFoStrategy() {
return FoStrategyFactory.loadFoStrategy(IEnvironment.EnvType.LOCAL, FailoverStrategyType.disable_fo.name());
protected IFailoverStrategy buildFailoverStrategy() {
return FailoverStrategyFactory.loadFailoverStrategy(IEnvironment.EnvType.LOCAL, FailoverStrategyType.disable_fo.name());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>geaflow-on-ray</artifactId>
<artifactId>geaflow-on-ray-community</artifactId>
<packaging>jar</packaging>

<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public class RayClusterClient extends AbstractClusterClient {
public void init(IEnvironmentContext environmentContext) {
super.init(environmentContext);
clusterContext = new ClusterContext(config);
RaySystemFunc.initRayEnv(clusterContext.getClusterConfig());
config.put(JOB_WORK_PATH, RaySystemFunc.getWorkPath());
rayClusterManager = new RayClusterManager();
rayClusterManager.init(clusterContext);
RaySystemFunc.initRayEnv(clusterContext.getClusterConfig());
config.put(JOB_WORK_PATH, RaySystemFunc.getWorkPath());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ protected IClusterClient getClusterClient() {

@Override
public EnvType getEnvType() {
return EnvType.RAY;
return EnvType.RAY_COMMUNITY;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import io.ray.api.options.ActorLifetime;
import java.io.Serializable;
import java.util.List;
import org.slf4j.Logger;
Expand All @@ -41,6 +42,7 @@ public static ActorHandle<RayMasterRunner> createMaster(ClusterConfig clusterCon
ActorHandle<RayMasterRunner> masterRayActor = Ray
.actor(RayMasterRunner::new, clusterConfig.getConfig())
.setMaxRestarts(clusterConfig.getMaxRestarts())
.setLifetime(ActorLifetime.DETACHED)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"detached" actor will not be killed when the ray job is finished or failed. Does it make sense for your scenes? For more detail, you can read this doc https://docs.ray.io/en/latest/ray-core/actors/named-actors.html?highlight=detached#actor-lifetimes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it make sense for out scenes. we expected the actor run with long-running mode.

.setJvmOptions(jvmOptions).remote();
LOGGER.info("master actor:{}, memoryMB:{}, jvmOptions:{}, isRestartAllFo:{}, foRestartTimes:{}",
masterRayActor.getId().toString(), totalMemoryMb, jvmOptions,
Expand All @@ -63,6 +65,7 @@ public static ActorHandle<RayDriverRunner> createDriver(ClusterConfig clusterCon
ActorHandle<RayDriverRunner> driverRayActor = Ray
.actor(RayDriverRunner::new, context)
.setMaxRestarts(clusterConfig.getMaxRestarts())
.setLifetime(ActorLifetime.DETACHED)
.setJvmOptions(jvmOptions).remote();
LOGGER.info("driver actor:{}, memoryMB:{}, jvmOptions:{}, isRestartAllFo:{}, foRestartTimes:{}",
driverRayActor.getId().toString(), totalMemoryMb, jvmOptions,
Expand All @@ -75,6 +78,7 @@ public static ActorHandle<RayContainerRunner> createContainer(ClusterConfig clus
ActorHandle<RayContainerRunner> rayContainer = Ray
.actor(RayContainerRunner::new, containerContext)
.setMaxRestarts(clusterConfig.getMaxRestarts())
.setLifetime(ActorLifetime.DETACHED)
.remote();
LOGGER.info("worker actor {} isDisableFo {}", rayContainer.getId().toString(),
clusterConfig.isFoEnable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,65 +20,85 @@
import com.antgroup.geaflow.cluster.clustermanager.ClusterContext;
import com.antgroup.geaflow.cluster.container.ContainerContext;
import com.antgroup.geaflow.cluster.driver.DriverContext;
import com.antgroup.geaflow.cluster.failover.FoStrategyFactory;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyFactory;
import com.antgroup.geaflow.cluster.failover.IFailoverStrategy;
import com.antgroup.geaflow.cluster.ray.context.RayContainerContext;
import com.antgroup.geaflow.cluster.ray.context.RayDriverContext;
import com.antgroup.geaflow.cluster.ray.entrypoint.RayContainerRunner;
import com.antgroup.geaflow.cluster.ray.entrypoint.RayDriverRunner;
import com.antgroup.geaflow.cluster.ray.entrypoint.RayMasterRunner;
import com.antgroup.geaflow.cluster.ray.failover.AbstractRayFailoverStrategy;
import com.antgroup.geaflow.cluster.ray.utils.RaySystemFunc;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.env.IEnvironment.EnvType;
import com.google.common.base.Preconditions;
import io.ray.api.ActorHandle;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RayClusterManager extends AbstractClusterManager {

private static final Logger LOGGER = LoggerFactory.getLogger(RayClusterManager.class);
private String appPath;
private static final int MASTER_ACTOR_ID = 0;
private static Map<Integer, ActorHandle> actors = new HashMap<>();

@Override
public void init(ClusterContext clusterContext) {
super.init(clusterContext);
this.appPath = RaySystemFunc.getWorkPath();
}

@Override
protected IFailoverStrategy buildFoStrategy() {
return FoStrategyFactory.loadFoStrategy(EnvType.RAY, clusterConfig.getConfig().getString(FO_STRATEGY));
protected IFailoverStrategy buildFailoverStrategy() {
IFailoverStrategy foStrategy = FailoverStrategyFactory.loadFailoverStrategy(
EnvType.RAY_COMMUNITY, clusterConfig.getConfig().getString(FO_STRATEGY));
foStrategy.init(clusterContext);
((AbstractRayFailoverStrategy) foStrategy).setClusterManager(this);
return foStrategy;
}

@Override
public RayClusterId startMaster() {
Preconditions.checkArgument(clusterConfig != null, "clusterConfig is not initialized");
clusterInfo = new RayClusterId(RayClient.createMaster(clusterConfig));
ActorHandle<RayMasterRunner> master = RayClient.createMaster(clusterConfig);
clusterInfo = new RayClusterId(master);
actors.put(MASTER_ACTOR_ID, master);
return (RayClusterId) clusterInfo;
}

@Override
public void restartContainer(int containerId) {
// do nothing.
if (!actors.containsKey(containerId)) {
throw new GeaflowRuntimeException(String.format("invalid container id %s", containerId));
}
actors.get(containerId).kill();
}

@Override
public void doStartContainer(int containerId, boolean isRecover) {
ContainerContext containerContext = new RayContainerContext(containerId,
clusterConfig.getConfig());
RayClient.createContainer(clusterConfig, containerContext);
ActorHandle<RayContainerRunner> container = RayClient.createContainer(clusterConfig, containerContext);
actors.put(containerId, container);
}

@Override
public void doStartDriver(int driverId) {
DriverContext driverContext = new RayDriverContext(driverId, clusterConfig.getConfig());
RayClient.createDriver(clusterConfig, driverContext);
ActorHandle<RayDriverRunner> driver = RayClient.createDriver(clusterConfig, driverContext);
actors.put(driverId, driver);
LOGGER.info("call driver start");
}

@Override
public void close() {
super.close();
actors.clear();
if (RaySystemFunc.isLocalMode()) {
FileUtils.deleteQuietly(new File(appPath));
FileUtils.deleteQuietly(new File(RaySystemFunc.getWorkPath()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public RayMasterRunner(Configuration configuration) {
MasterContext context = new MasterContext(configuration);
context.setRecover(RaySystemFunc.isRestarted());
context.setClusterManager(new RayClusterManager());
context.setEnvType(EnvType.RAY);
context.setEnvType(EnvType.RAY_COMMUNITY);
master.init(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,23 @@
package com.antgroup.geaflow.cluster.ray.failover;

import com.antgroup.geaflow.cluster.failover.IFailoverStrategy;
import com.antgroup.geaflow.cluster.ray.clustermanager.RayClusterManager;
import com.antgroup.geaflow.env.IEnvironment.EnvType;

public abstract class AbstractRayFailoverStrategy implements IFailoverStrategy {

protected RayClusterManager clusterManager;

@Override
public EnvType getEnv() {
return EnvType.RAY;
return EnvType.RAY_COMMUNITY;
}

public RayClusterManager getClusterManager() {
return clusterManager;
}

public void setClusterManager(RayClusterManager clusterManager) {
this.clusterManager = clusterManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ public class RayClusterFailoverStrategy extends AbstractRayFailoverStrategy {

@Override
public void init(ClusterContext context) {
// TODO set before ray init.
System.setProperty(RayConfig.RAY_JOB_L1FO_ENABLE, String.valueOf(Boolean.TRUE));
System.setProperty(RayConfig.RAY_TASK_RETURN_TASK_EXCEPTION, Boolean.FALSE.toString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@

import com.antgroup.geaflow.cluster.clustermanager.ClusterContext;
import com.antgroup.geaflow.cluster.failover.FailoverStrategyType;
import com.antgroup.geaflow.cluster.ray.utils.RayConfig;

public class RayDisableFailoverStrategy extends AbstractRayFailoverStrategy {
public class RayComponentFailoverStrategy extends AbstractRayFailoverStrategy {

@Override
public void init(ClusterContext context) {

System.setProperty(RayConfig.RAY_TASK_RETURN_TASK_EXCEPTION, Boolean.FALSE.toString());
}

@Override
public void doFailover(int componentId) {

}

@Override
public FailoverStrategyType getType() {
return FailoverStrategyType.disable_fo;
return FailoverStrategyType.component_fo;
}
}
Loading
Loading