Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add vertex delete with edge #103

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
<parent>
<artifactId>nebula-flink</artifactId>
<groupId>com.vesoft</groupId>
<version>3.0-SNAPSHOT</version>
<version>3.8.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>nebula-flink-connector</artifactId>

<properties>
<nebula.version>3.0-SNAPSHOT</nebula.version>
<nebula.version>3.8.0</nebula.version>
<flink.version>1.14.4</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<compiler.source.version>1.8</compiler.source.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public void executeBatch(Session session) throws IOException {
return;
}
NebulaVertices nebulaVertices = new NebulaVertices(executionOptions.getLabel(),
executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy());
executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy(),
executionOptions.isDeleteExecutedWithEdges());
// generate the write ngql statement
String statement = null;
switch (executionOptions.getWriteMode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class VertexExecutionOptions extends ExecutionOptions {
*/
private int idIndex;

private boolean isDeleteExecutedWithEdges = false;

public VertexExecutionOptions(String graphSpace,
String executeStatement,
List<String> fields,
Expand All @@ -44,6 +46,7 @@ public VertexExecutionOptions(String graphSpace,
PolicyEnum policy,
WriteModeEnum mode,
String tag,
boolean isDeleteExecutedWithEdges,
int idIndex,
int batchIntervalMs,
FailureHandlerEnum failureHandler,
Expand All @@ -54,6 +57,7 @@ public VertexExecutionOptions(String graphSpace,
failureHandler, maxRetries, retryDelayMs);
this.tag = tag;
this.idIndex = idIndex;
this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges;
}

public int getIdIndex() {
Expand All @@ -65,6 +69,10 @@ public String getLabel() {
return tag;
}

public boolean isDeleteExecutedWithEdges() {
return isDeleteExecutedWithEdges;
}

@Override
public DataTypeEnum getDataType() {
return DataTypeEnum.VERTEX;
Expand All @@ -78,6 +86,7 @@ public ExecutionOptionBuilder toBuilder() {
.setFields(this.getFields())
.setPositions(this.getPositions())
.setNoColumn(this.isNoColumn())
.setDeleteExecutedWithEdges(this.isDeleteExecutedWithEdges())
.setLimit(this.getLimit())
.setStartTime(this.getStartTime())
.setEndTime(this.getEndTime())
Expand All @@ -99,6 +108,7 @@ public static class ExecutionOptionBuilder {
private List<String> fields;
private List<Integer> positions;
private boolean noColumn = false;
private boolean isDeleteExecutedWithEdges = false;
private int limit = DEFAULT_SCAN_LIMIT;
private long startTime = 0;
private long endTime = Long.MAX_VALUE;
Expand Down Expand Up @@ -144,6 +154,13 @@ public ExecutionOptionBuilder setNoColumn(boolean noColumn) {
return this;
}

public ExecutionOptionBuilder setDeleteExecutedWithEdges(
boolean isDeleteExecutedWithEdges
) {
this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges;
return this;
}

public ExecutionOptionBuilder setLimit(int limit) {
this.limit = limit;
return this;
Expand Down Expand Up @@ -220,7 +237,8 @@ public VertexExecutionOptions build() {
}
return new VertexExecutionOptions(graphSpace, executeStatement, fields,
positions, noColumn, limit, startTime, endTime, batchSize, policy, mode, tag,
idIndex, batchIntervalMs, failureHandler, maxRetries, retryDelayMs);
isDeleteExecutedWithEdges, idIndex, batchIntervalMs,
failureHandler, maxRetries, retryDelayMs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class NebulaConstant {

// template for delete statement
public static String DELETE_VERTEX_TEMPLATE = "DELETE VERTEX %s";
public static String DELETE_VERTEX_TEMPLATE_WITH_EDGE = "DELETE VERTEX %s WITH EDGE";
public static String DELETE_EDGE_TEMPLATE = "DELETE EDGE `%s` %s";
public static String EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static org.apache.flink.connector.nebula.utils.NebulaConstant.BATCH_INSERT_TEMPLATE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DELETE_VERTEX_TEMPLATE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DELETE_VERTEX_TEMPLATE_WITH_EDGE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.ENDPOINT_TEMPLATE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VALUE_TEMPLATE;
import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VERTEX_TEMPLATE;
Expand All @@ -23,6 +24,8 @@ public class NebulaVertices implements Serializable {
private List<NebulaVertex> vertices;
private PolicyEnum policy = null;

private boolean isDeleteExecutedWithEdges;

public NebulaVertices(String tagName, List<String> propNames, List<NebulaVertex> vertices,
PolicyEnum policy) {
this.tagName = tagName;
Expand All @@ -31,6 +34,15 @@ public NebulaVertices(String tagName, List<String> propNames, List<NebulaVertex>
this.policy = policy;
}

public NebulaVertices(String tagName, List<String> propNames, List<NebulaVertex> vertices,
PolicyEnum policy, boolean isDeleteExecutedWithEdges) {
this.tagName = tagName;
this.propNames = propNames;
this.vertices = vertices;
this.policy = policy;
this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges;
}

public String getPropNames() {
List<String> escapePropNames = new ArrayList<>();
for (String propName : propNames) {
Expand Down Expand Up @@ -110,7 +122,10 @@ public String getDeleteStatement() {
String vertexId = getVertexId(vertex);
vertexIds.add(vertexId);
}
return String.format(DELETE_VERTEX_TEMPLATE, String.join(",", vertexIds));
String template = isDeleteExecutedWithEdges
? DELETE_VERTEX_TEMPLATE_WITH_EDGE
: DELETE_VERTEX_TEMPLATE;
return String.format(template, String.join(",", vertexIds));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,26 @@ public void testGetDeleteStatementWithPolicy() {
assert (vertexStatement.equals(expectStatement));
}

public void testGetDeleteStatementWithEdges() {
vertices.add(new NebulaVertex("\"vid1\"", props1));
vertices.add(new NebulaVertex("\"vid2\"", props2));

NebulaVertices nebulaVertices = new NebulaVertices(
tagName, propNames, vertices, null, true
);
String vertexStatement = nebulaVertices.getDeleteStatement();
String expectStatement = "DELETE VERTEX \"vid1\",\"vid2\" WITH EDGE";
assert (vertexStatement.equals(expectStatement));
}

public void testGetDeleteStatementWithPolicyAndEdges() {
vertices.add(new NebulaVertex("vid1", props1));
vertices.add(new NebulaVertex("vid2", props2));

NebulaVertices nebulaVertices = new NebulaVertices(tagName, propNames, vertices,
PolicyEnum.HASH, true);
String vertexStatement = nebulaVertices.getDeleteStatement();
String expectStatement = "DELETE VERTEX HASH(\"vid1\"),HASH(\"vid2\") WITH EDGE";
assert (vertexStatement.equals(expectStatement));
}
}
2 changes: 1 addition & 1 deletion example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>nebula-flink</artifactId>
<groupId>com.vesoft</groupId>
<version>3.0-SNAPSHOT</version>
<version>3.8.0</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>com.vesoft</groupId>
<artifactId>nebula-flink</artifactId>
<packaging>pom</packaging>
<version>3.0-SNAPSHOT</version>
<version>3.8.0</version>

<modules>
<module>connector</module>
Expand Down
Loading