Skip to content

Commit

Permalink
[ISSUE-328] Support Pulsar Connector
Browse files Browse the repository at this point in the history
  • Loading branch information
ramboloc authored Jan 18, 2024
1 parent e24abae commit a04a0ac
Show file tree
Hide file tree
Showing 16 changed files with 951 additions and 0 deletions.
134 changes: 134 additions & 0 deletions docs/docs-cn/application-development/dsl/connector/pulsar.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Pulsar Connector介绍
GeaFlow 支持从 Pulsar 中读取数据,并向 Pulsar 写入数据。目前支持的 Pulsar 版本为 2.8.1。
# 语法
```sql
-- Declares a table backed by a Pulsar topic.
-- The WITH clause selects the connector (type) and carries its settings.
CREATE TABLE pulsar_table (
    id   BIGINT,
    name VARCHAR,
    age  INT
) WITH (
    type = 'pulsar',
    `geaflow.dsl.pulsar.servers` = 'localhost',
    `geaflow.dsl.pulsar.port` = '6650',
    `geaflow.dsl.pulsar.topic` = 'persistent://test/test_pulsar_connector/topic_read',
    `geaflow.dsl.pulsar.subscriptionInitialPosition` = 'latest'
)
```
# 参数

| 参数名 | 是否必须 | 描述 |
| -------- | -------- | -------- |
| geaflow.dsl.pulsar.servers | 是 | Pulsar 的引导服务器(bootstrap)列表 |
| geaflow.dsl.pulsar.port | 是 | Pulsar 的引导服务器(bootstrap)端口号 |
| geaflow.dsl.pulsar.topic | 是 | Pulsar topic |
| geaflow.dsl.pulsar.subscriptionInitialPosition | 否 | Pulsar 消费的初始位置,默认是 'latest',可选择 'earliest' |

注意:pulsar connector不能指定一个分区topic, 如果你要消费某个分区的消息,请选择分区topic的子topic名称。

