Skip to content

Commit

Permalink
Add exception handling mode (#12)
Browse files Browse the repository at this point in the history
* Add exception handling mode #11

- Changing naming of ClickHouse (converting letter "h" to uppercase);
- Add exception handling with CompletableFuture;
- ClickHouse sending exception handling config was added:
1. if `ignoring-clickhouse-sending-exception-enabled` is true, exception while clickhouse sending is ignored and failed data automatically goes to the disk.
2. if `ignoring-clickhouse-sending-exception-enabled` is false, clickhouse sending exception thrown in "main" thread (thread which called ClickhHouseSink::invoke) and data also goes to the disk.
  • Loading branch information
ashulenko authored Nov 23, 2020
1 parent 26ff5e5 commit fa10bbb
Show file tree
Hide file tree
Showing 26 changed files with 792 additions and 367 deletions.
45 changes: 25 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@

# Flink-Clickhouse-Sink
# Flink-ClickHouse-Sink

[![Build Status](https://travis-ci.com/ivi-ru/flink-clickhouse-sink.svg?branch=master)](https://travis-ci.com/ivi-ru/flink-clickhouse-sink)
[![Maven Central](https://maven-badges.herokuapp.com/maven-central/ru.ivi.opensource/flink-clickhouse-sink/badge.svg)](https://maven-badges.herokuapp.com/maven-central/ru.ivi.opensource/flink-clickhouse-sink/)

## Description

[Flink](https://github.com/apache/flink) sink for [Clickhouse](https://github.com/yandex/ClickHouse) database.
[Flink](https://github.com/apache/flink) sink for [ClickHouse](https://github.com/yandex/ClickHouse) database.
Powered by [Async Http Client](https://github.com/AsyncHttpClient/async-http-client).

High-performance library for loading data to Clickhouse.
High-performance library for loading data to ClickHouse.

It has two triggers for loading data:
_by timeout_ and _by buffer size_.
Expand Down Expand Up @@ -48,17 +48,21 @@ common and for each sink in you operators chain.

`clickhouse.sink.retries` - max number of retries,

`clickhouse.sink.failed-records-path`- path for failed records.
`clickhouse.sink.failed-records-path`- path for failed records,

`clickhouse.sink.ignoring-clickhouse-sending-exception-enabled` - required boolean parameter responsible for raising (false) or not (true) ClickHouse sending exception in main thread.
if `ignoring-clickhouse-sending-exception-enabled` is true, exception while clickhouse sending is ignored and failed data automatically goes to the disk.
if `ignoring-clickhouse-sending-exception-enabled` is false, clickhouse sending exception thrown in "main" thread (thread which called ClickhHouseSink::invoke) and data also goes to the disk.

**The sink part** (use in chain):

`clickhouse.sink.target-table` - target table in Clickhouse,
`clickhouse.sink.target-table` - target table in ClickHouse,

`clickhouse.sink.max-buffer-size`- buffer size.

### In code
The main thing: the clickhouse-sink works with events in string
(Clickhouse insert format, like CSV) format.
(ClickHouse insert format, like CSV) format.
You have to convert your event to csv format (like usual insert in database).

For example, you have event-pojo:
Expand Down Expand Up @@ -97,17 +101,18 @@ You have to add global parameters for Flink environment:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
Map<String, String> globalParameters = new HashMap<>();

// clickhouse cluster properties
globalParameters.put(ClickhouseClusterSettings.CLICKHOUSE_HOSTS, ...);
globalParameters.put(ClickhouseClusterSettings.CLICKHOUSE_USER, ...);
globalParameters.put(ClickhouseClusterSettings.CLICKHOUSE_PASSWORD, ...);
// ClickHouse cluster properties
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, ...);
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, ...);
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, ...);

// sink common
globalParameters.put(ClickhouseSinkConsts.TIMEOUT_SEC, ...);
globalParameters.put(ClickhouseSinkConsts.FAILED_RECORDS_PATH, ...);
globalParameters.put(ClickhouseSinkConsts.NUM_WRITERS, ...);
globalParameters.put(ClickhouseSinkConsts.NUM_RETRIES, ...);
globalParameters.put(ClickhouseSinkConsts.QUEUE_MAX_CAPACITY, ...);
globalParameters.put(ClickHouseSinkConsts.TIMEOUT_SEC, ...);
globalParameters.put(ClickHouseSinkConsts.FAILED_RECORDS_PATH, ...);
globalParameters.put(ClickHouseSinkConsts.NUM_WRITERS, ...);
globalParameters.put(ClickHouseSinkConsts.NUM_RETRIES, ...);
globalParameters.put(ClickHouseSinkConsts.QUEUE_MAX_CAPACITY, ...);
globalParameters.put(ClickHouseSinkConsts.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, ...);

// set global paramaters
ParameterTool parameters = ParameterTool.fromMap(buildGlobalParameters(config));
Expand All @@ -128,15 +133,15 @@ public YourEventConverter {

// create props for sink
Properties props = new Properties();
props.put(ClickhouseSinkConsts.TARGET_TABLE_NAME, "your_table");
props.put(ClickhouseSinkConsts.MAX_BUFFER_SIZE, "10000");
props.put(ClickHouseSinkConsts.TARGET_TABLE_NAME, "your_table");
props.put(ClickHouseSinkConsts.MAX_BUFFER_SIZE, "10000");

// build chain
DataStream<YourEvent> dataStream = ...;
dataStream.map(YourEventConverter::toClickHouseInsertFormat)
.name("convert YourEvent to Clickhouse table format")
.addSink(new ClickhouseSink(props))
.name("your_table clickhouse sink);
.name("convert YourEvent to ClickHouse table format")
.addSink(new ClickHouseSink(props))
.name("your_table ClickHouse sink);
```
## Roadmap
Expand Down
20 changes: 16 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@

<groupId>ru.ivi.opensource</groupId>
<artifactId>flink-clickhouse-sink</artifactId>
<version>1.1.0-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Flink Clickhouse sink</name>
<name>Flink ClickHouse sink</name>

<description>
Flink sink for Clickhouse database. Powered by Async Http Client.
High-performance library for loading data to Clickhouse.
Flink sink for ClickHouse database. Powered by Async Http Client.
High-performance library for loading data to ClickHouse.
</description>
<url>https://github.com/ivi-ru/flink-clickhouse-sink.git</url>

Expand Down Expand Up @@ -77,6 +77,18 @@
<id>aleksanchezz</id>
<name>Alexander Lanko</name>
</developer>
<developer>
<id>ashulenko</id>
<name>Andrey Shulenko</name>
</developer>
<developer>
<id>akozhabay</id>
<name>Alisher Kozhabay</name>
</developer>
<developer>
<id>sserdyukov</id>
<name>Stanislav Serdyukov</name>
</developer>
</developers>

<issueManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,25 @@
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.ivi.opensource.flinkclickhousesink.applied.ClickhouseSinkBuffer;
import ru.ivi.opensource.flinkclickhousesink.applied.ClickhouseSinkManager;
import ru.ivi.opensource.flinkclickhousesink.applied.ClickHouseSinkManager;
import ru.ivi.opensource.flinkclickhousesink.applied.Sink;

import java.util.Map;
import java.util.Properties;


public class ClickhouseSink extends RichSinkFunction<String> {
public class ClickHouseSink extends RichSinkFunction<String> {

private static final Logger logger = LoggerFactory.getLogger(ClickhouseSink.class);
private static final Logger logger = LoggerFactory.getLogger(ClickHouseSink.class);

private static final Object DUMMY_LOCK = new Object();

private final Properties localProperties;

private volatile static transient ClickhouseSinkManager sinkManager;
private transient ClickhouseSinkBuffer clickhouseSinkBuffer;
private volatile static transient ClickHouseSinkManager sinkManager;
private transient Sink sink;

public ClickhouseSink(Properties properties) {
public ClickHouseSink(Properties properties) {
this.localProperties = properties;
}

Expand All @@ -36,33 +36,33 @@ public void open(Configuration config) {
.getGlobalJobParameters()
.toMap();

sinkManager = new ClickhouseSinkManager(params);
sinkManager = new ClickHouseSinkManager(params);
}
}
}

clickhouseSinkBuffer = sinkManager.buildBuffer(localProperties);
sink = sinkManager.buildSink(localProperties);
}

/**
* Add csv to buffer
* Add csv to sink
*
* @param recordAsCSV csv-event
*/
@Override
public void invoke(String recordAsCSV, Context context) {
try {
clickhouseSinkBuffer.put(recordAsCSV);
sink.put(recordAsCSV);
} catch (Exception e) {
logger.error("Error while sending data to Clickhouse, record = {}", recordAsCSV, e);
logger.error("Error while sending data to ClickHouse, record = {}", recordAsCSV, e);
throw new RuntimeException(e);
}
}

@Override
public void close() throws Exception {
if (clickhouseSinkBuffer != null) {
clickhouseSinkBuffer.close();
if (sink != null) {
sink.close();
}

if (sinkManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,44 @@
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.ivi.opensource.flinkclickhousesink.model.ClickhouseRequestBlank;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseRequestBlank;
import ru.ivi.opensource.flinkclickhousesink.util.FutureUtil;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class ClickhouseSinkBuffer implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ClickhouseSinkBuffer.class);
public class ClickHouseSinkBuffer implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ClickHouseSinkBuffer.class);

private final ClickhouseWriter writer;
private final ClickHouseWriter writer;
private final String targetTable;
private final int maxFlushBufferSize;
private final long timeoutMillis;
private final List<String> localValues;
private final List<CompletableFuture<Boolean>> futures;

private volatile long lastAddTimeMillis = 0L;

private ClickhouseSinkBuffer(
ClickhouseWriter chWriter,
private ClickHouseSinkBuffer(
ClickHouseWriter chWriter,
long timeout,
int maxBuffer,
String table
String table,
List<CompletableFuture<Boolean>> futures
) {
writer = chWriter;
localValues = new ArrayList<>();
timeoutMillis = timeout;
maxFlushBufferSize = maxBuffer;
targetTable = table;

logger.info("Instance Clickhouse Sink, target table = {}, buffer size = {}", this.targetTable, this.maxFlushBufferSize);
this.futures = futures;

logger.info("Instance ClickHouse Sink, target table = {}, buffer size = {}", this.targetTable, this.maxFlushBufferSize);
}

String getTargetTable() {
Expand All @@ -54,7 +61,7 @@ synchronized void tryAddToQueue() {

private void addToQueue() {
List<String> deepCopy = buildDeepCopy(localValues);
ClickhouseRequestBlank params = ClickhouseRequestBlank.Builder
ClickHouseRequestBlank params = ClickHouseRequestBlank.Builder
.aBuilder()
.withValues(deepCopy)
.withTargetTable(targetTable)
Expand Down Expand Up @@ -87,22 +94,33 @@ private static List<String> buildDeepCopy(List<String> original) {
return Collections.unmodifiableList(new ArrayList<>(original));
}

public void assertFuturesNotFailedYet() throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = FutureUtil.allOf(futures);
//nonblocking operation
if (future.isCompletedExceptionally()) {
future.get();
}
}

@Override
public void close() {
logger.info("ClickHouse sink buffer is shutting down.");
if (localValues != null && localValues.size() > 0) {
addToQueue();
}
logger.info("ClickHouse sink buffer shutdown complete.");
}

public static final class Builder {
private String targetTable;
private int maxFlushBufferSize;
private int timeoutSec;
private List<CompletableFuture<Boolean>> futures;

private Builder() {
}

public static Builder aClickhouseSinkBuffer() {
public static Builder aClickHouseSinkBuffer() {
return new Builder();
}

Expand All @@ -121,17 +139,23 @@ public Builder withTimeoutSec(int timeoutSec) {
return this;
}

public ClickhouseSinkBuffer build(ClickhouseWriter writer) {
public Builder withFutures(List<CompletableFuture<Boolean>> futures) {
this.futures = futures;
return this;
}

public ClickHouseSinkBuffer build(ClickHouseWriter writer) {

Preconditions.checkNotNull(targetTable);
Preconditions.checkArgument(maxFlushBufferSize > 0);
Preconditions.checkArgument(timeoutSec > 0);

return new ClickhouseSinkBuffer(
return new ClickHouseSinkBuffer(
writer,
TimeUnit.SECONDS.toMillis(this.timeoutSec),
this.maxFlushBufferSize,
this.targetTable
this.targetTable,
this.futures
);
}
}
Expand Down
Loading

0 comments on commit fa10bbb

Please sign in to comment.