Skip to content

Commit

Permalink
support to set version white list (#92)
Browse files Browse the repository at this point in the history
* support to set version white list

* update the code style

* fix test

* fix test
  • Loading branch information
Nicole00 authored Dec 28, 2023
1 parent 24a7514 commit b6de2c4
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ public class NebulaClientOptions implements Serializable {

private final SelfSignParams selfSignParams;

private final String version;

private NebulaClientOptions(String metaAddress, String graphAddress, String username,
String password, int timeout, int connectRetry,
boolean enableGraphSSL, boolean enableMetaSSL,
boolean enableStorageSSL,
SSLSignType sslSignType, CASignParams caSignParams,
SelfSignParams selfSignParams) {
SelfSignParams selfSignParams, String version) {
this.metaAddress = metaAddress;
this.graphAddress = graphAddress;
this.username = username;
Expand All @@ -59,6 +60,7 @@ private NebulaClientOptions(String metaAddress, String graphAddress, String user
this.sslSignType = sslSignType;
this.caSignParams = caSignParams;
this.selfSignParams = selfSignParams;
this.version = version;
}

public List<HostAddress> getMetaAddress() {
Expand Down Expand Up @@ -118,6 +120,10 @@ public SelfSignParams getSelfSignParam() {
return selfSignParams;
}

public String getVersion() {
return version;
}

/**
* Builder for {@link NebulaClientOptions}
*/
Expand All @@ -136,6 +142,7 @@ public static class NebulaClientOptionsBuilder {
private SSLSignType sslSignType = null;
private CASignParams caSignParams = null;
private SelfSignParams selfSignParams = null;
private String version = null;

public NebulaClientOptionsBuilder setMetaAddress(String metaAddress) {
this.metaAddress = metaAddress;
Expand Down Expand Up @@ -200,6 +207,11 @@ public NebulaClientOptionsBuilder setSelfSignParam(String crtFilePath, String ke
return this;
}

public NebulaClientOptionsBuilder setVersion(String version) {
this.version = version;
return this;
}

public NebulaClientOptions build() {
if (metaAddress == null || metaAddress.trim().isEmpty()) {
throw new IllegalArgumentException("meta address can not be empty.");
Expand Down Expand Up @@ -246,7 +258,8 @@ public NebulaClientOptions build() {
enableStorageSSL,
sslSignType,
caSignParams,
selfSignParams);
selfSignParams,
version);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.apache.flink.connector.nebula.connection;


import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.CASignedSSLParam;
import com.vesoft.nebula.client.graph.data.HostAddress;
Expand Down Expand Up @@ -43,9 +42,9 @@ public NebulaPool getNebulaPool() throws UnknownHostException {
}

Collections.shuffle(addresses);
NebulaPool nebulaPool = new NebulaPool();
NebulaPoolConfig poolConfig = new NebulaPoolConfig();
poolConfig.setTimeout(nebulaClientOptions.getTimeout());
poolConfig.setVersion(nebulaClientOptions.getVersion());
if (nebulaClientOptions.isEnableGraphSSL()) {
poolConfig.setEnableSsl(true);
switch (nebulaClientOptions.getSSLSignType()) {
Expand All @@ -67,8 +66,12 @@ public NebulaPool getNebulaPool() throws UnknownHostException {
throw new IllegalArgumentException("ssl sign type is not supported.");
}
}
nebulaPool.init(addresses, poolConfig);
return nebulaPool;
NebulaPool nebulaPool = new NebulaPool();
if (nebulaPool.init(addresses, poolConfig)) {
return nebulaPool;
} else {
throw new RuntimeException("NebulaPool init failed.");
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public MetaClient getMetaClient() throws TException, ClientServerIncompatibleExc
metaClient = new MetaClient(addresses, timeout, retry, retry);
}

metaClient.setVersion(nebulaClientOptions.getVersion());
metaClient.connect();
return metaClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public StorageClient getStorageClient() throws Exception {
storageClient = new StorageClient(addresses, timeout);
}

storageClient.setVersion(nebulaClientOptions.getVersion());
if (!storageClient.connect()) {
throw new Exception("failed to connect storaged.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void getNebulaPool() {
.setPassword("nebula")
.setConnectRetry(1)
.setTimeout(1000)
.setVersion("test")
.build();
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions);
Expand All @@ -52,10 +53,37 @@ public void getNebulaPool() {
}
}

@Test
public void getNebulaPoolWithWrongVersion() {
NebulaClientOptions nebulaClientOptions =
new NebulaClientOptions.NebulaClientOptionsBuilder()
.setGraphAddress("127.0.0.1:9669")
.setMetaAddress("127.0.0.1:9559")
.setUsername("root")
.setPassword("nebula")
.setConnectRetry(1)
.setTimeout(1000)
.setVersion("INVALID_VERSION")
.build();
NebulaGraphConnectionProvider graphConnectionProvider =
new NebulaGraphConnectionProvider(nebulaClientOptions);
try {
NebulaPool nebulaPool = graphConnectionProvider.getNebulaPool();
nebulaPool.getSession("root", "nebula", true);
} catch (Exception e) {
LOG.info("get session failed", e);
if (e.getMessage().contains("NebulaPool init failed.")) {
assert true;
} else {
assert false;
}
}
}

/**
* nebula server does not enable ssl, the connection cannot be established correctly.
*/
@Test(expected = NotValidConnectionException.class)
@Test(expected = RuntimeException.class)
public void getSessionWithSsl() throws NotValidConnectionException {
NebulaClientOptions nebulaClientOptions =
new NebulaClientOptions.NebulaClientOptionsBuilder()
Expand All @@ -81,7 +109,7 @@ public void getSessionWithSsl() throws NotValidConnectionException {
NebulaPool pool = graphConnectionProvider.getNebulaPool();
pool.getSession("root", "nebula", true);
} catch (UnknownHostException | IOErrorException | AuthFailedException
| ClientServerIncompatibleException e) {
| ClientServerIncompatibleException e) {
LOG.error("get session failed", e);
assert (false);
}
Expand Down
12 changes: 12 additions & 0 deletions connector/src/test/resources/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ services:
- --minloglevel=0
- --heartbeat_interval_secs=2
- --expired_time_factor=2
- --enable_client_white_list=true
- --client_white_list=3.0.0:test
healthcheck:
test: ["CMD", "curl", "-f", "http://172.28.1.1:11000/status"]
interval: 30s
Expand Down Expand Up @@ -52,6 +54,8 @@ services:
- --minloglevel=0
- --heartbeat_interval_secs=2
- --expired_time_factor=2
- --enable_client_white_list=true
- --client_white_list=3.0.0:test
healthcheck:
test: ["CMD", "curl", "-f", "http://172.28.1.2:11000/status"]
interval: 30s
Expand Down Expand Up @@ -88,6 +92,8 @@ services:
- --minloglevel=0
- --heartbeat_interval_secs=2
- --expired_time_factor=2
- --enable_client_white_list=true
- --client_white_list=3.0.0:test
healthcheck:
test: ["CMD", "curl", "-f", "http://172.28.1.3:11000/status"]
interval: 30s
Expand Down Expand Up @@ -242,6 +248,8 @@ services:
- --minloglevel=0
- --heartbeat_interval_secs=2
- --timezone_name=+08:00:00
- --enable_client_white_list=true
- --client_white_list=3.0.0:test
depends_on:
- metad0
- metad1
Expand Down Expand Up @@ -279,6 +287,8 @@ services:
- --minloglevel=0
- --heartbeat_interval_secs=2
- --timezone_name=+08:00:00
- --enable_client_white_list=true
- --client_white_list=3.0.0:test
depends_on:
- metad0
- metad1
Expand Down Expand Up @@ -316,6 +326,8 @@ services:
- --minloglevel=0
- --heartbeat_interval_secs=2
- --timezone_name=+08:00:00
- --enable_client_white_list=true
- --client_white_list=3.0.0:test
depends_on:
- metad0
- metad1
Expand Down

0 comments on commit b6de2c4

Please sign in to comment.