Skip to content

Commit

Permalink
refactor table name design (#75)
Browse files Browse the repository at this point in the history
* add table name

* update ip

* use label-name
  • Loading branch information
liuxiaocs7 authored Sep 29, 2022
1 parent 5243fb7 commit a41a660
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ private ExecutionOptions getExecutionOptions(Context context, ReadableConfig con
List<String> fields = new ArrayList<>();
List<Integer> positions = new ArrayList<>();
List<Column> columns = context.getCatalogTable().getResolvedSchema().getColumns();
String labelName = config.get(LABEL_NAME);

if (config.get(DATA_TYPE).isVertex()) {
for (int i = 1; i < columns.size(); i++) {
Expand All @@ -161,7 +162,7 @@ private ExecutionOptions getExecutionOptions(Context context, ReadableConfig con
.setIdIndex(0)
.setPositions(positions)
.setGraphSpace(config.get(GRAPH_SPACE))
.setTag(context.getObjectIdentifier().getObjectName())
.setTag(labelName)
.builder();
} else {
for (int i = 2; i < columns.size(); i++) {
Expand All @@ -178,7 +179,7 @@ private ExecutionOptions getExecutionOptions(Context context, ReadableConfig con
.setRankIndex(config.get(RANK_ID_INDEX))
.setPositions(positions)
.setGraphSpace(config.get(GRAPH_SPACE))
.setEdge(context.getObjectIdentifier().getObjectName())
.setEdge(labelName)
.builder();
}
}
Expand All @@ -202,6 +203,7 @@ public Set<ConfigOption<?>> requiredOptions() {
public Set<ConfigOption<?>> optionalOptions() {
Set<ConfigOption<?>> set = new HashSet<>();
set.add(GRAPH_SPACE);
set.add(LABEL_NAME);
set.add(DATA_TYPE);
set.add(TIMEOUT);
set.add(SRC_ID_INDEX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,36 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AbstractNebulaOutPutFormatITTest {
static final String META_ADDRESS = "127.0.0.1:9559";
static final String GRAPH_ADDRESS = "127.0.0.1:9669";
static final String USER_NAME = "root";
static final String PASSWORD = "nebula";

private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractNebulaOutPutFormatITTest.class);

static final String STATIC_IP = "127.0.0.1";
static final String META_ADDRESS = STATIC_IP + NebulaConstant.COLON + "9559";
static final String GRAPH_ADDRESS = STATIC_IP + NebulaConstant.COLON + "9669";
static final String USER_NAME = "root";
static final String PASSWORD = "nebula";

@Before
public void mockData() {
NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
List<HostAddress> addresses = Arrays.asList(new HostAddress("127.0.0.1", 9669));
List<HostAddress> addresses = Arrays.asList(new HostAddress(STATIC_IP, 9669));
NebulaPool pool = new NebulaPool();
Session session = null;
try {
pool.init(addresses, nebulaPoolConfig);
session = pool.getSession("root", "nebula", true);
session = pool.getSession(USER_NAME, PASSWORD, true);

ResultSet respFlinkSinkSpace = session.execute(createFlinkSinkSpace());
ResultSet respTagPerson = session.execute(createTagPerson());
Expand Down Expand Up @@ -115,8 +121,11 @@ public void sinkVertexData() throws ExecutionException, InterruptedException {
TableEnvironment tableEnvironment =
TableEnvironment.create(EnvironmentSettings.inStreamingMode());

Configuration configuration = tableEnvironment.getConfig().getConfiguration();
configuration.setString("table.dml-sync", "true");

tableEnvironment.executeSql(
"CREATE TABLE person ("
"CREATE TABLE personTable ("
+ "vid STRING,"
+ "col1 STRING,"
+ "col2 STRING,"
Expand Down Expand Up @@ -148,15 +157,28 @@ public void sinkVertexData() throws ExecutionException, InterruptedException {
+ PASSWORD
+ "',"
+ "'graph-space'='flinkSink',"
+ "'label-name'='person',"
+ "'data-type'='vertex'"
+ ")");

tableEnvironment
.executeSql(
"insert into person values ('89', 'aba', 'abcdefgh', '1', '1111',"
+ " '22222', '6412233', '2019-01-01', '2019-01-01T12:12:12',"
+ " '435463424', 'false', '1.2', '1.0', '11:12:12', 'POINT(1 3)')")
.await();
StatementSet stmtSet = tableEnvironment.createStatementSet();
stmtSet.addInsertSql(
"insert into personTable values ('61', 'aba', 'abcdefgh', '61', '1111',"
+ " '22222', '6412233', '2019-01-01', '2019-01-01T12:12:12',"
+ " '435463424', 'false', '1.2', '1.0', '11:12:12', 'POINT(1 3)')"
);
stmtSet.addInsertSql(
"insert into personTable values ('62', 'aba', 'abcdefgh', '62', '1111',"
+ " '22222', '6412233', '2019-01-01', '2019-01-01T12:12:12',"
+ " '435463424', 'false', '1.2', '1.0', '11:12:12', 'POINT(1 3)')"
);
stmtSet.addInsertSql(
"insert into personTable values ('89', 'aba', 'abcdefgh', '1', '1111',"
+ " '22222', '6412233', '2019-01-01', '2019-01-01T12:12:12',"
+ " '435463424', 'false', '1.2', '1.0', '11:12:12', 'POINT(1 3)')"
);

stmtSet.execute().await();
}

/** sink Nebula Graph Edge Data with default INSERT mode */
Expand All @@ -166,7 +188,7 @@ public void sinkEdgeData() throws ExecutionException, InterruptedException {
TableEnvironment.create(EnvironmentSettings.inStreamingMode());

tableEnvironment.executeSql(
"CREATE TABLE friend ("
"CREATE TABLE friendTable ("
+ "src STRING,"
+ "dst STRING,"
+ "col1 STRING,"
Expand Down Expand Up @@ -199,6 +221,7 @@ public void sinkEdgeData() throws ExecutionException, InterruptedException {
+ PASSWORD
+ "',"
+ "'graph-space'='flinkSink',"
+ "'label-name'='friend',"
+ "'src-id-index'='0',"
+ "'dst-id-index'='1',"
+ "'rank-id-index'='4',"
Expand All @@ -207,21 +230,21 @@ public void sinkEdgeData() throws ExecutionException, InterruptedException {

tableEnvironment
.executeSql(
"insert into friend values ('61', '62', 'aba', 'abcdefgh',"
"insert into friendTable values ('61', '62', 'aba', 'abcdefgh',"
+ " '1', '1111', '22222', '6412233', '2019-01-01',"
+ " '2019-01-01T12:12:12',"
+ " '435463424', 'false', '1.2', '1.0', '11:12:12', 'POINT(1 3)')")
.await();
}

/** sink Nebula Graph Edge Data with default INSERT mode */
/** sink Nebula Graph Edge Data without rank with default INSERT mode */
@Test
public void sinkEdgeDataWithoutRank() throws ExecutionException, InterruptedException {
TableEnvironment tableEnvironment =
TableEnvironment.create(EnvironmentSettings.inStreamingMode());

tableEnvironment.executeSql(
"CREATE TABLE friend ("
"CREATE TABLE friendTableWithoutRank ("
+ "src STRING,"
+ "dst STRING,"
+ "col1 STRING,"
Expand Down Expand Up @@ -254,14 +277,15 @@ public void sinkEdgeDataWithoutRank() throws ExecutionException, InterruptedExce
+ PASSWORD
+ "',"
+ "'graph-space'='flinkSink',"
+ "'label-name'='friend',"
+ "'src-id-index'='0',"
+ "'dst-id-index'='1',"
+ "'data-type'='edge'"
+ ")");

tableEnvironment
.executeSql(
"insert into friend values ('61', '89', 'aba', 'abcdefgh',"
"insert into friendTableWithoutRank values ('61', '89', 'aba', 'abcdefgh',"
+ " '1', '1111', '22222', '6412233', '2019-01-01',"
+ " '2019-01-01T12:12:12', '435463424', 'false', '1.2', '1.0',"
+ " '11:12:12', 'POINT(1 3)')")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ public class AbstractNebulaInputFormatITTest {

private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractNebulaInputFormatITTest.class);
private static final String META_ADDRESS = "127.0.0.1:9559";
private static final String GRAPH_ADDRESS = "127.0.0.1:9669";
private static final String STATIC_IP = "127.0.0.1";
private static final String META_ADDRESS = STATIC_IP + NebulaConstant.COLON + "9559";
private static final String GRAPH_ADDRESS = STATIC_IP + NebulaConstant.COLON + "9669";
private static final String USERNAME = "root";
private static final String PASSWORD = "nebula";

Expand Down Expand Up @@ -188,7 +189,7 @@ public void testVertexSource() throws ExecutionException, InterruptedException {
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String creatSourceDDL = "CREATE TABLE `person` ("
String creatSourceDDL = "CREATE TABLE `personTable` ("
+ " vid BIGINT,"
+ " col1 STRING,"
+ " col2 STRING,"
Expand All @@ -206,16 +207,17 @@ public void testVertexSource() throws ExecutionException, InterruptedException {
+ " col14 STRING"
+ ") WITH ("
+ " 'connector' = 'nebula',"
+ " 'meta-address' = '127.0.0.1:9559',"
+ " 'graph-address' = '127.0.0.1:9669',"
+ " 'meta-address' = '" + META_ADDRESS + "',"
+ " 'graph-address' = '" + GRAPH_ADDRESS + "',"
+ " 'username' = 'root',"
+ " 'password' = 'nebula',"
+ " 'data-type' = 'vertex',"
+ " 'graph-space' = 'flinkSinkInput'"
+ " 'graph-space' = 'flinkSinkInput',"
+ " 'label-name' = 'person'"
+ ")";
tableEnv.executeSql(creatSourceDDL);

String creatSinkDDL = "CREATE TABLE `personSink` ("
String creatSinkDDL = "CREATE TABLE `personSinkTable` ("
+ " vid BIGINT,"
+ " col1 STRING,"
+ " col2 STRING,"
Expand All @@ -236,8 +238,8 @@ public void testVertexSource() throws ExecutionException, InterruptedException {
+ ")";
tableEnv.executeSql(creatSinkDDL);

Table table = tableEnv.sqlQuery("SELECT * FROM `person`");
table.executeInsert("`personSink`").await();
Table table = tableEnv.sqlQuery("SELECT * FROM `personTable`");
table.executeInsert("`personSinkTable`").await();
}

@Test
Expand All @@ -247,7 +249,7 @@ public void testEdgeSource() throws ExecutionException, InterruptedException {
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String creatSourceDDL = "CREATE TABLE `friend` ("
String creatSourceDDL = "CREATE TABLE `friendTable` ("
+ " sid BIGINT,"
+ " did BIGINT,"
+ " rid BIGINT,"
Expand All @@ -267,19 +269,20 @@ public void testEdgeSource() throws ExecutionException, InterruptedException {
+ " col14 STRING"
+ ") WITH ("
+ " 'connector' = 'nebula',"
+ " 'meta-address' = '127.0.0.1:9559',"
+ " 'graph-address' = '127.0.0.1:9669',"
+ " 'meta-address' = '" + META_ADDRESS + "',"
+ " 'graph-address' = '" + GRAPH_ADDRESS + "',"
+ " 'username' = 'root',"
+ " 'password' = 'nebula',"
+ " 'graph-space' = 'flinkSinkInput',"
+ " 'label-name' = 'friend',"
+ " 'data-type'='edge',"
+ " 'src-id-index'='0',"
+ " 'dst-id-index'='1',"
+ " 'rank-id-index'='2'"
+ ")";
tableEnv.executeSql(creatSourceDDL);

String creatSinkDDL = "CREATE TABLE `friendSink` ("
String creatSinkDDL = "CREATE TABLE `friendSinkTable` ("
+ " sid BIGINT,"
+ " did BIGINT,"
+ " rid BIGINT,"
Expand All @@ -302,8 +305,8 @@ public void testEdgeSource() throws ExecutionException, InterruptedException {
+ ")";
tableEnv.executeSql(creatSinkDDL);

Table table = tableEnv.sqlQuery("SELECT * FROM `friend`");
table.executeInsert("`friendSink`").await();
Table table = tableEnv.sqlQuery("SELECT * FROM `friendTable`");
table.executeInsert("`friendSinkTable`").await();
}

private void insertData(Session session) throws IOErrorException {
Expand Down

0 comments on commit a41a660

Please sign in to comment.