diff --git a/connector/pom.xml b/connector/pom.xml
index beed8da..76dc002 100644
--- a/connector/pom.xml
+++ b/connector/pom.xml
@@ -5,7 +5,7 @@
nebula-flink
com.vesoft
- 3.0-SNAPSHOT
+ 3.8.0
../pom.xml
4.0.0
@@ -13,7 +13,7 @@
nebula-flink-connector
- 3.0-SNAPSHOT
+ 3.8.0
1.14.4
2.11
1.8
diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java
index 1e3de01..6724618 100644
--- a/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java
+++ b/connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java
@@ -62,7 +62,8 @@ public void executeBatch(Session session) throws IOException {
return;
}
NebulaVertices nebulaVertices = new NebulaVertices(executionOptions.getLabel(),
- executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy());
+ executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy(),
+ executionOptions.isDeleteExecutedWithEdges());
// generate the write ngql statement
String statement = null;
switch (executionOptions.getWriteMode()) {
diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java b/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java
index 5e7057b..1a66f57 100644
--- a/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java
+++ b/connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java
@@ -32,6 +32,8 @@ public class VertexExecutionOptions extends ExecutionOptions {
*/
private int idIndex;
+ private boolean isDeleteExecutedWithEdges = false;
+
public VertexExecutionOptions(String graphSpace,
String executeStatement,
List fields,
@@ -44,6 +46,7 @@ public VertexExecutionOptions(String graphSpace,
PolicyEnum policy,
WriteModeEnum mode,
String tag,
+ boolean isDeleteExecutedWithEdges,
int idIndex,
int batchIntervalMs,
FailureHandlerEnum failureHandler,
@@ -54,6 +57,7 @@ public VertexExecutionOptions(String graphSpace,
failureHandler, maxRetries, retryDelayMs);
this.tag = tag;
this.idIndex = idIndex;
+ this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges;
}
public int getIdIndex() {
@@ -65,6 +69,10 @@ public String getLabel() {
return tag;
}
+ public boolean isDeleteExecutedWithEdges() {
+ return isDeleteExecutedWithEdges;
+ }
+
@Override
public DataTypeEnum getDataType() {
return DataTypeEnum.VERTEX;
@@ -78,6 +86,7 @@ public ExecutionOptionBuilder toBuilder() {
.setFields(this.getFields())
.setPositions(this.getPositions())
.setNoColumn(this.isNoColumn())
+ .setDeleteExecutedWithEdges(this.isDeleteExecutedWithEdges())
.setLimit(this.getLimit())
.setStartTime(this.getStartTime())
.setEndTime(this.getEndTime())
@@ -99,6 +108,7 @@ public static class ExecutionOptionBuilder {
private List fields;
private List positions;
private boolean noColumn = false;
+ private boolean isDeleteExecutedWithEdges = false;
private int limit = DEFAULT_SCAN_LIMIT;
private long startTime = 0;
private long endTime = Long.MAX_VALUE;
@@ -144,6 +154,13 @@ public ExecutionOptionBuilder setNoColumn(boolean noColumn) {
return this;
}
+ public ExecutionOptionBuilder setDeleteExecutedWithEdges(
+ boolean isDeleteExecutedWithEdges
+ ) {
+ this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges;
+ return this;
+ }
+
public ExecutionOptionBuilder setLimit(int limit) {
this.limit = limit;
return this;
@@ -220,7 +237,8 @@ public VertexExecutionOptions build() {
}
return new VertexExecutionOptions(graphSpace, executeStatement, fields,
positions, noColumn, limit, startTime, endTime, batchSize, policy, mode, tag,
- idIndex, batchIntervalMs, failureHandler, maxRetries, retryDelayMs);
+ isDeleteExecutedWithEdges, idIndex, batchIntervalMs,
+ failureHandler, maxRetries, retryDelayMs);
}
}
}
diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java
index 53541c7..601e2e4 100644
--- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java
+++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java
@@ -21,6 +21,7 @@ public class NebulaConstant {
// template for delete statement
public static String DELETE_VERTEX_TEMPLATE = "DELETE VERTEX %s";
+ public static String DELETE_VERTEX_TEMPLATE_WITH_EDGE = "DELETE VERTEX %s WITH EDGE";
public static String DELETE_EDGE_TEMPLATE = "DELETE EDGE `%s` %s";
public static String EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d";
diff --git a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java
index 381d028..d459825 100644
--- a/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java
+++ b/connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java
@@ -7,6 +7,7 @@
import static org.apache.flink.connector.nebula.utils.NebulaConstant.BATCH_INSERT_TEMPLATE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DELETE_VERTEX_TEMPLATE;
+import static org.apache.flink.connector.nebula.utils.NebulaConstant.DELETE_VERTEX_TEMPLATE_WITH_EDGE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.ENDPOINT_TEMPLATE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VALUE_TEMPLATE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VERTEX_TEMPLATE;
@@ -23,6 +24,8 @@ public class NebulaVertices implements Serializable {
private List vertices;
private PolicyEnum policy = null;
+ private boolean isDeleteExecutedWithEdges;
+
public NebulaVertices(String tagName, List propNames, List vertices,
PolicyEnum policy) {
this.tagName = tagName;
@@ -31,6 +34,15 @@ public NebulaVertices(String tagName, List propNames, List
this.policy = policy;
}
+ public NebulaVertices(String tagName, List propNames, List vertices,
+ PolicyEnum policy, boolean isDeleteExecutedWithEdges) {
+ this.tagName = tagName;
+ this.propNames = propNames;
+ this.vertices = vertices;
+ this.policy = policy;
+ this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges;
+ }
+
public String getPropNames() {
List escapePropNames = new ArrayList<>();
for (String propName : propNames) {
@@ -110,7 +122,10 @@ public String getDeleteStatement() {
String vertexId = getVertexId(vertex);
vertexIds.add(vertexId);
}
- return String.format(DELETE_VERTEX_TEMPLATE, String.join(",", vertexIds));
+ String template = isDeleteExecutedWithEdges
+ ? DELETE_VERTEX_TEMPLATE_WITH_EDGE
+ : DELETE_VERTEX_TEMPLATE;
+ return String.format(template, String.join(",", vertexIds));
}
/**
diff --git a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java
index 66cc877..d77d2a0 100644
--- a/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java
+++ b/connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java
@@ -112,4 +112,26 @@ public void testGetDeleteStatementWithPolicy() {
assert (vertexStatement.equals(expectStatement));
}
+ public void testGetDeleteStatementWithEdges() {
+ vertices.add(new NebulaVertex("\"vid1\"", props1));
+ vertices.add(new NebulaVertex("\"vid2\"", props2));
+
+ NebulaVertices nebulaVertices = new NebulaVertices(
+ tagName, propNames, vertices, null, true
+ );
+ String vertexStatement = nebulaVertices.getDeleteStatement();
+ String expectStatement = "DELETE VERTEX \"vid1\",\"vid2\" WITH EDGE";
+ assert (vertexStatement.equals(expectStatement));
+ }
+
+ public void testGetDeleteStatementWithPolicyAndEdges() {
+ vertices.add(new NebulaVertex("vid1", props1));
+ vertices.add(new NebulaVertex("vid2", props2));
+
+ NebulaVertices nebulaVertices = new NebulaVertices(tagName, propNames, vertices,
+ PolicyEnum.HASH, true);
+ String vertexStatement = nebulaVertices.getDeleteStatement();
+ String expectStatement = "DELETE VERTEX HASH(\"vid1\"),HASH(\"vid2\") WITH EDGE";
+ assert (vertexStatement.equals(expectStatement));
+ }
}
diff --git a/example/pom.xml b/example/pom.xml
index 00a2ac5..635d746 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -5,7 +5,7 @@
nebula-flink
com.vesoft
- 3.0-SNAPSHOT
+ 3.8.0
../pom.xml
4.0.0
diff --git a/pom.xml b/pom.xml
index d06a60d..f415c5b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
com.vesoft
nebula-flink
pom
- 3.0-SNAPSHOT
+ 3.8.0
connector