Skip to content

Commit

Permalink
[ISSUE-214] Support different names for vertex and edge meta fields
Browse files Browse the repository at this point in the history
  • Loading branch information
Leomrlin authored Oct 27, 2023
1 parent 8b4204a commit 68d227f
Show file tree
Hide file tree
Showing 21 changed files with 498 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,7 @@ public void validate() {
if (this.vertexTables.size() > 0) {
TableField commonVertexIdField = this.vertexTables.get(0).getIdField();
for (VertexTable vertexTable : this.vertexTables) {
if (!vertexTable.getIdFieldName().equals(commonVertexIdField.getName())) {
throw new GeaFlowDSLException("Id field name should be same between vertex " + "tables");
} else if (!vertexTable.getIdField().getType().equals(commonVertexIdField.getType())) {
if (!vertexTable.getIdField().getType().equals(commonVertexIdField.getType())) {
throw new GeaFlowDSLException("Id field type should be same between vertex " + "tables");
}
}
Expand All @@ -103,15 +101,9 @@ public void validate() {
Optional<TableField> commonTsField =
Optional.ofNullable(this.edgeTables.get(0).getTimestampField());
for (EdgeTable edgeTable : this.edgeTables) {
if (!edgeTable.getSrcIdFieldName().equals(commonSrcIdField.getName())) {
throw new GeaFlowDSLException("SOURCE ID field name should be same between "
+ "edge tables");
} else if (!edgeTable.getSrcIdField().getType().equals(commonSrcIdField.getType())) {
if (!edgeTable.getSrcIdField().getType().equals(commonSrcIdField.getType())) {
throw new GeaFlowDSLException("SOURCE ID field type should be same between edge "
+ "tables");
} else if (!edgeTable.getTargetIdFieldName().equals(commonTargetIdField.getName())) {
throw new GeaFlowDSLException("DESTINATION ID field name should be same "
+ "between edge tables");
} else if (!edgeTable.getTargetIdField().getType().equals(commonTargetIdField.getType())) {
throw new GeaFlowDSLException("DESTINATION ID field type should be same "
+ "between edge tables");
Expand All @@ -120,9 +112,6 @@ public void validate() {
if (commonTsField.isPresent()) {
if (edgeTable.getTimestampField() == null) {
throw new GeaFlowDSLException("TIMESTAMP should defined or not defined in all edge tables");
} else if (!edgeTable.getTimestampFieldName().equals(commonTsField.get().getName())) {
throw new GeaFlowDSLException("TIMESTAMP field name should be same between "
+ "edge tables");
} else if (!edgeTable.getTimestampField().getType().equals(commonTsField.get().getType())) {
throw new GeaFlowDSLException("TIMESTAMP field type should be same between edge "
+ "tables");
Expand Down Expand Up @@ -307,6 +296,9 @@ public VertexTable(String instanceName, String typeName, List<TableField> fields
}

private void checkFields() {
for (TableField field : getFields()) {
GraphRecordType.validateFieldName(field.getName());
}
Set<String> fieldNames = getFields().stream().map(TableField::getName)
.collect(Collectors.toSet());
if (fieldNames.size() != super.getFields().size()) {
Expand Down Expand Up @@ -378,6 +370,9 @@ public EdgeTable(String instanceName, String typeName, List<TableField> fields,
}

private void checkFields() {
for (TableField field : getFields()) {
GraphRecordType.validateFieldName(field.getName());
}
Set<String> fieldNames = getFields().stream().map(TableField::getName)
.collect(Collectors.toSet());
if (fieldNames.size() != getFields().size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public GeaFlowDSLException(String message) {
super(message);
}

public GeaFlowDSLException(Throwable e, String message, Object... parameters) {
super(MessageFormatter.arrayFormat(message, parameters).getMessage(), e);
}

public GeaFlowDSLException(SqlParserPos position, String message, Object... parameters) {
super("At " + position + ": " + MessageFormatter.arrayFormat(message, parameters).getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class EdgeType extends StructType {

public static final String DEFAULT_LABEL_NAME = "~label";

public static final String DEFAULT_TS_NAME = "~ts";

public EdgeType(List<TableField> fields, boolean hasTimestamp) {
super(fields);
this.hasTimestamp = hasTimestamp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ public class VertexType extends StructType {

private static final int NUM_META_FIELDS = 2;

public static final String DEFAULT_ID_FIELD_NAME = "~id";

public VertexType(List<TableField> fields) {
super(fields);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,31 @@ public boolean isNullable() {
public static EdgeRecordType createEdgeType(List<RelDataTypeField> fields, String srcIdField,
String targetIdField, String timestampField,
RelDataTypeFactory typeFactory) {

boolean hasMultiSrcIdFields = fields.stream()
.filter(f -> f.getType() instanceof MetaFieldType
&& ((MetaFieldType) f.getType()).getMetaField().equals(MetaField.EDGE_SRC_ID))
.count() > 1;
if (hasMultiSrcIdFields) {
srcIdField = EdgeType.DEFAULT_SRC_ID_NAME;
fields = GraphRecordType.renameMetaField(fields, MetaField.EDGE_SRC_ID, srcIdField);
}
boolean hasMultiTargetIdFields = fields.stream()
.filter(f -> f.getType() instanceof MetaFieldType
&& ((MetaFieldType) f.getType()).getMetaField().equals(MetaField.EDGE_TARGET_ID))
.count() > 1;
if (hasMultiTargetIdFields) {
targetIdField = EdgeType.DEFAULT_TARGET_ID_NAME;
fields = GraphRecordType.renameMetaField(fields, MetaField.EDGE_TARGET_ID, targetIdField);
}
boolean hasMultiTsFields = fields.stream()
.filter(f -> f.getType() instanceof MetaFieldType
&& ((MetaFieldType) f.getType()).getMetaField().equals(MetaField.EDGE_TS))
.count() > 1;
if (hasMultiTsFields) {
timestampField = EdgeType.DEFAULT_TS_NAME;
fields = GraphRecordType.renameMetaField(fields, MetaField.EDGE_TS, timestampField);
}
List<RelDataTypeField> reorderFields = reorderFields(fields, srcIdField, targetIdField, timestampField,
typeFactory);
return new EdgeRecordType(reorderFields, timestampField != null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,20 @@

package com.antgroup.geaflow.dsl.calcite;

import com.antgroup.geaflow.dsl.calcite.MetaFieldType.MetaField;
import com.antgroup.geaflow.dsl.common.exception.GeaFlowDSLException;
import com.antgroup.geaflow.dsl.common.types.EdgeType;
import com.antgroup.geaflow.dsl.common.types.VertexType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
Expand All @@ -31,8 +38,24 @@

public class GraphRecordType extends RelRecordType {

private static final Set<String> META_FIELD_NAMES = new HashSet<String>() {
{
add(VertexType.DEFAULT_ID_FIELD_NAME.toUpperCase(Locale.ROOT));
add(EdgeType.DEFAULT_SRC_ID_NAME.toUpperCase(Locale.ROOT));
add(EdgeType.DEFAULT_TARGET_ID_NAME.toUpperCase(Locale.ROOT));
add(EdgeType.DEFAULT_LABEL_NAME.toUpperCase(Locale.ROOT));
add(EdgeType.DEFAULT_TS_NAME.toUpperCase(Locale.ROOT));
}
};

private final String graphName;

public static void validateFieldName(String name) {
if (META_FIELD_NAMES.contains(name.toUpperCase(Locale.ROOT))) {
throw new GeaFlowDSLException("Field {} cannot use in graph as field name.", name);
}
}

public GraphRecordType(String graphName, List<RelDataTypeField> fields) {
super(StructKind.PEEK_FIELDS, fields);
this.graphName = Objects.requireNonNull(graphName);
Expand Down Expand Up @@ -103,10 +126,6 @@ public VertexRecordType getVertexType(Collection<String> vertexTypes, RelDataTyp
chkVertexId = vertexType.getIdField().getName();
chkVertexIdType = vertexType.getIdField().getType();
} else {
if (!vertexType.getIdField().getName().equals(chkVertexId)) {
throw new GeaFlowDSLException("Id field name should be same between vertex "
+ "tables");
}
if (!vertexType.getIdField().getType().equals(chkVertexIdType)) {
throw new GeaFlowDSLException("Id field type should be same between vertex "
+ "tables");
Expand Down Expand Up @@ -137,6 +156,7 @@ public VertexRecordType getVertexType(Collection<String> vertexTypes, RelDataTyp
}
}
}

return VertexRecordType.createVertexType(combineFields, idField, typeFactory);
}

Expand Down Expand Up @@ -179,10 +199,6 @@ public EdgeRecordType getEdgeType(Collection<String> edgeTypes, RelDataTypeFacto
chkSourceId = edgeType.getSrcIdField().getName();
chkSourceIdType = edgeType.getSrcIdField().getType();
} else {
if (!edgeType.getSrcIdField().getName().equals(chkSourceId)) {
throw new GeaFlowDSLException("SOURCE ID field name should be same between "
+ "edge tables");
}
if (!edgeType.getSrcIdField().getType().equals(chkSourceIdType)) {
throw new GeaFlowDSLException("SOURCE ID field type should be same between edge "
+ "tables");
Expand All @@ -192,10 +208,6 @@ public EdgeRecordType getEdgeType(Collection<String> edgeTypes, RelDataTypeFacto
chkDestinationId = edgeType.getTargetIdField().getName();
chkDestinationIdType = edgeType.getTargetIdField().getType();
} else {
if (!edgeType.getTargetIdField().getName().equals(chkDestinationId)) {
throw new GeaFlowDSLException("DESTINATION ID field name should be same "
+ "between edge tables");
}
if (!edgeType.getTargetIdField().getType().equals(chkDestinationIdType)) {
throw new GeaFlowDSLException("DESTINATION ID field type should be same "
+ "between edge tables");
Expand All @@ -211,10 +223,6 @@ public EdgeRecordType getEdgeType(Collection<String> edgeTypes, RelDataTypeFacto
chkTimestamp = edgeType.getTimestampField().get().getName();
chkTimestampType = edgeType.getTimestampField().get().getType();
} else {
if (!edgeType.getTimestampField().get().getName().equals(chkTimestamp)) {
throw new GeaFlowDSLException("TIMESTAMP field name should be same between "
+ "edge tables");
}
if (!edgeType.getTimestampField().get().getType().equals(chkTimestampType)) {
throw new GeaFlowDSLException("TIMESTAMP field type should be same between edge "
+ "tables");
Expand Down Expand Up @@ -255,6 +263,30 @@ public EdgeRecordType getEdgeType(Collection<String> edgeTypes, RelDataTypeFacto
return EdgeRecordType.createEdgeType(combineFields, srcIdField, targetField, tsField, typeFactory);
}

public static List<RelDataTypeField> renameMetaField(List<RelDataTypeField> fields,
MetaField metaType,
String newFieldName) {
List<RelDataTypeField> metaFields = new ArrayList<>();
return fields.stream().filter(f -> {
if (f.getType() instanceof MetaFieldType
&& ((MetaFieldType)f.getType()).getMetaField().equals(metaType)) {
if (metaFields.isEmpty()) {
metaFields.add(f);
return true;
}
return false;
}
return true;
}).map(f -> {
if (f.getType() instanceof MetaFieldType
&& ((MetaFieldType)f.getType()).getMetaField().equals(metaType)) {
return new RelDataTypeFieldImpl(newFieldName, f.getIndex(), f.getType());
} else {
return f;
}
}).collect(Collectors.toList());
}

/**
* Copy the graph type and add a vertex field to all the vertex tables.
* @param fieldName The added field name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public boolean isNullable() {

public static VertexRecordType createVertexType(List<RelDataTypeField> fields, String idField,
RelDataTypeFactory typeFactory) {
boolean hasMultiIdFields = fields.stream().filter(f -> f.getType() instanceof MetaFieldType
&& ((MetaFieldType) f.getType()).getMetaField().equals(MetaField.VERTEX_ID)).count() > 1;
if (hasMultiIdFields) {
idField = VertexType.DEFAULT_ID_FIELD_NAME;
fields = GraphRecordType.renameMetaField(fields, MetaField.VERTEX_ID, idField);
}
List<RelDataTypeField> reorderFields = reorderFields(fields, idField, typeFactory);
return new VertexRecordType(reorderFields);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@
import com.antgroup.geaflow.dsl.udf.table.math.E;
import com.antgroup.geaflow.dsl.udf.table.math.Log2;
import com.antgroup.geaflow.dsl.udf.table.math.Round;
import com.antgroup.geaflow.dsl.udf.table.other.EdgeSrcId;
import com.antgroup.geaflow.dsl.udf.table.other.EdgeTargetId;
import com.antgroup.geaflow.dsl.udf.table.other.EdgeTimestamp;
import com.antgroup.geaflow.dsl.udf.table.other.If;
import com.antgroup.geaflow.dsl.udf.table.other.Label;
import com.antgroup.geaflow.dsl.udf.table.other.VertexId;
import com.antgroup.geaflow.dsl.udf.table.string.Ascii2String;
import com.antgroup.geaflow.dsl.udf.table.string.Base64Decode;
import com.antgroup.geaflow.dsl.udf.table.string.Base64Encode;
Expand Down Expand Up @@ -158,6 +162,10 @@ public class BuildInSqlFunctionTable extends ListSqlOperatorTable {
// udf.table.other
.add(GeaFlowFunction.of(If.class))
.add(GeaFlowFunction.of(Label.class))
.add(GeaFlowFunction.of(VertexId.class))
.add(GeaFlowFunction.of(EdgeSrcId.class))
.add(GeaFlowFunction.of(EdgeTargetId.class))
.add(GeaFlowFunction.of(EdgeTimestamp.class))
// UDGA
.add(GeaFlowFunction.of(SingleSourceShortestPath.class))
.add(GeaFlowFunction.of(PageRank.class))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2023 AntGroup CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.antgroup.geaflow.dsl.udf.table.other;

import com.antgroup.geaflow.dsl.common.data.RowEdge;
import com.antgroup.geaflow.dsl.common.function.Description;
import com.antgroup.geaflow.dsl.common.function.UDF;
import com.antgroup.geaflow.dsl.planner.GQLJavaTypeFactory;
import com.antgroup.geaflow.dsl.util.GraphSchemaUtil;
import org.apache.calcite.rel.type.RelDataType;

@Description(name = "srcId", description = "Returns srcId for edge")
public class EdgeSrcId extends UDF implements GraphMetaFieldAccessFunction {

public Object eval(RowEdge edge) {
return edge.getSrcId();
}

@Override
public RelDataType getReturnRelDataType(GQLJavaTypeFactory typeFactory) {
return GraphSchemaUtil.getCurrentGraphEdgeSrcIdType(typeFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2023 AntGroup CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.antgroup.geaflow.dsl.udf.table.other;

import com.antgroup.geaflow.dsl.common.data.RowEdge;
import com.antgroup.geaflow.dsl.common.function.Description;
import com.antgroup.geaflow.dsl.common.function.UDF;
import com.antgroup.geaflow.dsl.planner.GQLJavaTypeFactory;
import com.antgroup.geaflow.dsl.util.GraphSchemaUtil;
import org.apache.calcite.rel.type.RelDataType;

@Description(name = "targetId", description = "Returns targetId for edge")
public class EdgeTargetId extends UDF implements GraphMetaFieldAccessFunction {

public Object eval(RowEdge edge) {
return edge.getTargetId();
}

@Override
public RelDataType getReturnRelDataType(GQLJavaTypeFactory typeFactory) {
return GraphSchemaUtil.getCurrentGraphEdgeTargetIdType(typeFactory);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2023 AntGroup CO., Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.antgroup.geaflow.dsl.udf.table.other;

import com.antgroup.geaflow.dsl.common.exception.GeaFlowDSLException;
import com.antgroup.geaflow.dsl.common.function.Description;
import com.antgroup.geaflow.dsl.common.function.UDF;
import com.antgroup.geaflow.dsl.planner.GQLJavaTypeFactory;
import com.antgroup.geaflow.dsl.util.GraphSchemaUtil;
import com.antgroup.geaflow.model.graph.IGraphElementWithTimeField;
import org.apache.calcite.rel.type.RelDataType;

@Description(name = "ts", description = "Returns ts for edge with timestamp")
public class EdgeTimestamp extends UDF implements GraphMetaFieldAccessFunction {

public Long eval(IGraphElementWithTimeField edge) {
return edge.getTime();
}

@Override
public RelDataType getReturnRelDataType(GQLJavaTypeFactory typeFactory) {
if (GraphSchemaUtil.getCurrentGraphEdgeTimestampType(typeFactory).isPresent()) {
return GraphSchemaUtil.getCurrentGraphEdgeTimestampType(typeFactory).get();
} else {
throw new GeaFlowDSLException("Cannot find timestamp type");
}
}
}
Loading

0 comments on commit 68d227f

Please sign in to comment.