Skip to content

Commit

Permalink
[ISSUE-179] Support converting traversal all to traversal by stream
Browse files Browse the repository at this point in the history
  • Loading branch information
Leomrlin authored Sep 26, 2023
1 parent d4cbc85 commit 3fd5f64
Show file tree
Hide file tree
Showing 8 changed files with 509 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,9 @@ public class DSLConfigKeys implements Serializable {
.key("geaflow.dsl.ignore.exception")
.defaultValue(false)
.description("If set true, dsl will skip the exception for dirty data.");

/**
 * Switch for splitting the start ids of a "traversal all" query.
 * Registered under the key {@code geaflow.dsl.traversal.all.split.enable} and
 * defaults to {@code false}.
 */
public static final ConfigKey GEAFLOW_DSL_TRAVERSAL_SPLIT_ENABLE = ConfigKeys
.key("geaflow.dsl.traversal.all.split.enable")
.defaultValue(false)
.description("Whether enable the split of the ids for traversal all. ");
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package com.antgroup.geaflow.dsl.runtime;

import com.antgroup.geaflow.api.function.io.SourceFunction;
import com.antgroup.geaflow.api.pdata.stream.window.PWindowSource;
import com.antgroup.geaflow.dsl.common.data.Row;
import com.antgroup.geaflow.dsl.runtime.expression.Expression;
import com.antgroup.geaflow.dsl.schema.GeaFlowGraph;
Expand All @@ -35,5 +37,7 @@ public interface QueryEngine {

RuntimeTable createRuntimeTable(QueryContext context, Collection<Row> rows);

/**
 * Creates a {@link PWindowSource} from the given source function.
 *
 * @param context the query context of the running query
 * @param sourceFunction the function that produces the source records
 * @param <T> the record type emitted by the source function
 * @return the source built for {@code sourceFunction}
 */
<T> PWindowSource<T> createRuntimeTable(QueryContext context, SourceFunction<T> sourceFunction);

RuntimeGraph createRuntimeGraph(QueryContext context, GeaFlowGraph graph);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.antgroup.geaflow.dsl.runtime.engine;

import com.antgroup.geaflow.api.function.internal.CollectionSource;
import com.antgroup.geaflow.api.function.io.SourceFunction;
import com.antgroup.geaflow.api.pdata.stream.window.PWindowSource;
import com.antgroup.geaflow.api.window.IWindow;
import com.antgroup.geaflow.common.config.Configuration;
Expand Down Expand Up @@ -154,6 +155,13 @@ public RuntimeTable createRuntimeTable(QueryContext context, Collection<Row> row
return new GeaFlowRuntimeTable(context, pipelineContext, source);
}

/**
 * Builds a source on the pipeline for the given source function.
 * The window is created from the pipeline configuration, and the set options
 * of the query context are applied to the source as its config.
 */
@Override
public <T> PWindowSource<T> createRuntimeTable(QueryContext context, SourceFunction<T> sourceFunction) {
    final IWindow<T> sourceWindow = Windows.createWindow(pipelineContext.getConfig());
    final PWindowSource<T> windowSource = pipelineContext.buildSource(sourceFunction, sourceWindow);
    return windowSource.withConfig(context.getSetOptions());
}

@SuppressWarnings("unchecked")
@Override
public RuntimeGraph createRuntimeGraph(QueryContext context, GeaFlowGraph graph) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.antgroup.geaflow.api.function.base.MapFunction;
import com.antgroup.geaflow.api.graph.PGraphWindow;
import com.antgroup.geaflow.api.graph.traversal.PGraphTraversal;
import com.antgroup.geaflow.api.pdata.stream.window.PWindowSource;
import com.antgroup.geaflow.api.pdata.stream.window.PWindowStream;
import com.antgroup.geaflow.common.config.keys.DSLConfigKeys;
import com.antgroup.geaflow.common.utils.ArrayUtil;
Expand All @@ -37,6 +38,7 @@
import com.antgroup.geaflow.dsl.runtime.QueryContext;
import com.antgroup.geaflow.dsl.runtime.RuntimeGraph;
import com.antgroup.geaflow.dsl.runtime.RuntimeTable;
import com.antgroup.geaflow.dsl.runtime.function.graph.source.DynamicGraphVertexScanSourceFunction;
import com.antgroup.geaflow.dsl.runtime.traversal.DagGroupBuilder;
import com.antgroup.geaflow.dsl.runtime.traversal.ExecuteDagGroup;
import com.antgroup.geaflow.dsl.runtime.traversal.StepLogicalPlan;
Expand Down Expand Up @@ -223,9 +225,24 @@ private PWindowStream<ITraversalResponse<ITreePath>> staticGraphTraversal(
((PGraphTraversal<Object, ITreePath>)getStaticVCTraversal(isAggTraversal,
staticGraph, executeDagGroup, maxTraversal, false, parallelism)).start(new ArrayList<>(constantStartIds));
} else { // traversal all
responsePWindow =
((PGraphTraversal<Object, ITreePath>)getStaticVCTraversal(isAggTraversal,
staticGraph, executeDagGroup, maxTraversal, false, parallelism)).start();
boolean enableTraversalAllSplit = queryContext.getGlobalConf()
.getBoolean(DSLConfigKeys.GEAFLOW_DSL_TRAVERSAL_SPLIT_ENABLE);
if (enableTraversalAllSplit) {
DynamicGraphVertexScanSourceFunction<?> sourceFunction =
new DynamicGraphVertexScanSourceFunction<>(graphViewDesc);
PWindowSource<?> source = queryContext.getEngineContext()
.createRuntimeTable(queryContext, sourceFunction)
.withParallelism(graphViewDesc.getShardNum())
.withName(queryContext.createOperatorName("VertexScanSource"));
responsePWindow =
getStaticVCTraversal(isAggTraversal,
staticGraph, executeDagGroup, maxTraversal, false, parallelism)
.start((PWindowStream) source);
} else {
responsePWindow =
((PGraphTraversal<Object, ITreePath>)getStaticVCTraversal(isAggTraversal,
staticGraph, executeDagGroup, maxTraversal, false, parallelism)).start();
}
}
return responsePWindow;
}
Expand Down Expand Up @@ -260,8 +277,22 @@ private PWindowStream<ITraversalResponse<ITreePath>> dynamicGraphTraversal(
return ((PGraphTraversal<Object, ITreePath>)getDynamicVCTraversal(isAggTraversal, dynamicGraph, executeDagGroup,
maxTraversal, false, parallelism)).start(new ArrayList<>(constantStartIds));
} else { // dynamic traversal all
return ((PGraphTraversal<Object, ITreePath>)getDynamicVCTraversal(isAggTraversal, dynamicGraph, executeDagGroup,
maxTraversal, false, parallelism)).start();
boolean enableTraversalAllSplit = queryContext.getGlobalConf()
.getBoolean(DSLConfigKeys.GEAFLOW_DSL_TRAVERSAL_SPLIT_ENABLE);
if (enableTraversalAllSplit) {
DynamicGraphVertexScanSourceFunction<?> sourceFunction =
new DynamicGraphVertexScanSourceFunction<>(graphViewDesc);
PWindowSource<?> source = queryContext.getEngineContext()
.createRuntimeTable(queryContext, sourceFunction)
.withParallelism(graphViewDesc.getShardNum())
.withName(queryContext.createOperatorName("VertexScanSource"));
return getDynamicVCTraversal(isAggTraversal, dynamicGraph, executeDagGroup,
maxTraversal, false, parallelism)
.start((PWindowStream) source);
}
return ((PGraphTraversal<Object, ITreePath>) getDynamicVCTraversal(isAggTraversal, dynamicGraph,
executeDagGroup, maxTraversal, false, parallelism)).start();

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Copyright 2023 AntGroup CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.antgroup.geaflow.dsl.runtime.function.graph.source;

import com.antgroup.geaflow.api.context.RuntimeContext;
import com.antgroup.geaflow.api.function.RichFunction;
import com.antgroup.geaflow.api.function.io.SourceFunction;
import com.antgroup.geaflow.api.window.IWindow;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.DSLConfigKeys;
import com.antgroup.geaflow.common.config.keys.ExecutionConfigKeys;
import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
import com.antgroup.geaflow.dsl.runtime.traversal.data.IdOnlyRequest;
import com.antgroup.geaflow.state.GraphState;
import com.antgroup.geaflow.state.StateFactory;
import com.antgroup.geaflow.state.descriptor.GraphStateDescriptor;
import com.antgroup.geaflow.utils.keygroup.IKeyGroupAssigner;
import com.antgroup.geaflow.utils.keygroup.KeyGroup;
import com.antgroup.geaflow.utils.keygroup.KeyGroupAssignerFactory;
import com.antgroup.geaflow.utils.keygroup.KeyGroupAssignment;
import com.antgroup.geaflow.view.IViewDesc.BackendType;
import com.antgroup.geaflow.view.graph.GraphViewDesc;
import com.antgroup.geaflow.view.meta.ViewMetaBookKeeper;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for source functions that emit the vertex ids of a graph view as
 * {@link IdOnlyRequest}s.
 *
 * <p>On {@link #open(RuntimeContext)} the function builds a graph state for the key
 * group of the current task, recovers it to the latest view version (if one exists)
 * and asks the subclass for an iterator over the vertex ids. Each call to
 * {@link #fetch(IWindow, SourceContext)} then emits ids from that iterator.
 *
 * @param <K> the vertex id type
 */
public abstract class AbstractVertexScanSourceFunction<K> extends RichFunction implements
    SourceFunction<IdOnlyRequest> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractVertexScanSourceFunction.class);

    /** Runtime context handed in by {@link #open(RuntimeContext)}. */
    protected transient RuntimeContext runtimeContext;

    /** Description of the graph view whose vertices are scanned. */
    protected GraphViewDesc graphViewDesc;

    /** Graph state built in {@link #open(RuntimeContext)}. */
    protected transient GraphState<K, ?, ?> graphState;

    /**
     * Iterator over the vertex ids, created in {@link #open(RuntimeContext)}.
     * Transient like the other runtime-only handles: it is derived from the graph state
     * and must not take part in serialization of the function.
     */
    private transient Iterator<K> idIterator;

    /** Value of {@code DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE}, read in {@link #open(RuntimeContext)}. */
    private long windowSize;

    /**
     * @param graphViewDesc description of the graph view to scan, must not be {@code null}
     * @throws NullPointerException if {@code graphViewDesc} is {@code null}
     */
    public AbstractVertexScanSourceFunction(GraphViewDesc graphViewDesc) {
        this.graphViewDesc = Objects.requireNonNull(graphViewDesc);
    }

    /**
     * Reads the window size, builds the graph state, recovers it and creates the id iterator.
     *
     * @param runtimeContext the runtime context of the current task
     */
    @Override
    public void open(RuntimeContext runtimeContext) {
        this.runtimeContext = runtimeContext;
        this.windowSize = this.runtimeContext.getConfiguration().getLong(DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE);
        // NOTE(review): this is the object returned by getConfiguration(), not an explicit copy,
        // so the put below is visible to any other holder of it unless the getter copies — confirm.
        Configuration rewriteConfiguration = runtimeContext.getConfiguration();
        String jobName = rewriteConfiguration.getString(ExecutionConfigKeys.JOB_APP_NAME);
        rewriteConfiguration.put(ExecutionConfigKeys.JOB_APP_NAME.getKey(),
            "VertexScanSourceFunction_" + jobName);
        GraphStateDescriptor<K, ?, ?> desc = buildGraphStateDesc();
        desc.withMetricGroup(runtimeContext.getMetric());
        this.graphState = StateFactory.buildGraphState(desc, runtimeContext.getConfiguration());
        recover();
        this.idIterator = buildIdIterator();
    }

    /**
     * Creates the iterator over the vertex ids to emit. Called at the end of
     * {@link #open(RuntimeContext)}, after {@link #graphState} has been built and recovered.
     *
     * @return the vertex id iterator
     */
    protected abstract Iterator<K> buildIdIterator();

    /**
     * Recovers the graph state to the latest view version reported by
     * {@link #getLatestViewVersion()}. Nothing is recovered when that version is negative.
     */
    protected void recover() {
        LOGGER.info("Task: {} will do recover, windowId: {}",
            this.runtimeContext.getTaskArgs().getTaskId(), this.runtimeContext.getWindowId());
        long lastCheckPointId = getLatestViewVersion();
        if (lastCheckPointId >= 0) {
            LOGGER.info("Task: {} do recover to state VersionId: {}", this.runtimeContext.getTaskArgs().getTaskId(),
                lastCheckPointId);
            graphState.manage().operate().setCheckpointId(lastCheckPointId);
            graphState.manage().operate().recover();
        }
    }

    /** Intentionally empty: this source needs no per-parallelism initialization. */
    @Override
    public void init(int parallel, int index) {

    }

    /**
     * Builds the graph state descriptor for the current task: name and backend come from the
     * graph view, key group and key group assigner are computed from the task index, the task
     * parallelism and the shard number of the view.
     *
     * @return the descriptor; subclasses add data model, state mode and graph meta
     * @throws IllegalArgumentException if the task parallelism exceeds the shard number
     */
    protected GraphStateDescriptor<K, ?, ?> buildGraphStateDesc() {
        int taskIndex = runtimeContext.getTaskArgs().getTaskIndex();
        int taskPara = runtimeContext.getTaskArgs().getParallelism();
        BackendType backendType = graphViewDesc.getBackend();
        GraphStateDescriptor<K, ?, ?> desc = GraphStateDescriptor.build(graphViewDesc.getName(),
            backendType.name());

        int maxPara = graphViewDesc.getShardNum();
        // Template overload: the message is only formatted when the check fails.
        Preconditions.checkArgument(taskPara <= maxPara,
            "task parallelism '%s' must be <= shard num(max parallelism) '%s'",
            taskPara, maxPara);

        KeyGroup keyGroup = KeyGroupAssignment.computeKeyGroupRangeForOperatorIndex(maxPara, taskPara, taskIndex);
        IKeyGroupAssigner keyGroupAssigner =
            KeyGroupAssignerFactory.createKeyGroupAssigner(keyGroup, taskIndex, maxPara);
        desc.withKeyGroup(keyGroup);
        desc.withKeyGroupAssigner(keyGroupAssigner);

        long taskId = runtimeContext.getTaskArgs().getTaskId();
        int containerNum = runtimeContext.getConfiguration().getInteger(ExecutionConfigKeys.CONTAINER_NUM);
        LOGGER.info("Task:{} taskId:{} taskIndex:{} keyGroup:{} containerNum:{} real taskIndex:{}",
            this.runtimeContext.getTaskArgs().getTaskName(),
            taskId,
            taskIndex,
            desc.getKeyGroup(), containerNum, runtimeContext.getTaskArgs().getTaskIndex());
        return desc;
    }

    /**
     * Looks up the latest version of the graph view through a {@link ViewMetaBookKeeper}.
     *
     * @return the latest view version
     * @throws GeaflowRuntimeException wrapping the {@link IOException} raised by the lookup
     */
    protected long getLatestViewVersion() {
        long lastCheckPointId;
        try {
            ViewMetaBookKeeper keeper = new ViewMetaBookKeeper(graphViewDesc.getName(),
                this.runtimeContext.getConfiguration());
            lastCheckPointId = keeper.getLatestViewVersion(graphViewDesc.getName());
            LOGGER.info("Task: {} will do recover or load, ViewMetaBookKeeper version: {}",
                runtimeContext.getTaskArgs().getTaskId(), lastCheckPointId);
        } catch (IOException e) {
            throw new GeaflowRuntimeException(e);
        }
        return lastCheckPointId;
    }

    /**
     * Emits vertex ids as {@link IdOnlyRequest}s until the iterator is exhausted or the number
     * emitted in this call reaches the configured window size.
     *
     * @param window the current window (not consulted by this implementation)
     * @param ctx the context the requests are collected into
     * @return {@code true} if exactly window-size ids were emitted by this call, otherwise
     *     {@code false}; presumably signals that more ids may remain — verify against the
     *     {@code SourceFunction} contract
     */
    @Override
    public boolean fetch(IWindow<IdOnlyRequest> window, SourceContext<IdOnlyRequest> ctx) throws Exception {
        int count = 0;
        while (idIterator.hasNext()) {
            K id = idIterator.next();
            IdOnlyRequest idOnlyRequest = new IdOnlyRequest(id);
            ctx.collect(idOnlyRequest);
            count++;
            if (count == windowSize) {
                break;
            }
        }
        return count == windowSize;
    }

    /**
     * Intentionally empty.
     * NOTE(review): the graph state built in open() is not released here — confirm whether
     * it needs an explicit close.
     */
    @Override
    public void close() {

    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2023 AntGroup CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.antgroup.geaflow.dsl.runtime.function.graph.source;

import com.antgroup.geaflow.model.graph.meta.GraphMeta;
import com.antgroup.geaflow.state.DataModel;
import com.antgroup.geaflow.state.descriptor.GraphStateDescriptor;
import com.antgroup.geaflow.state.graph.StateMode;
import com.antgroup.geaflow.view.graph.GraphViewDesc;
import java.util.Iterator;

/**
 * Vertex scan source for a dynamic graph: ids are read from the dynamic-graph view of the
 * graph state built by the base class.
 *
 * @param <K> the vertex id type
 */
public class DynamicGraphVertexScanSourceFunction<K> extends AbstractVertexScanSourceFunction<K> {

    /**
     * @param graphViewDesc description of the graph view to scan
     */
    public DynamicGraphVertexScanSourceFunction(GraphViewDesc graphViewDesc) {
        super(graphViewDesc);
    }

    /**
     * Extends the base descriptor with the dynamic-graph data model, read/write state mode
     * and the graph meta derived from the view's graph meta type.
     */
    @Override
    protected GraphStateDescriptor<K, ?, ?> buildGraphStateDesc() {
        final GraphStateDescriptor<K, ?, ?> descriptor = super.buildGraphStateDesc();
        final GraphMeta graphMeta = new GraphMeta(graphViewDesc.getGraphMetaType());
        descriptor.withDataModel(DataModel.DYNAMIC_GRAPH);
        descriptor.withStateMode(StateMode.RW);
        descriptor.withGraphMeta(graphMeta);
        return descriptor;
    }

    /** Returns the vertex id iterator of the dynamic graph held by the graph state. */
    @Override
    protected Iterator<K> buildIdIterator() {
        return graphState.dynamicGraph().V().idIterator();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2023 AntGroup CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.antgroup.geaflow.dsl.runtime.function.graph.source;

import com.antgroup.geaflow.model.graph.meta.GraphMeta;
import com.antgroup.geaflow.state.DataModel;
import com.antgroup.geaflow.state.descriptor.GraphStateDescriptor;
import com.antgroup.geaflow.state.graph.StateMode;
import com.antgroup.geaflow.view.graph.GraphViewDesc;
import java.util.Iterator;

/**
 * Vertex scan source for a static graph: ids are read from the static-graph view of the
 * graph state built by the base class.
 *
 * @param <K> the vertex id type
 */
public class StaticGraphVertexScanSourceFunction<K> extends AbstractVertexScanSourceFunction<K> {

    /**
     * @param graphViewDesc description of the graph view to scan
     */
    public StaticGraphVertexScanSourceFunction(GraphViewDesc graphViewDesc) {
        super(graphViewDesc);
    }

    /** Returns the vertex id iterator of the static graph held by the graph state. */
    @Override
    protected Iterator<K> buildIdIterator() {
        return graphState.staticGraph().V().idIterator();
    }

    /**
     * Extends the base descriptor with the static-graph data model, read/write state mode
     * and the graph meta derived from the view's graph meta type.
     */
    @Override
    protected GraphStateDescriptor<K, ?, ?> buildGraphStateDesc() {
        GraphStateDescriptor<K, ?, ?> desc = super.buildGraphStateDesc();
        desc.withDataModel(DataModel.STATIC_GRAPH);
        desc.withStateMode(StateMode.RW);
        desc.withGraphMeta(new GraphMeta(graphViewDesc.getGraphMetaType()));
        return desc;
    }
}
Loading

0 comments on commit 3fd5f64

Please sign in to comment.