Skip to content

Commit

Permalink
[ISSUE-377] Fix demo job and auto create demo job when console starts (
Browse files Browse the repository at this point in the history
  • Loading branch information
652053395 authored Nov 19, 2024
1 parent a52d823 commit 2f3a6d0
Show file tree
Hide file tree
Showing 13 changed files with 287 additions and 17 deletions.
5 changes: 4 additions & 1 deletion bin/gql_submit.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ CLASSPATH=$CLASSPATH:/tmp/geaflow/gql/
echo "CLASSPATH:$CLASSPATH"
echo -e "\033[32mView dashboard via http://localhost:8090.
See logs via url http://localhost:8090/#/components/master/logs or at local path ${GEAFLOW_LOG_PATH}\033[32m"

$JAVACMD -cp "$CLASSPATH" \
-DclusterType=LOCAL \
-Dlog.file=${GEAFLOW_LOG_PATH} \
-Dlog4j.configuration=${LOG4j_PROPERTIES_FILE_NAME} \
com.antgroup.geaflow.dsl.runtime.engine.GeaFlowGqlClient "${JOB_ARGS}" > ${GEAFLOW_LOG_PATH} 2>&1
com.antgroup.geaflow.dsl.runtime.engine.GeaFlowGqlClient "${JOB_ARGS}" > ${GEAFLOW_LOG_PATH} 2>&1

echo "Finished"
6 changes: 5 additions & 1 deletion geaflow-console/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,12 @@ COPY geaflow-console/target/config $GEAFLOW_HOME/config
COPY geaflow-console/docker $GEAFLOW_HOME/

# copy geaflow jar

ENV GEAFLOW_LOCAL_VERSION_PATH=/tmp/geaflow/local/versions/defaultVersion/defaultVersion.jar
COPY geaflow/geaflow-deploy/geaflow-assembly/target/geaflow-assembly-*[!-sources].jar \
$GEAFLOW_HOME/files/geaflow.jar
$GEAFLOW_LOCAL_VERSION_PATH
RUN md5sum $GEAFLOW_LOCAL_VERSION_PATH |awk '{print $1}' > $GEAFLOW_LOCAL_VERSION_PATH.md5


EXPOSE 8888 3306 6379 8086 2181

Expand Down
6 changes: 4 additions & 2 deletions geaflow-console/Dockerfile-arm64
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ COPY geaflow-console/target/config $GEAFLOW_HOME/config
COPY geaflow-console/docker $GEAFLOW_HOME/

# copy geaflow jar
COPY geaflow/geaflow-deploy/geaflow-assembly/target/geaflow-assembly-*.jar \
$GEAFLOW_HOME/files/geaflow.jar
ENV GEAFLOW_LOCAL_VERSION_PATH=/tmp/geaflow/local/versions/defaultVersion/defaultVersion.jar
COPY geaflow/geaflow-deploy/geaflow-assembly/target/geaflow-assembly-*[!-sources].jar \
$GEAFLOW_LOCAL_VERSION_PATH
RUN md5sum $GEAFLOW_LOCAL_VERSION_PATH |awk '{print $1}' > $GEAFLOW_LOCAL_VERSION_PATH.md5

EXPOSE 8888 3306 6379 8086 2181

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.antgroup.geaflow.console.biz.shared.demo;

import com.antgroup.geaflow.console.core.model.job.GeaflowJob;

public abstract class DemoJob {

public abstract GeaflowJob build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.antgroup.geaflow.console.biz.shared.demo;

import com.antgroup.geaflow.console.common.util.VelocityUtil;
import com.antgroup.geaflow.console.core.model.job.GeaflowJob;
import com.antgroup.geaflow.console.core.model.job.GeaflowProcessJob;
import java.util.HashMap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SocketDemo extends DemoJob {

private static final String DEMO_JOB_TEMPLATE = "template/demoJob.vm";

public GeaflowJob build() {
GeaflowProcessJob job = new GeaflowProcessJob();
String code = VelocityUtil.applyResource(DEMO_JOB_TEMPLATE, new HashMap<>());
job.setUserCode(code);
job.setName("demoJob");
return job;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@
import com.antgroup.geaflow.console.biz.shared.InstallManager;
import com.antgroup.geaflow.console.biz.shared.VersionManager;
import com.antgroup.geaflow.console.biz.shared.convert.InstallViewConverter;
import com.antgroup.geaflow.console.biz.shared.demo.DemoJob;
import com.antgroup.geaflow.console.biz.shared.view.InstallView;
import com.antgroup.geaflow.console.common.util.Fmt;
import com.antgroup.geaflow.console.common.util.I18nUtil;
import com.antgroup.geaflow.console.common.util.NetworkUtil;
import com.antgroup.geaflow.console.common.util.ProcessUtil;
import com.antgroup.geaflow.console.common.util.context.ContextHolder;
import com.antgroup.geaflow.console.common.util.context.GeaflowContext;
import com.antgroup.geaflow.console.common.util.exception.GeaflowException;
import com.antgroup.geaflow.console.common.util.type.GeaflowPluginCategory;
import com.antgroup.geaflow.console.common.util.type.GeaflowPluginType;
import com.antgroup.geaflow.console.core.model.cluster.GeaflowCluster;
import com.antgroup.geaflow.console.core.model.config.GeaflowSystemConfig;
import com.antgroup.geaflow.console.core.model.config.SystemConfigKeys;
import com.antgroup.geaflow.console.core.model.data.GeaflowInstance;
import com.antgroup.geaflow.console.core.model.install.GeaflowInstall;
import com.antgroup.geaflow.console.core.model.job.GeaflowJob;
import com.antgroup.geaflow.console.core.model.plugin.config.ContainerPluginConfigClass;
import com.antgroup.geaflow.console.core.model.plugin.config.GeaflowPluginConfig;
import com.antgroup.geaflow.console.core.model.plugin.config.InfluxdbPluginConfigClass;
Expand All @@ -37,10 +42,16 @@
import com.antgroup.geaflow.console.core.model.plugin.config.LocalPluginConfigClass;
import com.antgroup.geaflow.console.core.model.plugin.config.PluginConfigClass;
import com.antgroup.geaflow.console.core.model.plugin.config.RedisPluginConfigClass;
import com.antgroup.geaflow.console.core.model.security.GeaflowTenant;
import com.antgroup.geaflow.console.core.model.security.GeaflowUser;
import com.antgroup.geaflow.console.core.service.ClusterService;
import com.antgroup.geaflow.console.core.service.DatasourceService;
import com.antgroup.geaflow.console.core.service.InstanceService;
import com.antgroup.geaflow.console.core.service.JobService;
import com.antgroup.geaflow.console.core.service.PluginConfigService;
import com.antgroup.geaflow.console.core.service.SystemConfigService;
import com.antgroup.geaflow.console.core.service.TenantService;
import com.antgroup.geaflow.console.core.service.UserService;
import com.antgroup.geaflow.console.core.service.config.DatasourceConfig;
import com.antgroup.geaflow.console.core.service.config.DeployConfig;
import com.antgroup.geaflow.console.core.service.security.TokenGenerator;
Expand Down Expand Up @@ -83,6 +94,26 @@ public class InstallManagerImpl implements InstallManager {
@Autowired
private TokenGenerator tokenGenerator;

@Autowired
private JobService jobService;

@Autowired
private InstanceService instanceService;

@Autowired
private UserService userService;

@Autowired
private TenantService tenantService;

private final List<DemoJob> demoJobs;

@Autowired
public InstallManagerImpl(List<DemoJob> demoJobs) {
this.demoJobs = demoJobs;
}


private interface ConfigBuilder {

PluginConfigClass configRuntimeCluster();
Expand Down Expand Up @@ -242,9 +273,41 @@ public boolean install(InstallView installView) {

// set install status
systemConfigService.setValue(SystemConfigKeys.GEAFLOW_INITIALIZED, true);

createDemoJobs();
return true;
}

private void createDemoJobs() {
GeaflowContext context = ContextHolder.get();
try {
GeaflowUser user = userService.get(context.getUserId());
GeaflowTenant tenant = tenantService.getByName(tenantService.getDefaultTenantName(user.getName()));
GeaflowInstance instance = instanceService.getByName(
instanceService.getDefaultInstanceName(user.getName()));
context.setTenantId(tenant.getId());
context.setSystemSession(false);

List<GeaflowJob> jobs = new ArrayList<>();
for (DemoJob demoJob : demoJobs) {
GeaflowJob job = demoJob.build();
job.setInstanceId(instance.getId());
jobs.add(job);
}

jobService.create(jobs);
log.info("create demo jobs success");
} catch (Exception e) {
log.error("create demo job failed", e);
throw e;
} finally {
context.setTenantId(null);
context.setSystemSession(true);
}

}


private class DefaultConfigBuilder implements ConfigBuilder {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.antgroup.geaflow.console.common.dal.model.PageList;
import com.antgroup.geaflow.console.common.dal.model.VersionSearch;
import com.antgroup.geaflow.console.common.util.FileUtil;
import com.antgroup.geaflow.console.common.util.Fmt;
import com.antgroup.geaflow.console.common.util.I18nUtil;
import com.antgroup.geaflow.console.common.util.context.ContextHolder;
import com.antgroup.geaflow.console.common.util.exception.GeaflowException;
Expand All @@ -37,6 +36,7 @@
import com.antgroup.geaflow.console.core.service.NameService;
import com.antgroup.geaflow.console.core.service.RemoteFileService;
import com.antgroup.geaflow.console.core.service.VersionService;
import com.antgroup.geaflow.console.core.service.file.LocalFileFactory;
import com.antgroup.geaflow.console.core.service.file.RemoteFileStorage;
import com.google.common.base.Preconditions;
import java.io.File;
Expand All @@ -59,11 +59,11 @@
public class VersionManagerImpl extends NameManagerImpl<GeaflowVersion, VersionView, VersionSearch> implements
VersionManager {

private static final String ENGINE_JAR_PREFIX = "geaflow-";
private static final String ENGINE_JAR_PREFIX = "";

private static final String LANG_JAR_PREFIX = "lang-";

private static final String GEAFLOW_JAR_DEFAULT_PATH = "files/geaflow.jar";
private static final String GEAFLOW_DEFAULT_VERSION_NAME = "defaultVersion";

@Autowired
private VersionService versionService;
Expand Down Expand Up @@ -124,13 +124,14 @@ public String createDefaultVersion() {
// in case of remote file config changed
remoteFileStorage.reset();

String path = Fmt.as("{}/{}", System.getProperty("user.dir"), GEAFLOW_JAR_DEFAULT_PATH);
String path = LocalFileFactory.getVersionFilePath(GEAFLOW_DEFAULT_VERSION_NAME,
GEAFLOW_DEFAULT_VERSION_NAME + ".jar");
if (!FileUtil.exist(path)) {
throw new GeaflowIllegalException("No geaflow jar found in {}", path);
}

VersionView versionView = new VersionView();
versionView.setName("0.1");
versionView.setName(GEAFLOW_DEFAULT_VERSION_NAME);
versionView.setComment(I18nUtil.getMessage("i18n.key.default.version"));
versionView.setPublish(true);

Expand Down Expand Up @@ -189,12 +190,12 @@ public boolean deleteVersion(String versionName) {

GeaflowRemoteFile engineJarPackage = version.getEngineJarPackage();
if (engineJarPackage != null) {
remoteFileManager.deleteRefJar(engineJarPackage.getId(),null, GeaflowResourceType.ENGINE_VERSION);
remoteFileManager.deleteRefJar(engineJarPackage.getId(), null, GeaflowResourceType.ENGINE_VERSION);
}

GeaflowRemoteFile langJarPackage = version.getLangJarPackage();
if (langJarPackage != null) {
remoteFileManager.deleteRefJar(langJarPackage.getId(),null, GeaflowResourceType.ENGINE_VERSION);
remoteFileManager.deleteRefJar(langJarPackage.getId(), null, GeaflowResourceType.ENGINE_VERSION);
}

return drop(version.getId());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
set geaflow.dsl.window.size = 1;
set geaflow.dsl.ignore.exception = true;

CREATE GRAPH IF NOT EXISTS dy_modern (
Vertex person (
id bigint ID,
name varchar
),
Edge knows (
srcId bigint SOURCE ID,
targetId bigint DESTINATION ID,
weight double
)
) WITH (
storeType='rocksdb',
shardCount = 1
);

CREATE TABLE IF NOT EXISTS tbl_source (
text varchar
) WITH (
type='file',
`geaflow.dsl.file.path` = 'resource:///demo/demo_job_data.txt',
`geaflow.dsl.column.separator`='|'
);

CREATE TABLE IF NOT EXISTS tbl_result (
a_id bigint,
b_id bigint,
c_id bigint,
d_id bigint,
a1_id bigint
) WITH (
type='file',
`geaflow.dsl.file.path` = '/tmp/geaflow/demo_job_result'
);

USE GRAPH dy_modern;

INSERT INTO dy_modern.person(id, name)
SELECT
cast(trim(split_ex(t1, ',', 0)) as bigint),
split_ex(trim(t1), ',', 1)
FROM (
Select trim(substr(text, 2)) as t1
FROM tbl_source
WHERE substr(text, 1, 1) = '.'
);

INSERT INTO dy_modern.knows
SELECT
cast(split_ex(t1, ',', 0) as bigint),
cast(split_ex(t1, ',', 1) as bigint),
cast(split_ex(t1, ',', 2) as double)
FROM (
Select trim(substr(text, 2)) as t1
FROM tbl_source
WHERE substr(text, 1, 1) = '-'
);

INSERT INTO tbl_result
SELECT DISTINCT
a_id,
b_id,
c_id,
d_id,
a1_id
FROM (
MATCH (a:person) -[:knows]->(b:person) -[:knows]-> (c:person)
-[:knows]-> (d:person) -> (a:person)
RETURN a.id as a_id, b.id as b_id, c.id as c_id, d.id as d_id, a.id as a1_id
);
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,18 @@ public List<ResourceCount> getResourceCount(String instanceId, List<String> name
return instanceDao.getResourceCount(instanceId, names);
}

public String getDefaultInstanceName(String userName) {
return "instance_" + userName;
}

@Transactional
public String createDefaultInstance(String tenantId, GeaflowUser user) {
String userName = user.getName();
String userComment = user.getComment();
String instanceName = "instance_" + userName;
String instanceName = getDefaultInstanceName(userName);
String userDisplayName = StringUtils.isBlank(userComment) ? userName : userComment;
String instanceComment = Fmt.as(I18nUtil.getMessage("i18n.key.default.instance.comment.format"), userDisplayName);
String instanceComment = Fmt.as(I18nUtil.getMessage("i18n.key.default.instance.comment.format"),
userDisplayName);

// Need to set tenantId, using dao directly
InstanceEntity entity = new InstanceEntity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,14 @@ public PageList<GeaflowTenant> search(TenantSearch search) {
return tenantDao.search(userId, search).transform(this::parse);
}

public String getDefaultTenantName(String userName) {
return "tenant_" + userName;
}

public String createDefaultTenant(GeaflowUser user) {
String userName = user.getName();
String userComment = user.getComment();
String tenantName = "tenant_" + userName;
String tenantName = getDefaultTenantName(userName);
String userDisplayName = StringUtils.isBlank(userComment) ? userName : userComment;
String tenantComment = Fmt.as(I18nUtil.getMessage("i18n.key.default.tenant.comment.format"), userDisplayName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ public File getTaskReleaseFile(String runtimeTaskId, String jobId, GeaflowReleas
return downloadFileWithMd5(path, filePath, release.getMd5());
}

private String getVersionFilePath(String versionName, String fileName) {
public static String getVersionFilePath(String versionName, String fileName) {
return Fmt.as("{}/{}/{}", LOCAL_VERSION_FILE_DIRECTORY, versionName, fileName);
}

private String getTaskFilePath(String runtimeTaskId, String fileName) {
public static String getTaskFilePath(String runtimeTaskId, String fileName) {
return Fmt.as("{}/{}/{}", LOCAL_TASK_FILE_DIRECTORY, runtimeTaskId, fileName);
}

private String getUserFilePath(String userId, String fileName) {
public static String getUserFilePath(String userId, String fileName) {
return Fmt.as("{}/{}/{}", LOCAL_USER_FILE_DIRECTORY, userId, fileName);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
. 1,jim
. 2,kate
. 3,lily
. 4,lucy
. 5,brown
. 6,jack
. 7,jackson
- 1,2,0.2
- 2,3,0.3
- 3,4,0.2
- 4,1,0.1
- 4,5,0.1
- 6,7,0.1
Loading

0 comments on commit 2f3a6d0

Please sign in to comment.