Skip to content

Commit

Permalink
converter interface (#22)
Browse files Browse the repository at this point in the history
* converter interface

* test has been added

* clean up

* test fix

* cleanup
  • Loading branch information
mchernyakov authored May 8, 2023
1 parent c91d527 commit 489c9ee
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 133 deletions.
107 changes: 61 additions & 46 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ _by timeout_ and _by buffer size_.
|:-------:|:---------------------:|
|1.3.* | 1.0.0 |
|1.9.* | 1.3.4 |

|1.9.* | 1.4.* |

### Install

Expand All @@ -29,7 +29,7 @@ _by timeout_ and _by buffer size_.
<dependency>
<groupId>ru.ivi.opensource</groupId>
<artifactId>flink-clickhouse-sink</artifactId>
<version>1.3.4</version>
<version>1.4.0</version>
</dependency>
```

Expand Down Expand Up @@ -61,40 +61,8 @@ common and for each sink in your operators chain.
`clickhouse.sink.max-buffer-size`- buffer size.

### In code
The main thing: the clickhouse-sink works with events in string format
(the ClickHouse insert format, which is similar to CSV).
You have to convert your event to this CSV-like format (as for a usual database insert).

For example, you have event-pojo:
```java
class A {
public final String str;
public final int integer;

public A(String str, int i){
this.str = str;
this.integer = i;
}
}
```
You have to convert this pojo like this:
```java
/**
 * Renders an {@code A} event as a single row in the string form expected by the sink,
 * e.g. {@code str = "abc"}, {@code integer = 5} gives {@code ('abc', 5 )}.
 *
 * <p>The string value is wrapped in single quotes as-is; no escaping is applied.
 *
 * @param a event to render
 * @return the row text: opening parenthesis, quoted {@code a.str}, comma, {@code a.integer},
 *         closing parenthesis
 */
public static String convertToCsv(A a) {
    // a.str, wrapped in single quotes
    String strColumn = "'" + a.str + "'";

    // a.integer, in its decimal text form
    String integerColumn = String.valueOf(a.integer);

    return "(" + strColumn + ", " + integerColumn + " )";
}
```
And then add record to sink.
#### Configuration: global parameters

You have to add global parameters for Flink environment:
```java
Expand All @@ -120,27 +88,74 @@ environment.getConfig().setGlobalJobParameters(parameters);

```

And add your sink like this:
#### Converter

The main thing: the clickhouse-sink works with events in string format
(the ClickHouse insert format, which is similar to CSV).
You have to convert your event to this CSV-like format (as for a usual database insert).

For example, you have event-pojo:
```java
class A {
public final String str;
public final int integer;

public A(String str, int i){
this.str = str;
this.integer = i;
}
}
```
You have to implement a converter to CSV, using the following interface:
```java
// create converter
public YourEventConverter {
String toClickHouseInsertFormat (YourEvent yourEvent){
String chFormat = ...;
....
return chFormat;

public interface ClickHouseSinkConverter<T> {
...
}

Example:

```
You have to convert this pojo like this:

```java
import ru.ivi.opensource.flinkclickhousesink.ClickHouseSinkConverter;

/**
 * Example converter that turns an {@code A} event into the string row
 * handed to the ClickHouse sink.
 */
public class YourEventConverter implements ClickHouseSinkConverter<A> {

    /**
     * Renders the event as one row, e.g. {@code str = "abc"}, {@code integer = 5}
     * gives {@code ('abc', 5 )}.
     *
     * <p>The string value is wrapped in single quotes as-is; no escaping is applied.
     *
     * @param record event to convert
     * @return the row text for the given event
     */
    @Override
    public String convert(A record) {
        StringBuilder builder = new StringBuilder();
        builder.append("(");

        // add record.str, wrapped in single quotes
        builder.append("'");
        builder.append(record.str);
        builder.append("', ");

        // add record.integer
        builder.append(record.integer);
        builder.append(" )");
        return builder.toString();
    }
}
```
And then add record to sink.

And add your sink like this:
```java

// create props for sink
// create table props for sink
Properties props = new Properties();
props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "your_table");
props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000");

// converter
YourEventConverter converter = new YourEventConverter();

// build chain
DataStream<YourEvent> dataStream = ...;
dataStream.map(YourEventConverter::toClickHouseInsertFormat)
.name("convert YourEvent to ClickHouse table format")
.addSink(new ClickHouseSink(props))
dataStream.addSink(new ClickHouseSink(props, converter))
.name("your_table ClickHouse sink");
```
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<groupId>ru.ivi.opensource</groupId>
<artifactId>flink-clickhouse-sink</artifactId>
<version>1.3.5-SNAPSHOT</version>
<version>1.4.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Flink ClickHouse sink</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@
import java.util.Map;
import java.util.Properties;


public class ClickHouseSink extends RichSinkFunction<String> {
public class ClickHouseSink<T> extends RichSinkFunction<T> {

private static final Logger logger = LoggerFactory.getLogger(ClickHouseSink.class);

private static final Object DUMMY_LOCK = new Object();

private final Properties localProperties;
private final ClickHouseSinkConverter<T> clickHouseSinkConverter;

private volatile static transient ClickHouseSinkManager sinkManager;
private transient Sink sink;

public ClickHouseSink(Properties properties) {
public ClickHouseSink(Properties properties,
ClickHouseSinkConverter<T> clickHouseSinkConverter) {
this.localProperties = properties;
this.clickHouseSinkConverter = clickHouseSinkConverter;
}

@Override
Expand All @@ -45,16 +47,18 @@ public void open(Configuration config) {
}

/**
* Add csv to sink
* Add a record to sink
*
* @param recordAsCSV csv-event
* @param record record, which will be converted to csv, using {@link ClickHouseSinkConverter}
* @param context ctx
*/
@Override
public void invoke(String recordAsCSV, Context context) {
public void invoke(T record, Context context) {
try {
String recordAsCSV = clickHouseSinkConverter.convert(record);
sink.put(recordAsCSV);
} catch (Exception e) {
logger.error("Error while sending data to ClickHouse, record = {}", recordAsCSV, e);
logger.error("Error while sending data to ClickHouse, record = {}", record, e);
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ru.ivi.opensource.flinkclickhousesink;

import java.io.Serializable;

/**
 * Converts a record of type {@code T} into the string form that is put into the sink.
 *
 * <p>{@code ClickHouseSink} calls {@link #convert(Object)} for every incoming record and
 * passes the returned string on to the underlying sink buffer.
 *
 * <p>The interface is {@link Serializable}; {@code ClickHouseSink} keeps the converter as a
 * regular (non-transient) field, so presumably it has to be shipped together with the sink
 * function. Implementations should therefore hold only serializable state.
 *
 * @param <T> type of the records accepted by the sink
 */
@FunctionalInterface
public interface ClickHouseSinkConverter<T> extends Serializable {

    /**
     * Converts a single record to its string representation.
     *
     * @param record record to convert
     * @return the record in the string (CSV-like insert) format expected by the sink
     */
    String convert(T record);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ public class ClickHouseSinkManager implements AutoCloseable {
private final ClickHouseSinkScheduledCheckerAndCleaner clickHouseSinkScheduledCheckerAndCleaner;
private final ClickHouseSinkCommonParams sinkParams;
private final List<CompletableFuture<Boolean>> futures = Collections.synchronizedList(new LinkedList<>());

private volatile boolean isClosed = false;

public ClickHouseSinkManager(Map<String, String> globalParams) {
sinkParams = new ClickHouseSinkCommonParams(globalParams);
clickHouseWriter = new ClickHouseWriter(sinkParams, futures);
clickHouseSinkScheduledCheckerAndCleaner = new ClickHouseSinkScheduledCheckerAndCleaner(sinkParams, futures);
logger.info("Build sink writer's manager. params = {}", sinkParams.toString());
logger.info("Build sink writer's manager. params = {}", sinkParams);
}

public Sink buildSink(Properties localProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public class ClickHouseSinkScheduledCheckerAndCleaner implements AutoCloseable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@

import com.google.common.collect.Lists;
import io.netty.handler.codec.http.HttpHeaderNames;
import org.asynchttpclient.*;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseRequestBlank;
Expand All @@ -16,20 +21,28 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ClickHouseWriter implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ClickHouseWriter.class);
private ExecutorService service;
private ExecutorService callbackService;
private List<WriterTask> tasks;

private final BlockingQueue<ClickHouseRequestBlank> commonQueue;
private final AtomicLong unprocessedRequestsCounter = new AtomicLong();
private final AsyncHttpClient asyncHttpClient;
private final List<CompletableFuture<Boolean>> futures;
private final ClickHouseSinkCommonParams sinkParams;

private ExecutorService service;
private ExecutorService callbackService;
private List<WriterTask> tasks;

/**
 * Creates a writer using the HTTP client returned by {@code Dsl.asyncHttpClient()}.
 *
 * <p>Delegates to the three-argument constructor, forwarding both arguments unchanged.
 *
 * @param sinkParams common sink parameters
 * @param futures    list of futures handed to the writer
 */
public ClickHouseWriter(ClickHouseSinkCommonParams sinkParams, List<CompletableFuture<Boolean>> futures) {
this(sinkParams, futures, Dsl.asyncHttpClient());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@

import java.util.Map;

import static ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst.*;
import static ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst.FAILED_RECORDS_PATH;
import static ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED;
import static ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst.NUM_RETRIES;
import static ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst.NUM_WRITERS;
import static ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst.QUEUE_MAX_CAPACITY;
import static ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst.TIMEOUT_SEC;

public class ClickHouseSinkCommonParams {

Expand Down
Loading

0 comments on commit 489c9ee

Please sign in to comment.