Skip to content

Commit

Permalink
fix deserialize bug for nebula NULL data (#74)
Browse files Browse the repository at this point in the history
* fix deserialize bug for nebula NULL data

* unified naming

* change num for test

* fix checkstyle

* update edge int8

* remove useless func
  • Loading branch information
liuxiaocs7 authored Sep 28, 2022
1 parent 6bb4a2e commit 5243fb7
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public RowData convert(BaseTableRow record) throws UnsupportedEncodingException
GenericRowData genericRowData = new GenericRowData(rowType.getFieldCount());
for (int pos = 0; pos < rowType.getFieldCount(); pos++) {
ValueWrapper valueWrapper = values.get(pos);
if (valueWrapper != null) {
if (!valueWrapper.isNull()) {
try {
genericRowData.setField(pos,
toInternalConverters[pos].deserialize(valueWrapper));
Expand Down Expand Up @@ -98,7 +98,7 @@ interface NebulaDeserializationConverter extends Serializable {
* Convert a Nebula DataStructure of {@link BaseTableRow}
* to the internal data structure object.
*/
Object deserialize(ValueWrapper baseTableRow)
Object deserialize(ValueWrapper valueWrapper)
throws SQLException, UnsupportedEncodingException;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ public class AbstractNebulaInputFormatITTest {
private static final String PASSWORD = "nebula";

private static final String[] stats = new String[]{
"CREATE SPACE IF NOT EXISTS `flinkSink` (partition_num = 100, charset = utf8,"
"CREATE SPACE IF NOT EXISTS `flinkSinkInput` (partition_num = 100, charset = utf8,"
+ " replica_factor = 3, collate = utf8_bin, vid_type = INT64);"
+ "USE `flinkSink`;",
+ "USE `flinkSinkInput`;",
"CREATE TAG IF NOT EXISTS person (col1 string, col2 fixed_string(8), col3 int8,"
+ " col4 int16, col5 int32,"
+ " col6 int64, col7 date, col8 datetime, col9 timestamp, col10 bool,"
Expand All @@ -61,8 +61,8 @@ public class AbstractNebulaInputFormatITTest {
* construct flink vertex data
*/
private static List<List<String>> constructVertexSourceData() {
List<List<String>> players = new ArrayList<>();
List<String> fields1 = Arrays.asList("61", "\"aba\"", "\"abcdefgh\"", "1", "1111", "22222",
List<List<String>> persons = new ArrayList<>();
List<String> fields1 = Arrays.asList("61", "\"aba\"", "\"abcdefgh\"", "22", "1111", "22222",
"6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")",
"435463424", "false", "1.2", "1.0", "time(\"11:12:12\")",
"ST_GeogFromText(\"POINT(1 3)\")");
Expand Down Expand Up @@ -94,23 +94,23 @@ private static List<List<String>> constructVertexSourceData() {
"6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")",
"435463424", "true", "1.2", "1.0", "time(\"11:12:12\")",
"ST_GeogFromText(\"POLYGON((0 1,1 2,2 3,0 1))\")");
players.add(fields1);
players.add(fields2);
players.add(fields3);
players.add(fields4);
players.add(fields5);
players.add(fields6);
players.add(fields7);
players.add(fields8);
return players;
persons.add(fields1);
persons.add(fields2);
persons.add(fields3);
persons.add(fields4);
persons.add(fields5);
persons.add(fields6);
persons.add(fields7);
persons.add(fields8);
return persons;
}

/**
* construct flink edge data
*/
private static List<List<String>> constructEdgeSourceData() {
List<List<String>> friends = new ArrayList<>();
List<String> fields1 = Arrays.asList("61", "62", "\"aba\"", "\"abcdefgh\"", "1", "1111",
List<String> fields1 = Arrays.asList("61", "62", "\"aba\"", "\"abcdefgh\"", "22", "1111",
"22222", "6412233", "date(\"2019-01-01\")", "datetime(\"2019-01-01T12:12:12\")",
"435463424", "false", "1.2", "1.0", "time(\"11:12:12\")",
"ST_GeogFromText(\"POINT(1 3)\")");
Expand Down Expand Up @@ -211,7 +211,7 @@ public void testVertexSource() throws ExecutionException, InterruptedException {
+ " 'username' = 'root',"
+ " 'password' = 'nebula',"
+ " 'data-type' = 'vertex',"
+ " 'graph-space' = 'flinkSink'"
+ " 'graph-space' = 'flinkSinkInput'"
+ ")";
tableEnv.executeSql(creatSourceDDL);

Expand Down Expand Up @@ -271,7 +271,7 @@ public void testEdgeSource() throws ExecutionException, InterruptedException {
+ " 'graph-address' = '127.0.0.1:9669',"
+ " 'username' = 'root',"
+ " 'password' = 'nebula',"
+ " 'graph-space' = 'flinkSink',"
+ " 'graph-space' = 'flinkSinkInput',"
+ " 'data-type'='edge',"
+ " 'src-id-index'='0',"
+ " 'dst-id-index'='1',"
Expand Down Expand Up @@ -312,11 +312,11 @@ private void insertData(Session session) throws IOErrorException {
}

private String getVertexInsertStat() {
List<List<String>> players = constructVertexSourceData();
List<List<String>> persons = constructVertexSourceData();
List<NebulaVertex> vertices = new ArrayList<>();
for (List<String> player : players) {
for (List<String> person : persons) {
vertices.add(new NebulaVertex(
player.get(0), player.subList(1, player.size())));
person.get(0), person.subList(1, person.size())));
}
NebulaVertices nebulaVertices = new NebulaVertices(
"person",
Expand Down

0 comments on commit 5243fb7

Please sign in to comment.