Skip to content

Commit

Permalink
[ISSUE-408] Support submit to ray in console (TuGraph-family#435)
Browse files Browse the repository at this point in the history
  • Loading branch information
652053395 authored Dec 27, 2024
1 parent 0868323 commit c07b5c4
Show file tree
Hide file tree
Showing 35 changed files with 774 additions and 204 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public List<String> getPluginCategoryTypes(GeaflowPluginCategory category) {
types.add(GeaflowPluginType.CONTAINER.name());
}
types.add(GeaflowPluginType.K8S.name());
types.add(GeaflowPluginType.RAY.name());
break;
case RUNTIME_META:
types.add(GeaflowPluginType.JDBC.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,35 +246,40 @@ public boolean install(InstallView installView) {
datasourceService.executeResource(jdbcConfig, "runtimemeta.init.sql");
}

// init version
versionManager.createDefaultVersion();

// init cluster
clusterService.create(new GeaflowCluster(install.getRuntimeClusterConfig()));

// setup influxdb
if (deployConfig.isLocalMode()) {
GeaflowPluginConfig metricConfig = install.getMetricConfig();
if (GeaflowPluginType.INFLUXDB.name().equals(metricConfig.getType())) {
InfluxdbPluginConfigClass influxdbConfig = metricConfig.getConfig()
.parse(InfluxdbPluginConfigClass.class);
.parse(InfluxdbPluginConfigClass.class);
if (influxdbConfig.getUrl().contains(deployConfig.getHost())) {
String org = influxdbConfig.getOrg();
String bucket = influxdbConfig.getBucket();
String token = influxdbConfig.getToken();
String setupCommand = Fmt.as("/usr/local/bin/influx setup --org '{}' --bucket '{}' "
+ "--username geaflow --password geaflow123456 --token '{}' --force", org, bucket, token);

log.info("Setup influxdb with command {}", setupCommand);
ProcessUtil.execute(setupCommand);
try {
String org = influxdbConfig.getOrg();
String bucket = influxdbConfig.getBucket();
String token = influxdbConfig.getToken();
String setupCommand = Fmt.as("/usr/local/bin/influx setup --org '{}' --bucket '{}' "
+ "--username geaflow --password geaflow123456 --token '{}' --force", org, bucket, token);

log.info("Setup influxdb with command {}", setupCommand);
ProcessUtil.execute(setupCommand);
} catch (Exception e) {
log.error("Set up influx db failed", e);
}

}
}
}

// set install status
systemConfigService.setValue(SystemConfigKeys.GEAFLOW_INITIALIZED, true);
// init cluster
clusterService.create(new GeaflowCluster(install.getRuntimeClusterConfig()));

// init version
versionManager.createDefaultVersion();

createDemoJobs();

// set install status
systemConfigService.setValue(SystemConfigKeys.GEAFLOW_INITIALIZED, true);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,14 @@ private boolean updateApiJob(String jobId, JobView updateView, MultipartFile jar

// try to delete old file
GeaflowJob job = jobService.get(jobId);
String oldJarId = job.getJarPackage().getId();
try {
remoteFileManager.deleteRefJar(oldJarId, jobId, GeaflowResourceType.JOB);
} catch (Exception e) {
log.info("delete job jar fail, jobName: {}, jarId: {}", job.getName(), oldJarId);
GeaflowRemoteFile oldJar = job.getJarPackage();
if (oldJar != null) {
String oldJarId = job.getJarPackage().getId();
try {
remoteFileManager.deleteRefJar(oldJarId, jobId, GeaflowResourceType.JOB);
} catch (Exception e) {
log.info("delete job jar fail, jobName: {}, jarId: {}", job.getName(), oldJarId);
}
}

} else if (fileId != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
import com.antgroup.geaflow.console.core.model.runtime.GeaflowPipeline;
import com.antgroup.geaflow.console.core.model.task.GeaflowHeartbeatInfo;
import com.antgroup.geaflow.console.core.model.task.GeaflowTask;
import com.antgroup.geaflow.console.core.model.task.GeaflowTaskHandle.StartupNotifyInfo;
import com.antgroup.geaflow.console.core.model.task.K8sTaskHandle;
import com.antgroup.geaflow.console.core.model.task.K8sTaskHandle.StartupNotifyInfo;
import com.antgroup.geaflow.console.core.model.task.TaskFile;
import com.antgroup.geaflow.console.core.service.AuditService;
import com.antgroup.geaflow.console.core.service.IdService;
Expand Down Expand Up @@ -223,14 +224,18 @@ public void startupNotify(String taskId, TaskStartupNotifyView startupNotifyView
StartupNotifyInfo startupNotifyInfo;
GeaflowTaskStatus newStatus;
GeaflowTask task = taskService.get(taskId);
if (task.getHandle().getClusterType() != GeaflowPluginType.K8S) {
return;
}

if (startupNotifyView.isSuccess()) {
startupNotifyInfo = startupNotifyView.getData();
newStatus = RUNNING;
} else {
startupNotifyInfo = new StartupNotifyInfo();
newStatus = FAILED;
}
task.getHandle().setStartupNotifyInfo(startupNotifyInfo);
((K8sTaskHandle) task.getHandle()).setStartupNotifyInfo(startupNotifyInfo);
taskService.update(task);
taskService.updateStatus(task.getId(), task.getStatus(), newStatus);
log.info("Task {} get startup notify '{}' from cluster", task.getId(), JSON.toJSONString(startupNotifyView));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

package com.antgroup.geaflow.console.biz.shared.view;

import com.antgroup.geaflow.console.core.model.task.GeaflowTaskHandle.StartupNotifyInfo;

import com.antgroup.geaflow.console.core.model.task.K8sTaskHandle.StartupNotifyInfo;
import lombok.Getter;
import lombok.Setter;

Expand Down
6 changes: 6 additions & 0 deletions geaflow-console/app/common/util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.10.0</version>
<scope>compile</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,99 +14,96 @@

package com.antgroup.geaflow.console.common.util;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.JSONObject;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.net.URISyntaxException;
import java.util.Map;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import okhttp3.Headers;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Request.Builder;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.fluent.Request;
import org.apache.http.client.utils.URIBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HTTPUtil {

public static <T> T getResultData(String result, Type type) {
return JSON.parseObject(result, type);
}
private static final Logger LOGGER = LoggerFactory.getLogger(HTTPUtil.class);
private static final MediaType MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8");

public static <T> T getResultData(String result, TypeReference<T> typeReference) {
return JSON.parseObject(result, typeReference);
public static JSONObject post(String url, String json) {
return post(url, json, JSONObject.class);
}

public static Request get(URIBuilder uri) throws URISyntaxException {
return Request.Get(uri.build()).addHeader("Accept", "application/json");
public static JSONObject post(String url, String json, Map<String, String> headers) {
return post(url, json, headers, JSONObject.class);
}

public static Request get(URIBuilder uri, Integer timeout) throws URISyntaxException {
Request request = get(uri);
if (timeout != null) {
request.connectTimeout(timeout).socketTimeout(timeout);
}
return request;
public static <T> T post(String url, String json, Class<T> resultClass) {
return post(url, json, null, resultClass);
}

public static Request get(String url) throws URISyntaxException {
return Request.Get(url).addHeader("Accept", "application/json");
}

public static Request get(String url, Integer timeout) throws URISyntaxException {
Request request = get(url);
if (timeout != null) {
request.connectTimeout(timeout).socketTimeout(timeout);
public static <T> T post(String url, String body, Map<String, String> headers, Class<T> resultClass) {
LOGGER.info("post url: {} body: {}", url, body);
RequestBody requestBody = RequestBody.create(body, MEDIA_TYPE);
Builder builder = getRequestBuilder(url, headers);
Request request = builder.post(requestBody).build();

OkHttpClient client = new OkHttpClient();
try (Response response = client.newCall(request).execute()) {
ResponseBody responseBody = response.body();
String msg = (responseBody != null) ? responseBody.string() : "{}";
if (!response.isSuccessful()) {
throw new RuntimeException(msg);
}

return JSONObject.toJavaObject(JSONObject.parseObject(msg), resultClass);
} catch (IOException e) {
LOGGER.info("execute post failed: {}", e.getCause(), e);
throw new RuntimeException(e);
}
return request;
}

public static Request post(String url) {
return Request.Post(url).addHeader("Accept", "application/json");
}

public static Request post(String url, Integer timeout) {
Request request = post(url);
if (timeout != null) {
request.connectTimeout(timeout).socketTimeout(timeout);
}
return request;
public static JSONObject get(String url) {
return get(url, null, JSONObject.class);
}

public static Request put(String url) {
return Request.Put(url).addHeader("Accept", "application/json");
}

public static Request put(String url, Integer timeout) {
Request request = put(url);
if (timeout != null) {
request.connectTimeout(timeout).socketTimeout(timeout);
}
return request;
}
public static <T> T get(String url, Map<String, String> headers, Class<T> resultClass) {
LOGGER.info("get url: {}", url);
Builder builder = getRequestBuilder(url, headers);
Request request = builder.get().build();

public static Request put(URIBuilder uri) throws URISyntaxException {
return Request.Put(uri.build()).addHeader("Accept", "application/json");
}

public static Request put(URIBuilder uri, Integer timeout) throws URISyntaxException {
Request request = put(uri);
if (timeout != null) {
request.connectTimeout(timeout).socketTimeout(timeout);
}
return request;
}
OkHttpClient client = new OkHttpClient();
try (Response response = client.newCall(request).execute()) {
ResponseBody responseBody = response.body();
String msg = (responseBody != null) ? responseBody.string() : "{}";
if (!response.isSuccessful()) {
throw new RuntimeException(msg);
}

public static Request delete(URIBuilder uri) throws URISyntaxException {
return Request.Delete(uri.build()).addHeader("Accept", "application/json");
return JSONObject.toJavaObject(JSONObject.parseObject(msg), resultClass);
} catch (IOException e) {
LOGGER.info("execute get failed: {}", e.getCause(), e);
throw new RuntimeException(e);
}
}

public static Request delete(URIBuilder uri, Integer timeout) throws URISyntaxException {
Request request = delete(uri);
if (timeout != null) {
request.connectTimeout(timeout).socketTimeout(timeout);
private static Builder getRequestBuilder(String url, Map<String, String> headers) {
Builder requestBuilder = new Request.Builder().url(url);
if (headers != null) {
Headers requestHeaders = Headers.of(headers);
requestBuilder.headers(requestHeaders);
}
return request;
return requestBuilder;
}

public static void download(HttpServletResponse response, InputStream inputStream, String fileName)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 AntGroup CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.antgroup.geaflow.console.common.util;

import com.antgroup.geaflow.console.common.util.exception.GeaflowException;
import java.util.concurrent.Callable;

public class RetryUtil {

public static <T> T exec(Callable<T> function, final int retryCount, long retryIntervalMs) {
int count = retryCount;
while (count > 0) {
try {
return function.call();

} catch (Exception e) {
if (--count == 0) {
throw new GeaflowException("exec failed withRetry", e);
}

ThreadUtil.sleepMilliSeconds(retryIntervalMs);
}
}

return null;
}
}
Loading

0 comments on commit c07b5c4

Please sign in to comment.