diff --git a/connector/pom.xml b/connector/pom.xml index beed8da..76dc002 100644 --- a/connector/pom.xml +++ b/connector/pom.xml @@ -5,7 +5,7 @@ nebula-flink com.vesoft - 3.0-SNAPSHOT + 3.8.0 ../pom.xml 4.0.0 @@ -13,7 +13,7 @@ nebula-flink-connector - 3.0-SNAPSHOT + 3.8.0 1.14.4 2.11 1.8 diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java index 1e3de01..6724618 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java @@ -62,7 +62,8 @@ public void executeBatch(Session session) throws IOException { return; } NebulaVertices nebulaVertices = new NebulaVertices(executionOptions.getLabel(), - executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy()); + executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy(), + executionOptions.isDeleteExecutedWithEdges()); // generate the write ngql statement String statement = null; switch (executionOptions.getWriteMode()) { diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java index 5e7057b..1a66f57 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java @@ -32,6 +32,8 @@ public class VertexExecutionOptions extends ExecutionOptions { */ private int idIndex; + private boolean isDeleteExecutedWithEdges = false; + public VertexExecutionOptions(String graphSpace, String executeStatement, List fields, @@ -44,6 +46,7 @@ public VertexExecutionOptions(String graphSpace, PolicyEnum policy, WriteModeEnum mode, String tag, + boolean isDeleteExecutedWithEdges, int idIndex, int batchIntervalMs, FailureHandlerEnum failureHandler, @@ -54,6 +57,7 @@ public VertexExecutionOptions(String graphSpace, failureHandler, maxRetries, retryDelayMs); this.tag = tag; this.idIndex = idIndex; + this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges; } public int getIdIndex() { @@ -65,6 +69,10 @@ public String getLabel() { return tag; } + public boolean isDeleteExecutedWithEdges() { + return isDeleteExecutedWithEdges; + } + @Override public DataTypeEnum getDataType() { return DataTypeEnum.VERTEX; @@ -78,6 +86,7 @@ public ExecutionOptionBuilder toBuilder() { .setFields(this.getFields()) .setPositions(this.getPositions()) .setNoColumn(this.isNoColumn()) + .setDeleteExecutedWithEdges(this.isDeleteExecutedWithEdges()) .setLimit(this.getLimit()) .setStartTime(this.getStartTime()) .setEndTime(this.getEndTime()) @@ -99,6 +108,7 @@ public static class ExecutionOptionBuilder { private List fields; private List positions; private boolean noColumn = false; + private boolean isDeleteExecutedWithEdges = false; private int limit = DEFAULT_SCAN_LIMIT; private long startTime = 0; private long endTime = Long.MAX_VALUE; @@ -144,6 +154,13 @@ public ExecutionOptionBuilder setNoColumn(boolean noColumn) { return this; } + public ExecutionOptionBuilder setDeleteExecutedWithEdges( + boolean isDeleteExecutedWithEdges + ) { + this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges; + return this; + } + public ExecutionOptionBuilder setLimit(int limit) { this.limit = limit; return this; @@ -220,7 +237,8 @@ public VertexExecutionOptions build() { } return new VertexExecutionOptions(graphSpace, executeStatement, fields, positions, noColumn, limit, startTime, endTime, batchSize, policy, mode, tag, - idIndex, batchIntervalMs, failureHandler, maxRetries, retryDelayMs); + isDeleteExecutedWithEdges, idIndex, batchIntervalMs, + failureHandler, maxRetries, retryDelayMs); } } } diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java index 53541c7..601e2e4 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java @@ -21,6 +21,7 @@ public class NebulaConstant { // template for delete statement public static String DELETE_VERTEX_TEMPLATE = "DELETE VERTEX %s"; + public static String DELETE_VERTEX_TEMPLATE_WITH_EDGE = "DELETE VERTEX %s WITH EDGE"; public static String DELETE_EDGE_TEMPLATE = "DELETE EDGE `%s` %s"; public static String EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d"; diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java index 381d028..d459825 100644 --- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java +++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java @@ -7,6 +7,7 @@ import static org.apache.flink.connector.nebula.utils.NebulaConstant.BATCH_INSERT_TEMPLATE; import static org.apache.flink.connector.nebula.utils.NebulaConstant.DELETE_VERTEX_TEMPLATE; +import static org.apache.flink.connector.nebula.utils.NebulaConstant.DELETE_VERTEX_TEMPLATE_WITH_EDGE; import static org.apache.flink.connector.nebula.utils.NebulaConstant.ENDPOINT_TEMPLATE; import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VALUE_TEMPLATE; import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VERTEX_TEMPLATE; @@ -23,6 +24,8 @@ public class NebulaVertices implements Serializable { private List vertices; private PolicyEnum policy = null; + private boolean isDeleteExecutedWithEdges; + public NebulaVertices(String tagName, List propNames, List vertices, PolicyEnum policy) { this.tagName = tagName; @@ -31,6 +34,15 @@ public NebulaVertices(String tagName, List propNames, List this.policy = policy; } + public NebulaVertices(String tagName, List propNames, List vertices, + PolicyEnum policy, boolean isDeleteExecutedWithEdges) { + this.tagName = tagName; + this.propNames = propNames; + this.vertices = vertices; + this.policy = policy; + this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges; + } + public String getPropNames() { List escapePropNames = new ArrayList<>(); for (String propName : propNames) { @@ -110,7 +122,10 @@ public String getDeleteStatement() { String vertexId = getVertexId(vertex); vertexIds.add(vertexId); } - return String.format(DELETE_VERTEX_TEMPLATE, String.join(",", vertexIds)); + String template = isDeleteExecutedWithEdges + ? DELETE_VERTEX_TEMPLATE_WITH_EDGE + : DELETE_VERTEX_TEMPLATE; + return String.format(template, String.join(",", vertexIds)); } /** diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java index 66cc877..d77d2a0 100644 --- a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java +++ b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java @@ -112,4 +112,26 @@ public void testGetDeleteStatementWithPolicy() { assert (vertexStatement.equals(expectStatement)); } + public void testGetDeleteStatementWithEdges() { + vertices.add(new NebulaVertex("\"vid1\"", props1)); + vertices.add(new NebulaVertex("\"vid2\"", props2)); + + NebulaVertices nebulaVertices = new NebulaVertices( + tagName, propNames, vertices, null, true + ); + String vertexStatement = nebulaVertices.getDeleteStatement(); + String expectStatement = "DELETE VERTEX \"vid1\",\"vid2\" WITH EDGE"; + assert (vertexStatement.equals(expectStatement)); + } + + public void testGetDeleteStatementWithPolicyAndEdges() { + vertices.add(new NebulaVertex("vid1", props1)); + vertices.add(new NebulaVertex("vid2", props2)); + + NebulaVertices nebulaVertices = new NebulaVertices(tagName, propNames, vertices, + PolicyEnum.HASH, true); + String vertexStatement = nebulaVertices.getDeleteStatement(); + String expectStatement = "DELETE VERTEX HASH(\"vid1\"),HASH(\"vid2\") WITH EDGE"; + assert (vertexStatement.equals(expectStatement)); + } } diff --git a/example/pom.xml b/example/pom.xml index 00a2ac5..635d746 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -5,7 +5,7 @@ nebula-flink com.vesoft - 3.0-SNAPSHOT + 3.8.0 ../pom.xml 4.0.0 diff --git a/pom.xml b/pom.xml index d06a60d..f415c5b 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ com.vesoft nebula-flink pom - 3.0-SNAPSHOT + 3.8.0 connector