# 示例1
示例1从 Pulsar 的 `topic_read` 中读取数据,并将数据写入 `topic_write` 中。
```sql
-- Source table: reads records from the Pulsar topic `topic_read`.
CREATE TABLE pulsar_source (
    id   BIGINT,
    name VARCHAR,
    age  INT
) WITH (
    type = 'pulsar',
    `geaflow.dsl.pulsar.servers` = 'localhost',
    `geaflow.dsl.pulsar.port` = '6650',
    `geaflow.dsl.pulsar.topic` = 'persistent://test/test_pulsar_connector/topic_read',
    `geaflow.dsl.pulsar.subscriptionInitialPosition` = 'latest'
);

-- Sink table: writes records to the Pulsar topic `topic_write`.
CREATE TABLE pulsar_sink (
    id   BIGINT,
    name VARCHAR,
    age  INT
) WITH (
    type = 'pulsar',
    `geaflow.dsl.pulsar.servers` = 'localhost',
    `geaflow.dsl.pulsar.port` = '6650',
    `geaflow.dsl.pulsar.topic` = 'persistent://test/test_pulsar_connector/topic_write'
);

-- Copy every record from the source topic to the sink topic.
-- Columns are named explicitly (instead of SELECT *) so the mapping onto the
-- sink columns stays visible and does not silently change with the source schema.
INSERT INTO pulsar_sink
SELECT
    id,
    name,
    age
FROM pulsar_source;
```
# 示例2
同样我们也可以进行四度环路检测。
```sql
-- Job-level settings for this example.
SET geaflow.dsl.window.size = 1;
SET geaflow.dsl.ignore.exception = true;

-- Graph with `person` vertices connected by weighted `knows` edges.
CREATE GRAPH IF NOT EXISTS pulsar_modern (
    Vertex person (
        id   BIGINT ID,
        name VARCHAR
    ),
    Edge knows (
        srcId    BIGINT SOURCE ID,
        targetId BIGINT DESTINATION ID,
        weight   DOUBLE
    )
) WITH (
    storeType = 'rocksdb',
    shardCount = 1
);

-- Raw input: each Pulsar message is read into the single `text` column.
-- NOTE(review): the '#' column separator presumably keeps comma-separated
-- payloads from being split into several columns -- confirm.
CREATE TABLE IF NOT EXISTS pulsar_source (
    text VARCHAR
) WITH (
    type = 'pulsar',
    `geaflow.dsl.column.separator` = '#',
    `geaflow.dsl.pulsar.servers` = 'localhost',
    `geaflow.dsl.pulsar.port` = '6650',
    `geaflow.dsl.pulsar.topic` = 'persistent://test/test_pulsar_connector/topic_read',
    `geaflow.dsl.pulsar.subscriptionInitialPosition` = 'latest'
);

-- Output: one row per detected loop, holding the ids of the vertices on the path.
CREATE TABLE IF NOT EXISTS pulsar_sink (
    a_id  BIGINT,
    b_id  BIGINT,
    c_id  BIGINT,
    d_id  BIGINT,
    a1_id BIGINT
) WITH (
    type = 'pulsar',
    `geaflow.dsl.pulsar.servers` = 'localhost',
    `geaflow.dsl.pulsar.port` = '6650',
    `geaflow.dsl.pulsar.topic` = 'persistent://test/test_pulsar_connector/topic_write'
);

USE GRAPH pulsar_modern;

-- Vertices: rows whose first character is '.'; the rest of the line is "id,name".
INSERT INTO pulsar_modern.person(id, name)
SELECT
    cast(trim(split_ex(t1, ',', 0)) AS BIGINT),
    split_ex(trim(t1), ',', 1)
FROM (
    SELECT trim(substr(text, 2)) AS t1
    FROM pulsar_source
    WHERE substr(text, 1, 1) = '.'
);

-- Edges: rows whose first character is '-'; the rest of the line is "srcId,targetId,weight".
INSERT INTO pulsar_modern.knows
SELECT
    cast(split_ex(t1, ',', 0) AS BIGINT),
    cast(split_ex(t1, ',', 1) AS BIGINT),
    cast(split_ex(t1, ',', 2) AS DOUBLE)
FROM (
    SELECT trim(substr(text, 2)) AS t1
    FROM pulsar_source
    WHERE substr(text, 1, 1) = '-'
);

-- Loop detection: match a -> b -> c -> d -> a and emit the vertex ids.
-- a1_id repeats a.id, i.e. the vertex on which the path closes.
INSERT INTO pulsar_sink
SELECT
    a_id,
    b_id,
    c_id,
    d_id,
    a1_id
FROM (
    MATCH (a:person) -[:knows]->(b:person) -[:knows]-> (c:person)
          -[:knows]-> (d:person) -> (a:person)
    RETURN
        a.id AS a_id,
        b.id AS b_id,
        c.id AS c_id,
        d.id AS d_id,
        a.id AS a1_id
);
```
1 change: 1 addition & 0 deletions docs/docs-cn/application-development/dsl/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
* [Hive Connector](connector/hive.md)
* [File Connector](connector/file.md)
* [Kafka Connector](connector/kafka.md)
* [Pulsar Connector](connector/pulsar.md)
* [用户自定义Connector](connector/udc.md)


133 changes: 133 additions & 0 deletions docs/docs-en/application-development/dsl/connector/pulsar.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Pulsar Connector Introduction
GeaFlow supports reading data from Pulsar and writing data to Pulsar. The currently supported Pulsar version is 2.8.1.
# Syntax
```sql
-- Declares a table backed by a Pulsar topic.
-- The WITH clause selects the connector (type) and carries its settings.
CREATE TABLE pulsar_table (
    id   BIGINT,
    name VARCHAR,
    age  INT
) WITH (
    type = 'pulsar',
    `geaflow.dsl.pulsar.servers` = 'localhost',
    `geaflow.dsl.pulsar.port` = '6650',
    `geaflow.dsl.pulsar.topic` = 'persistent://test/test_pulsar_connector/topic_read',
    `geaflow.dsl.pulsar.subscriptionInitialPosition` = 'latest'
)
```
# Options

| Key | Required | Description |
| -------- | -------- | -------- |
| geaflow.dsl.pulsar.servers | yes | The pulsar bootstrap servers list. |
| geaflow.dsl.pulsar.port | yes | The port of pulsar bootstrap servers. |
| geaflow.dsl.pulsar.topic | yes | Pulsar topic|
| geaflow.dsl.pulsar.subscriptionInitialPosition | no | The initial position of the consumer. The default is 'latest'; 'earliest' is also supported. |

