Skip to content

Commit

Permalink
Merge branch 'TuGraph-family:master' into add-partners
Browse files Browse the repository at this point in the history
  • Loading branch information
fanzhidongyzby authored Jan 23, 2024
2 parents 646b0f1 + da00170 commit 92f4eb4
Show file tree
Hide file tree
Showing 47 changed files with 1,712 additions and 252 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.antgroup.geaflow.console.biz.shared.convert;

import com.alibaba.fastjson.JSON;
import com.antgroup.geaflow.console.biz.shared.view.FunctionView;
import com.antgroup.geaflow.console.biz.shared.view.GraphView;
import com.antgroup.geaflow.console.biz.shared.view.JobView;
Expand All @@ -32,12 +33,15 @@
import com.antgroup.geaflow.console.core.model.job.GeaflowJob;
import com.antgroup.geaflow.console.core.model.job.GeaflowProcessJob;
import com.antgroup.geaflow.console.core.model.job.GeaflowServeJob;
import com.antgroup.geaflow.console.core.model.job.GeaflowTransferJob.FieldMappingItem;
import com.antgroup.geaflow.console.core.model.job.GeaflowTransferJob.StructMapping;
import com.antgroup.geaflow.console.core.service.InstanceService;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
Expand Down Expand Up @@ -79,6 +83,10 @@ public void init() {
public void merge(JobView view, JobView updateView) {
super.merge(view, updateView);
switch (view.getType()) {
case INTEGRATE:
Optional.ofNullable(updateView.getStructMappings()).ifPresent(view::setStructMappings);
Optional.ofNullable(updateView.getGraphs()).ifPresent(view::setGraphs);
break;
case PROCESS:
Optional.ofNullable(updateView.getUserCode()).ifPresent(view::setUserCode);
break;
Expand All @@ -104,7 +112,8 @@ protected JobView modelToView(GeaflowJob model) {

jobView.setStructs(structs);
jobView.setGraphs(graphs);
jobView.setStructMappings(model.getStructMappings());

jobView.setStructMappings(JSON.toJSONString(model.getStructMappings()));

List<FunctionView> functions = ListUtil.convert(model.getFunctions(), e -> functionViewConverter.convert(e));
jobView.setFunctions(functions);
Expand All @@ -123,17 +132,23 @@ public GeaflowJob convert(JobView view, List<GeaflowStruct> structs, List<Geaflo
switch (jobType) {
case INTEGRATE:
GeaflowIntegrateJob integrateJob = (GeaflowIntegrateJob) viewToModel(view, GeaflowIntegrateJob.class);
Map<String, Map<String, Map<String, String>>> structMappings = view.getStructMappings();
Preconditions.checkNotNull(structMappings);

Preconditions.checkNotNull(view.getStructMappings());

List<StructMapping> structMappings = JSON.parseArray(view.getStructMappings(), StructMapping.class);
// dedup duplicated field mappings
for (StructMapping structMapping : structMappings) {
List<FieldMappingItem> distinctMapping = structMapping.getFieldMappings().stream().distinct().collect(Collectors.toList());
structMapping.setFieldMappings(distinctMapping);
}
integrateJob.setStructMappings(structMappings);
integrateJob.setGraph(graphs);
integrateJob.setStructs(structs);
job = integrateJob;
break;
case PROCESS:
GeaflowProcessJob processJob = (GeaflowProcessJob) viewToModel(view, GeaflowProcessJob.class);
GeaflowCode geaflowCode = new GeaflowCode(view.getUserCode());
processJob.setUserCode(geaflowCode);
processJob.setUserCode(view.getUserCode());
processJob.setFunctions(functions);
job = processJob;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import static com.antgroup.geaflow.console.core.service.RemoteFileService.JAR_FILE_SUFFIX;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.antgroup.geaflow.console.biz.shared.JobManager;
import com.antgroup.geaflow.console.biz.shared.RemoteFileManager;
import com.antgroup.geaflow.console.biz.shared.TaskManager;
Expand All @@ -25,7 +27,6 @@
import com.antgroup.geaflow.console.biz.shared.view.IdView;
import com.antgroup.geaflow.console.biz.shared.view.JobView;
import com.antgroup.geaflow.console.biz.shared.view.RemoteFileView;
import com.antgroup.geaflow.console.biz.shared.view.StructView;
import com.antgroup.geaflow.console.common.dal.entity.JobEntity;
import com.antgroup.geaflow.console.common.dal.model.JobSearch;
import com.antgroup.geaflow.console.common.util.ListUtil;
Expand All @@ -34,28 +35,27 @@
import com.antgroup.geaflow.console.common.util.exception.GeaflowIllegalException;
import com.antgroup.geaflow.console.common.util.type.GeaflowJobType;
import com.antgroup.geaflow.console.common.util.type.GeaflowResourceType;
import com.antgroup.geaflow.console.common.util.type.GeaflowStructType;
import com.antgroup.geaflow.console.common.util.type.GeaflowTaskType;
import com.antgroup.geaflow.console.core.model.data.GeaflowFunction;
import com.antgroup.geaflow.console.core.model.data.GeaflowGraph;
import com.antgroup.geaflow.console.core.model.data.GeaflowStruct;
import com.antgroup.geaflow.console.core.model.file.GeaflowRemoteFile;
import com.antgroup.geaflow.console.core.model.job.GeaflowJob;
import com.antgroup.geaflow.console.core.model.job.GeaflowTransferJob.StructMapping;
import com.antgroup.geaflow.console.core.service.AuthorizationService;
import com.antgroup.geaflow.console.core.service.DataService;
import com.antgroup.geaflow.console.core.service.IdService;
import com.antgroup.geaflow.console.core.service.JobService;
import com.antgroup.geaflow.console.core.service.ReleaseService;
import com.antgroup.geaflow.console.core.service.RemoteFileService;
import com.antgroup.geaflow.console.core.service.StatementService;
import com.antgroup.geaflow.console.core.service.TableService;
import com.antgroup.geaflow.console.core.service.TaskService;
import com.antgroup.geaflow.console.core.service.file.RemoteFileStorage;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -96,6 +96,9 @@ public class JobManagerImpl extends IdManagerImpl<GeaflowJob, JobView, JobSearch
@Autowired
private StatementService statementService;

@Autowired
private TableService tableService;

@Override
public IdViewConverter<GeaflowJob, JobView> getConverter() {
return jobViewConverter;
Expand All @@ -111,7 +114,6 @@ public List<GeaflowJob> parse(List<JobView> views) {
return ListUtil.convert(views, v -> {
GeaflowJobType type = v.getType();
Preconditions.checkNotNull(type, "job Type is null");

switch (type) {
case PROCESS:
// get functions
Expand All @@ -126,14 +128,12 @@ public List<GeaflowJob> parse(List<JobView> views) {
case SERVE:
List<GeaflowStruct> structs = null;
if (type == GeaflowJobType.INTEGRATE) {
// get graphs and tables
structs = getResource(v.getStructs());
// get tables
structs = getStructs(v);
}
Preconditions.checkArgument(v.getGraphs() != null && v.getGraphs().size() == 1,
"Must have one graph");

if (type == GeaflowJobType.SERVE) {
Preconditions.checkArgument(v.getGraphs().size() == 1,
"AnalysisJob job must have one graph");
}
List<String> graphIds = ListUtil.convert(v.getGraphs(), IdView::getId);
List<GeaflowGraph> graphs = ListUtil.convert(graphIds, id -> {
GeaflowGraph g = (GeaflowGraph) jobService.getResourceService(GeaflowResourceType.GRAPH).get(id);
Expand All @@ -148,20 +148,13 @@ public List<GeaflowJob> parse(List<JobView> views) {
});
}

private List<GeaflowStruct> getResource(List<StructView> views) {
if (CollectionUtils.isEmpty(views)) {
return new ArrayList<>();
}
// group by the structType
Map<GeaflowStructType, List<StructView>> group = views.stream().collect(Collectors.groupingBy(StructView::getType));
List<GeaflowStruct> res = new ArrayList<>();
// use services according to the group
for (Entry<GeaflowStructType, List<StructView>> entry : group.entrySet()) {
DataService dataService = jobService.getResourceService(GeaflowResourceType.valueOf(entry.getKey().name()));
List<String> ids = ListUtil.convert(entry.getValue(), IdView::getId);
res.addAll(dataService.get(ids));
}
return res;
private List<GeaflowStruct> getStructs(JobView jobView) {
List<StructMapping> structMappings = JSON.parseObject(jobView.getStructMappings(),
new TypeReference<List<StructMapping>>() {
});
Set<String> tableNames = structMappings.stream().map(StructMapping::getTableName).collect(Collectors.toSet());
return tableNames.stream().map(e -> tableService.getByName(jobView.getInstanceId(), e))
.collect(Collectors.toList());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import com.antgroup.geaflow.console.common.util.type.GeaflowJobType;
import java.util.List;
import java.util.Map;
import lombok.Getter;
import lombok.Setter;

Expand All @@ -32,7 +31,7 @@ public class JobView extends NameView {

private String userCode;

private Map<String, Map<String, Map<String, String>>> structMappings;
private String structMappings;

private List<StructView> structs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,4 @@ i18n.key.url=URL
i18n.key.user=User
i18n.key.username=Username
i18n.key.write.timeout=Write Timeout
i18n.key.dsl.window.size=Size of window
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,4 @@ i18n.key.url=URL
i18n.key.user=用户
i18n.key.username=用户名
i18n.key.write.timeout=写超时时间
i18n.key.dsl.window.size=????
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
USE GRAPH ${graphName};

#foreach(${insert} in ${inserts})
insert into ${graphName}(
#foreach(${strcut} in ${insert.structs})
${strcut.structName}.${strcut.structFieldName},
#end
${END_FLAG}
) select
#foreach(${strcut} in ${insert.structs})
${strcut.tableFieldName},
#end
${END_FLAG}
from ${insert.tableName};

#end
5 changes: 5 additions & 0 deletions geaflow-console/app/common/util/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit 92f4eb4

Please sign in to comment.