Skip to content

Commit

Permalink
[ISSUE-199] Fix known issues with DSL and optimize LDBC test
Browse files Browse the repository at this point in the history
  • Loading branch information
Leomrlin authored Oct 17, 2023
1 parent 719ae4f commit 890ffe1
Show file tree
Hide file tree
Showing 12 changed files with 71 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public GeaFlowGraph(String instanceName, String name, List<VertexTable> vertexTa
this.name = name;
this.vertexTables = vertexTables;
this.edgeTables = edgeTables;
this.config = ImmutableMap.copyOf(config);
this.config = new HashMap<>(config);
this.usingTables = ImmutableMap.copyOf(usingTables);
this.ifNotExists = ifNotExists;
this.isTemporary = isTemporary;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,14 @@ private String keyMapping(String key) {
return shortKeyMapping.getOrDefault(key, key);
}

public Map<String, String> keyMapping(Map<String, String> input) {
Map<String, String> keyMapping = new HashMap<>();
for (Map.Entry<String, String> entry : input.entrySet()) {
keyMapping.put(keyMapping(entry.getKey()), entry.getValue());
}
return keyMapping;
}

/**
* Register table to catalog.
*/
Expand Down Expand Up @@ -740,7 +748,9 @@ public void setCurrentGraph(String currentGraph) {
Table graphTable = catalog.getGraph(currentInstance, currentGraph);
if (graphTable instanceof GeaFlowGraph) {
this.currentGraph = currentGraph;
getTypeFactory().setCurrentGraph((GeaFlowGraph) graphTable);
GeaFlowGraph geaFlowGraph = (GeaFlowGraph) graphTable;
geaFlowGraph.getConfig().putAll(keyMapping(geaFlowGraph.getConfig().getConfigMap()));
getTypeFactory().setCurrentGraph(geaFlowGraph);
} else {
throw new GeaFlowDSLException("Graph: {} is not exists.", currentGraph);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -55,6 +56,8 @@ public abstract class AbstractVertexScanSourceFunction<K> extends RichFunction i

private long windSize;

private static final AtomicInteger storeCounter = new AtomicInteger(0);

public AbstractVertexScanSourceFunction(GraphViewDesc graphViewDesc) {
this.graphViewDesc = Objects.requireNonNull(graphViewDesc);
}
Expand All @@ -65,8 +68,11 @@ public void open(RuntimeContext runtimeContext) {
this.windSize = this.runtimeContext.getConfiguration().getLong(DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE);
Configuration rewriteConfiguration = runtimeContext.getConfiguration();
String jobName = rewriteConfiguration.getString(ExecutionConfigKeys.JOB_APP_NAME);
// A read-only graph copy will be created locally for the VertexScan.
// To avoid conflicts with other VertexScans or Ops, an independent copy name is
// constructed using the job name to differentiate the storage path.
rewriteConfiguration.put(ExecutionConfigKeys.JOB_APP_NAME.getKey(),
"VertexScanSourceFunction_" + jobName);
"VertexScanSourceFunction_" + jobName + "_" + storeCounter.getAndIncrement());
GraphStateDescriptor<K, ?, ?> desc = buildGraphStateDesc();
desc.withMetricGroup(runtimeContext.getMetric());
this.graphState = StateFactory.buildGraphState(desc, runtimeContext.getConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public int getIterationCount(int currentDepth, StepOperator stepOperator) {

if (stepOperator instanceof StepLoopUntilOperator) {
StepLoopUntilOperator loopUntilOperator = (StepLoopUntilOperator)stepOperator;
currentDepth = addIteration(currentDepth, loopUntilOperator.getMaxLoopCount());
currentDepth = addIteration(currentDepth,
addIteration(loopUntilOperator.getMaxLoopCount(), 1));
}
int depth = currentDepth;
for (Object op : stepOperator.getNextOperators()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,11 @@ private StepRecordWithPath selectLastLoopPath(StepRecordWithPath record, boolean
pathIndices[i] = i;
}
for (int i = 0; i < loopBodyPathFieldCount; i++) {
// When calculating the index for the loopBody fields, when
// loopCounter is 1, the first offset is used for input values. After that,
// values generated by the loop are placed starting from an offset of 1
pathIndices[i + loopStartPathFieldCount] = loopStartPathFieldCount
+ (loopCounter - 1) * loopBodyPathFieldCount + i;
+ Math.min(loopCounter - 1, 1) * loopBodyPathFieldCount + i;
}
return record.subPathSet(pathIndices);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,18 @@ public static PreCompileResult preCompile(String script, Configuration config) {
SqlIdentifier insertName = gqlContext.completeCatalogObjName(
(SqlIdentifier) insert.getTargetTable());
SqlIdentifier insertGraphName = GQLNodeUtil.getGraphTableName(insertName);
String simpleGraphName = insertName.getComponent(insertName.names.size() - 1,
insertName.names.size()).getSimple();
if (createGraphs.containsKey(insertGraphName.toString())) {
SqlCreateGraph createGraph = createGraphs.get(insertGraphName.toString());
GeaFlowGraph graph = gqlContext.convertToGraph(createGraph);
preCompileResult.addGraph(SchemaUtil.buildGraphViewDesc(graph, config));
} else {
Table graph = gqlContext.getCatalog().getGraph(gqlContext.getCurrentInstance(),
insertGraphName.toString());
Table graph = gqlContext.getCatalog().getGraph(
gqlContext.getCurrentInstance(), simpleGraphName);
if (graph != null) {
GeaFlowGraph geaFlowGraph = (GeaFlowGraph) graph;
geaFlowGraph.getConfig().putAll(gqlContext.keyMapping(geaFlowGraph.getConfig().getConfigMap()));
preCompileResult.addGraph(SchemaUtil.buildGraphViewDesc(geaFlowGraph, config));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ public void testLdbcIs_06() throws Exception {
.build()
.withQueryPath("/ldbc/is_06.sql")
.withConfig(testConfig)
.withConfig(DSLConfigKeys.GEAFLOW_DSL_TRAVERSAL_SPLIT_ENABLE.getKey(), String.valueOf(true))
.execute()
.checkSinkResult();
}
Expand All @@ -551,6 +552,7 @@ public void testLdbcIs_07() throws Exception {
.build()
.withQueryPath("/ldbc/is_07.sql")
.withConfig(testConfig)
.withConfig(DSLConfigKeys.GEAFLOW_DSL_TRAVERSAL_SPLIT_ENABLE.getKey(), String.valueOf(true))
.execute()
.checkSinkResult();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
1110002,天涯,1677650127000,1100005,7
1110002,天涯,1677650127000,1100006,7
1110001,虎扑,1673848527000,1100001,6
1110002,天涯,1677650127000,1100006,9
1110002,天涯,1677650127000,1100005,9
1110001,虎扑,1673848527000,1100001,7
1110003,头条,1694238927000,1100008,2
1110004,微博,1685598927000,1100010,2
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
1100001,一,王,3,9
1100002,二,李,2,5
1100003,三,张,2,5
1100005,五,陈,2,4
1100015,十五,胡,2,4
1100004,四,刘,1,3
1100016,十六,郭,1,2
1100001,一,王,3,8
1100003,三,张,2,4
1100002,二,李,2,3
1100004,四,刘,1,2
1100005,五,陈,2,2
1100015,十五,胡,2,2
1100016,十六,郭,1,1
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
1,3
2,2
0,7
1,5
3,2
4,1
3,1
2,1
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,14 @@ CREATE TABLE tbl_result (

--GQL
INSERT INTO tbl_result
SELECT personId, firstName, lastName, threadCount, threadCount + messageCount as messageCount
SELECT personId, firstName, lastName, threadCount, messageCount as messageCount
FROM (
--2023-01-01 ~ 2023-10-01
MATCH (person:Person)<-[:hasCreator]-(post:Post where creationDate between 1672502400000 and 1696160400000)
|+| (person:Person)<-[:hasCreator]-(post:Post where creationDate between 1672502400000 and 1696160400000)
<-[:replyOf]-{1,}(comment:Comment)
WHERE comment is null or comment.creationDate between 1672502400000 and 1696160400000
<-[:replyOf]-{0,}(comment:Comment)
WHERE comment.creationDate between 1672502400000 and 1696160400000
RETURN person.id as personId, person.firstName, person.lastName,
COUNT(DISTINCT post.id) as threadCount, COUNT(1) as messageCount
COUNT(DISTINCT post) as threadCount, COUNT(comment) as messageCount
GROUP BY personId, firstName, lastName
)
ORDER BY messageCount DESC, personId LIMIT 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,27 @@ CREATE TABLE tbl_result (

INSERT INTO tbl_result
SELECT personMsgCount as messageCount, COUNT(personId) as personCount
FROM (
MATCH (post:Post)<-[:replyOf]-{0,}(msg:Post|Comment)
WHERE msg.creationDate > 1673798400000
and (msg.content is not null and length(msg.content) > 0 or msg.imageFile is not null)
and msg.length < 30
and post.lang = 'en'
MATCH (msg:Post|Comment)-[:hasCreator]->(person)
RETURN person.id as personId, COUNT(msg.id) as personMsgCount
GROUP BY personId
FROM
(
SELECT personId, MAX(personMsgCount) as personMsgCount FROM
(
(
MATCH (post:Post)<-[:replyOf]-{0,}(msg:Post|Comment)
WHERE msg.creationDate > 1673798400000
and (msg.content is not null and length(msg.content) > 0 or msg.imageFile is not null)
and msg.length < 30
and post.lang = 'en'
MATCH (msg:Post|Comment)-[:hasCreator]->(person)
RETURN person.id as personId, COUNT(msg.id) as personMsgCount
GROUP BY personId
)
UNION ALL
(
MATCH (person:Person)
RETURN person.id as personId, cast(0 as bigint) as personMsgCount
)
)
GROUP BY personId
)
GROUP BY personMsgCount
ORDER BY personCount DESC, personMsgCount DESC
Expand Down

0 comments on commit 890ffe1

Please sign in to comment.