Note: The Pulsar connector cannot be pointed at a partitioned topic. To consume the messages of a particular partition, specify the name of that partition's sub-topic instead.
# Example1
Example 1 reads data from the Pulsar topic `topic_read` and writes it to the topic `topic_write`.
```sql
-- Source table: reads records from the Pulsar topic `topic_read`.
CREATE TABLE pulsar_source (
    id   BIGINT,
    name VARCHAR,
    age  INT
) WITH (
    type = 'pulsar',
    `geaflow.dsl.pulsar.servers` = 'localhost',
    `geaflow.dsl.pulsar.port` = '6650',
    `geaflow.dsl.pulsar.topic` = 'persistent://test/test_pulsar_connector/topic_read',
    `geaflow.dsl.pulsar.subscriptionInitialPosition` = 'latest'
);

-- Sink table: writes records to the Pulsar topic `topic_write`.
CREATE TABLE pulsar_sink (
    id   BIGINT,
    name VARCHAR,
    age  INT
) WITH (
    type = 'pulsar',
    `geaflow.dsl.pulsar.servers` = 'localhost',
    `geaflow.dsl.pulsar.port` = '6650',
    `geaflow.dsl.pulsar.topic` = 'persistent://test/test_pulsar_connector/topic_write'
);

-- Copy every record from the source topic to the sink topic.
-- Columns are named explicitly (instead of SELECT *) so the mapping onto the
-- sink columns stays visible and does not silently change with the source schema.
INSERT INTO pulsar_sink
SELECT
    id,
    name,
    age
FROM pulsar_source;
```
# Example2
Similarly, we can also perform four-hop loop (cycle) detection.
```sql
-- Job-level settings for this example.
SET geaflow.dsl.window.size = 1;
SET geaflow.dsl.ignore.exception = true;

-- Graph with `person` vertices connected by weighted `knows` edges.
CREATE GRAPH IF NOT EXISTS pulsar_modern (
    Vertex person (
        id   BIGINT ID,
        name VARCHAR
    ),
    Edge knows (
        srcId    BIGINT SOURCE ID,
        targetId BIGINT DESTINATION ID,
        weight   DOUBLE
    )
) WITH (
    storeType = 'rocksdb',
    shardCount = 1
);

-- Raw input: each Pulsar message is read into the single `text` column.
-- NOTE(review): the '#' column separator presumably keeps comma-separated
-- payloads from being split into several columns -- confirm.
CREATE TABLE IF NOT EXISTS pulsar_source (
    text VARCHAR
) WITH (
    type = 'pulsar',
    `geaflow.dsl.column.separator` = '#',
    `geaflow.dsl.pulsar.servers` = 'localhost',
    `geaflow.dsl.pulsar.port` = '6650',
    `geaflow.dsl.pulsar.topic` = 'persistent://test/test_pulsar_connector/topic_read',
    `geaflow.dsl.pulsar.subscriptionInitialPosition` = 'latest'
);

-- Output: one row per detected loop, holding the ids of the vertices on the path.
CREATE TABLE IF NOT EXISTS pulsar_sink (
    a_id  BIGINT,
    b_id  BIGINT,
    c_id  BIGINT,
    d_id  BIGINT,
    a1_id BIGINT
) WITH (
    type = 'pulsar',
    `geaflow.dsl.pulsar.servers` = 'localhost',
    `geaflow.dsl.pulsar.port` = '6650',
    `geaflow.dsl.pulsar.topic` = 'persistent://test/test_pulsar_connector/topic_write'
);

USE GRAPH pulsar_modern;

-- Vertices: rows whose first character is '.'; the rest of the line is "id,name".
INSERT INTO pulsar_modern.person(id, name)
SELECT
    cast(trim(split_ex(t1, ',', 0)) AS BIGINT),
    split_ex(trim(t1), ',', 1)
FROM (
    SELECT trim(substr(text, 2)) AS t1
    FROM pulsar_source
    WHERE substr(text, 1, 1) = '.'
);

-- Edges: rows whose first character is '-'; the rest of the line is "srcId,targetId,weight".
INSERT INTO pulsar_modern.knows
SELECT
    cast(split_ex(t1, ',', 0) AS BIGINT),
    cast(split_ex(t1, ',', 1) AS BIGINT),
    cast(split_ex(t1, ',', 2) AS DOUBLE)
FROM (
    SELECT trim(substr(text, 2)) AS t1
    FROM pulsar_source
    WHERE substr(text, 1, 1) = '-'
);

-- Loop detection: match a -> b -> c -> d -> a and emit the vertex ids.
-- a1_id repeats a.id, i.e. the vertex on which the path closes.
INSERT INTO pulsar_sink
SELECT
    a_id,
    b_id,
    c_id,
    d_id,
    a1_id
FROM (
    MATCH (a:person) -[:knows]->(b:person) -[:knows]-> (c:person)
          -[:knows]-> (d:person) -> (a:person)
    RETURN
        a.id AS a_id,
        b.id AS b_id,
        c.id AS c_id,
        d.id AS d_id,
        a.id AS a1_id
);
```
1 change: 1 addition & 0 deletions docs/docs-en/application-development/dsl/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,6 @@ Hybrid-DSL is a data analysis language provided by GeaFlow, which supports stand
* [Hive Connector](connector/hive.md)
* [File Connector](connector/file.md)
* [Kafka Connector](connector/kafka.md)
* [Pulsar Connector](connector/pulsar.md)
* [User Defined Connector](connector/udc.md)

Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module for the GeaFlow DSL Pulsar connector. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>geaflow-dsl-connector</artifactId>
        <groupId>com.antgroup.tugraph</groupId>
        <version>0.4.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>geaflow-dsl-connector-pulsar</artifactId>

    <properties>
        <!-- Pulsar client version this connector is built against. -->
        <pulsar.version>2.8.1</pulsar.version>
        <!--
          No project.version property is declared here on purpose: ${project.version}
          is a built-in Maven expression that resolves to this module's version,
          inherited from the parent (0.4.0-SNAPSHOT). A user property of the same
          name with a different value (0.4-SNAPSHOT) is at best ignored and at worst
          points sibling dependencies at a version that does not exist.
        -->
    </properties>

    <dependencies>
        <!-- Sibling GeaFlow modules, always at the same version as this module. -->
        <dependency>
            <groupId>com.antgroup.tugraph</groupId>
            <artifactId>geaflow-common</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>com.antgroup.tugraph</groupId>
            <artifactId>geaflow-dsl-common</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>com.antgroup.tugraph</groupId>
            <artifactId>geaflow-dsl-connector-api</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-all</artifactId>
            <version>${pulsar.version}</version>
        </dependency>

        <!-- NOTE(review): testng.version is not defined in this file; presumably inherited from a parent POM. -->
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>${testng.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.antgroup.geaflow.dsl.connector.pulsar;

import com.antgroup.geaflow.common.config.ConfigKey;
import com.antgroup.geaflow.common.config.ConfigKeys;

/**
 * Configuration keys for the GeaFlow DSL Pulsar connector.
 *
 * <p>Each constant pairs the option name used in a table's {@code WITH (...)} clause
 * with its default value (if any) and a human-readable description.
 */
public class PulsarConfigKeys {

    /** Pulsar server address(es) to connect to; has no default value. */
    public static final ConfigKey GEAFLOW_DSL_PULSAR_SERVERS = ConfigKeys
        .key("geaflow.dsl.pulsar.servers")
        .noDefaultValue()
        .description("The pulsar bootstrap servers list.");

    /** Port of the Pulsar servers; has no default value. */
    public static final ConfigKey GEAFLOW_DSL_PULSAR_PORT = ConfigKeys
        .key("geaflow.dsl.pulsar.port")
        .noDefaultValue()
        // Previously a copy of the servers description; describe the port instead.
        .description("The port of pulsar bootstrap servers.");

    /** Pulsar topic to read from or write to; has no default value. */
    public static final ConfigKey GEAFLOW_DSL_PULSAR_TOPIC = ConfigKeys
        .key("geaflow.dsl.pulsar.topic")
        .noDefaultValue()
        .description("The pulsar topic.");

    /** Subscription name; defaults to {@code "default-subscribeName"}. */
    public static final ConfigKey GEAFLOW_DSL_PULSAR_SUBSCRIBE_NAME = ConfigKeys
        .key("geaflow.dsl.pulsar.subscribeName")
        .defaultValue("default-subscribeName")
        .description("The pulsar subscribeName, default is 'default-subscribeName'.");

    /** Initial position of a subscription; defaults to {@code "latest"}. */
    public static final ConfigKey GEAFLOW_DSL_PULSAR_SUBSCRIBE_INITIAL_POSITION = ConfigKeys
        .key("geaflow.dsl.pulsar.subscriptionInitialPosition")
        .defaultValue("latest")
        // The description must state the real default ("latest"), matching defaultValue above.
        .description("The pulsar subscriptionInitialPosition, default is 'latest'.");

}
Loading

0 comments on commit a04a0ac

Please sign in to comment.