Skip to content

Commit

Permalink
fix bug when new threads are not created in callbackService even unde…
Browse files Browse the repository at this point in the history
…r the load (#15)

* Replace unbounded LinkedBlockingQueue with SynchronousQueue when creating a callbackService

* Use cachedThreadPool instead of custom one for callbackService

Co-authored-by: Stanislav Serdyukov <[email protected]>
  • Loading branch information
eksd and Stanislav Serdyukov authored Mar 30, 2021
1 parent 36d4aa8 commit 23609ad
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 14 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ _by timeout_ and _by buffer size_.
|flink |flink-clickhouse-sink |
|:-------:|:--------------------:|
|1.3.* |1.0.0 |
|1.9.0 |1.3.0 |
|1.9.* |1.3.1 |


### Install
Expand All @@ -29,7 +29,7 @@ _by timeout_ and _by buffer size_.
<dependency>
<groupId>ru.ivi.opensource</groupId>
<artifactId>flink-clickhouse-sink</artifactId>
<version>1.3.0</version>
<version>1.3.1</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<groupId>ru.ivi.opensource</groupId>
<artifactId>flink-clickhouse-sink</artifactId>
<version>1.3.0-SNAPSHOT</version>
<version>1.3.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>Flink ClickHouse sink</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,7 @@ private void buildComponents() {
service = Executors.newFixedThreadPool(sinkParams.getNumWriters(), threadFactory);

ThreadFactory callbackServiceFactory = ThreadUtil.threadFactory("clickhouse-writer-callback-executor");

int cores = Runtime.getRuntime().availableProcessors();
int coreThreadsNum = Math.max(cores / 4, 2);
callbackService = new ThreadPoolExecutor(
coreThreadsNum,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
callbackServiceFactory);

callbackService = Executors.newCachedThreadPool(callbackServiceFactory);

int numWriters = sinkParams.getNumWriters();
tasks = Lists.newArrayListWithCapacity(numWriters);
Expand Down

0 comments on commit 23609ad

Please sign in to comment.