diff --git a/pom.xml b/pom.xml
index e33d593a2eb7..a46c05f4a19c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -605,6 +605,7 @@
+ spring-reactive-modules/spring-reactive-kafka-stream-binder
@@ -1000,6 +1001,7 @@
+ spring-reactive-modules/spring-reactive-kafka-stream-binder
diff --git a/spring-reactive-modules/pom.xml b/spring-reactive-modules/pom.xml
index d9c301632f95..170e301b3b8f 100644
--- a/spring-reactive-modules/pom.xml
+++ b/spring-reactive-modules/pom.xml
@@ -31,6 +31,7 @@
spring-reactive-exceptions
spring-reactor
spring-webflux-amqp
+ spring-reactive-kafka-stream-binder
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/pom.xml b/spring-reactive-modules/spring-reactive-kafka-stream-binder/pom.xml
new file mode 100644
index 000000000000..7d87ba055523
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/pom.xml
@@ -0,0 +1,145 @@
+
+
+ 4.0.0
+
+ com.baeldung.webflux.kafka.stream
+ spring-reactive-kafka-stream-binder
+ 1.0.0-SNAPSHOT
+ jar
+ spring-reactive-kafka-stream-binder
+ WebFlux and Spring Cloud Stream Reactive Kafka Binder
+
+
+ com.baeldung
+ parent-boot-3
+ 0.0.1-SNAPSHOT
+ ../../parent-boot-3
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-dependencies
+ ${spring-cloud.version}
+ pom
+ import
+
+
+ org.testcontainers
+ testcontainers-bom
+ ${testcontainers.version}
+ pom
+ import
+
+
+
+
+
+
+
+ org.projectlombok
+ lombok
+ ${lombok.version}
+ provided
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-starter-stream-kafka
+
+
+ org.springframework.cloud
+ spring-cloud-stream-binder-kafka-reactive
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+ com.clickhouse
+ clickhouse-r2dbc
+ ${clickhouse.version}
+
+
+ com.clickhouse
+ clickhouse-jdbc
+ ${clickhouse.version}
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+ org.testcontainers
+ testcontainers
+
+
+ org.slf4j
+ slf4j-api
+
+
+ junit
+ junit
+
+
+ test
+
+
+ org.testcontainers
+ clickhouse
+ test
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+ org.testcontainers
+ kafka
+ test
+
+
+ org.testcontainers
+ junit-jupiter
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ ${maven-surefire-plugin.version}
+
+
+ ${java.version}
+ ${project.build.sourceEncoding}
+ true
+
+ **/*IntegrationTest.java
+ **/*LiveTest.java
+
+
+
+
+
+
+
+ 2023.0.4
+ com.baeldung.webflux.kafka.stream.Application
+ 17
+ 1.20.3
+ 0.7.1
+ 17
+ 17
+
+
+
\ No newline at end of file
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/Application.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/Application.java
new file mode 100644
index 000000000000..d1aceb44ad6c
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/Application.java
@@ -0,0 +1,12 @@
+package com.baeldung.reactive.kafka.stream.binder;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class Application {
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+}
+
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/api/StocksApi.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/api/StocksApi.java
new file mode 100644
index 000000000000..8caaad7b46ac
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/api/StocksApi.java
@@ -0,0 +1,35 @@
+package com.baeldung.reactive.kafka.stream.binder.api;
+
+import com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate;
+import com.baeldung.reactive.kafka.stream.binder.repository.ClickHouseRepository;
+import jakarta.validation.constraints.NotNull;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.server.ResponseStatusException;
+import reactor.core.publisher.Flux;
+
+import java.time.Instant;
+
+@RestController
+public class StocksApi {
+
+ private final ClickHouseRepository repository;
+
+ @Autowired
+ public StocksApi(ClickHouseRepository repository) {
+ this.repository = repository;
+ }
+
+ @GetMapping("/stock-prices-out")
+ public Flux getAvgStockPrices(@RequestParam("from") @NotNull Instant from, @RequestParam("to") @NotNull Instant to) {
+
+ if (from.isAfter(to)) {
+ throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "'from' must come before 'to'");
+ }
+
+ return repository.findMinuteAvgStockPrices(from, to);
+ }
+}
\ No newline at end of file
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/domain/StockUpdate.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/domain/StockUpdate.java
new file mode 100644
index 000000000000..5cf20e25e448
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/domain/StockUpdate.java
@@ -0,0 +1,7 @@
+package com.baeldung.reactive.kafka.stream.binder.domain;
+
+import java.time.Instant;
+
+public record StockUpdate(String symbol, double price, String currency, Instant timestamp) {
+
+}
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/domain/currency/CurrencyRate.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/domain/currency/CurrencyRate.java
new file mode 100644
index 000000000000..d982e489abe4
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/domain/currency/CurrencyRate.java
@@ -0,0 +1,7 @@
+package com.baeldung.reactive.kafka.stream.binder.domain.currency;
+
+import reactor.core.publisher.Mono;
+
+public interface CurrencyRate {
+ Mono convertRate(String from, String to, double amount);
+}
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/TopicConfig.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/TopicConfig.java
new file mode 100644
index 000000000000..20470f510acf
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/TopicConfig.java
@@ -0,0 +1,38 @@
+package com.baeldung.reactive.kafka.stream.binder.kafka;
+
+import org.apache.kafka.clients.admin.NewTopic;
+import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.TopicBuilder;
+import org.springframework.kafka.support.KafkaHeaderMapper;
+
+@Configuration
+public class TopicConfig {
+
+ public static final String STOCK_PRICES_OUT = "stock-prices-out";
+ public static final String STOCK_PRICES_IN = "stock-prices-in";
+
+ @Bean(STOCK_PRICES_IN)
+ public NewTopic stockPricesIn() {
+ return TopicBuilder.name(STOCK_PRICES_IN)
+ .partitions(2)
+ .replicas(1)
+ .build();
+ }
+
+ @Bean(STOCK_PRICES_OUT)
+ public NewTopic stockPricesOut() {
+ return TopicBuilder.name(STOCK_PRICES_OUT)
+ .partitions(2)
+ .replicas(1)
+ .build();
+ }
+
+ @Bean
+ public KafkaHeaderMapper kafkaBinderHeaderMapper() {
+ BinderHeaderMapper headerMapper = new BinderHeaderMapper();
+ headerMapper.setMapAllStringsOut(true);
+ return headerMapper;
+ }
+}
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/consumer/StockPriceConsumer.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/consumer/StockPriceConsumer.java
new file mode 100644
index 000000000000..9d91f5fe8879
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/consumer/StockPriceConsumer.java
@@ -0,0 +1,66 @@
+package com.baeldung.reactive.kafka.stream.binder.kafka.consumer;
+
+import com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate;
+import com.baeldung.reactive.kafka.stream.binder.kafka.TopicConfig;
+import jakarta.annotation.PostConstruct;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
+import org.springframework.stereotype.Component;
+import reactor.core.scheduler.Schedulers;
+import reactor.kafka.receiver.ReceiverOptions;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+@Component
+@Slf4j
+public class StockPriceConsumer {
+
+ private final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate;
+ private final AtomicInteger count = new AtomicInteger();
+
+ @SuppressWarnings("all")
+ public StockPriceConsumer(@NonNull KafkaProperties properties, @Qualifier(TopicConfig.STOCK_PRICES_OUT) NewTopic topic) {
+ var receiverOptions = ReceiverOptions.create(properties.buildConsumerProperties())
+ .subscription(List.of(topic.name()))
+ .assignment(IntStream.range(0, topic.numPartitions()).mapToObj(i -> new TopicPartition(topic.name(), i)).toList())
+ .addAssignListener(partitions -> log.info("************** onPartitionsAssigned: {}", partitions));
+
+ this.kafkaConsumerTemplate = new ReactiveKafkaConsumerTemplate<>(receiverOptions);;
+ }
+
+ @PostConstruct
+ public void consume() {
+ Schedulers.boundedElastic().schedule(() -> kafkaConsumerTemplate
+ .receiveAutoAck()
+ .doOnNext(consumerRecord -> {
+ // simulate processing
+ count.incrementAndGet();
+
+ log.info(
+ "received key={}, value={} from topic={}, offset={}, partition={}", consumerRecord.key(),
+ consumerRecord.value(),
+ consumerRecord.topic(),
+ consumerRecord.offset(),
+ consumerRecord.partition());
+ })
+ .doOnError(e -> log.error("Consumer error", e))
+ .doOnComplete(() -> log.info("Consumed all messages"))
+ .subscribe());
+ }
+
+ public int getCount() {
+ return count.get();
+ }
+
+ public void resetCount() {
+ count.set(0);
+ }
+
+}
\ No newline at end of file
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/processor/StockPriceProcessor.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/processor/StockPriceProcessor.java
new file mode 100644
index 000000000000..c3842a6f0e2f
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/processor/StockPriceProcessor.java
@@ -0,0 +1,41 @@
+package com.baeldung.reactive.kafka.stream.binder.kafka.processor;
+
+import com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate;
+import com.baeldung.reactive.kafka.stream.binder.domain.currency.CurrencyRate;
+import com.baeldung.reactive.kafka.stream.binder.repository.ClickHouseRepository;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.MessageBuilder;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.function.Function;
+
+@Configuration
+public class StockPriceProcessor {
+
+ private static final String USD = "USD";
+ private static final String EUR = "EUR";
+
+ @Bean
+ public Function>, Flux>> processStockPrices(ClickHouseRepository repository, CurrencyRate currencyRate) {
+ return stockPrices -> stockPrices.flatMapSequential(message -> {
+ StockUpdate stockUpdate = message.getPayload();
+ return repository.saveStockPrice(stockUpdate)
+ .flatMap(success -> Boolean.TRUE.equals(success) ? Mono.just(stockUpdate) : Mono.empty())
+ .flatMap(stock -> currencyRate.convertRate(USD, EUR, stock.price()))
+ .map(newPrice -> convertPrice(stockUpdate, newPrice))
+ .map(priceInEuro -> MessageBuilder.withPayload(priceInEuro)
+ .setHeader(KafkaHeaders.KEY, stockUpdate.symbol())
+ .copyHeaders(message.getHeaders())
+ .build());
+ });
+ }
+
+ private StockUpdate convertPrice(StockUpdate stockUpdate, double newPrice) {
+ return new StockUpdate(stockUpdate.symbol(), newPrice, EUR, stockUpdate.timestamp());
+ }
+
+}
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/producer/StockPriceProducer.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/producer/StockPriceProducer.java
new file mode 100644
index 000000000000..e30a7c8c11c8
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/producer/StockPriceProducer.java
@@ -0,0 +1,55 @@
+package com.baeldung.reactive.kafka.stream.binder.kafka.producer;
+
+import com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate;
+import com.baeldung.reactive.kafka.stream.binder.kafka.TopicConfig;
+import lombok.NonNull;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.http.MediaType;
+import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
+import org.springframework.messaging.MessageHeaders;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderResult;
+
+import java.time.Instant;
+import java.util.Random;
+
+@Component
+public class StockPriceProducer {
+
+ public static final String[] STOCKS = {"AAPL", "GOOG", "MSFT", "AMZN", "TSLA"};
+ private static final String CURRENCY = "USD";
+
+ private final ReactiveKafkaProducerTemplate kafkaProducer;
+ private final NewTopic topic;
+ private final Random random = new Random();
+
+ @SuppressWarnings("all")
+ public StockPriceProducer(@NonNull KafkaProperties properties, @Qualifier(TopicConfig.STOCK_PRICES_IN) NewTopic topic) {
+ this.kafkaProducer = new ReactiveKafkaProducerTemplate<>(SenderOptions.create(properties.buildProducerProperties()));
+ this.topic = topic;
+ }
+
+ public Flux> produceStockPrices(int count) {
+ return Flux.range(0, count)
+ .map(i -> {
+ String stock = STOCKS[random.nextInt(STOCKS.length)];
+ double price = 100 + (200 * random.nextDouble());
+ return MessageBuilder.withPayload(new StockUpdate(stock, price, CURRENCY, Instant.now()))
+ .setHeader(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
+ .build();
+ })
+ .flatMap(stock -> {
+ var newRecord = new ProducerRecord<>(topic.name(), stock.getPayload().symbol(), stock.getPayload());
+
+ stock.getHeaders().forEach((key, value) -> newRecord.headers().add(key, value.toString().getBytes()));
+
+ return kafkaProducer.send(newRecord);
+ });
+ }
+}
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/repository/ClickHouseConfig.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/repository/ClickHouseConfig.java
new file mode 100644
index 000000000000..08bd64434462
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/repository/ClickHouseConfig.java
@@ -0,0 +1,25 @@
+package com.baeldung.reactive.kafka.stream.binder.repository;
+
+import io.r2dbc.spi.ConnectionFactories;
+import io.r2dbc.spi.ConnectionFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class ClickHouseConfig {
+
+ @Value("${clickhouse.r2dbc.url}")
+ private String url;
+
+ @Bean
+ public ConnectionFactory connectionFactory() {
+ return ConnectionFactories.get(url);
+ }
+
+ @Bean
+ public ClickHouseRepository clickHouseRepository(ConnectionFactory connectionFactory) {
+ return ClickHouseRepository.create(connectionFactory);
+ }
+
+}
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/repository/ClickHouseRepository.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/repository/ClickHouseRepository.java
new file mode 100644
index 000000000000..671cb25868cc
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/repository/ClickHouseRepository.java
@@ -0,0 +1,97 @@
+package com.baeldung.reactive.kafka.stream.binder.repository;
+
+import com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate;
+import io.r2dbc.spi.Connection;
+import io.r2dbc.spi.ConnectionFactory;
+import io.r2dbc.spi.Result;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Arrays;
+
+public interface ClickHouseRepository {
+
+ static ClickHouseRepository create(ConnectionFactory connectionFactory) {
+ return () -> Mono.from(connectionFactory.create());
+ }
+
+ Mono create();
+
+ default Mono initDatabase() {
+ Resource resource = new ClassPathResource("init.sql");
+ return readFile(resource).flatMap(this::runScript);
+ }
+
+ private Mono readFile(Resource resource) {
+ final var dataBufferFlux = DataBufferUtils.read(
+ resource,
+ new DefaultDataBufferFactory(),
+ 1024
+ );
+
+ return dataBufferFlux.reduce("", (data, buffer) -> {
+ String content = buffer.toString(StandardCharsets.UTF_8);
+ DataBufferUtils.release(buffer);
+ return data + content;
+ });
+ }
+
+ default Mono runScript(String scriptContent) {
+ return Mono.usingWhen(
+ create(),
+ connection -> executeScript(connection, scriptContent),
+ Connection::close
+ );
+ }
+
+ private Mono executeScript(Connection connection, String scriptContent) {
+ return Flux.fromIterable(Arrays.stream(scriptContent.split(";")).toList())
+ .map(String::trim)
+ .filter(statement -> !statement.isEmpty())
+ .concatMap(statement -> connection.createStatement(statement).execute())
+ .then();
+ }
+
+ default Mono saveStockPrice(StockUpdate stockUpdate) {
+ return create()
+ .flatMapMany(connection -> connection.createStatement("INSERT INTO stock_prices (symbol, original_price, currency, timestamp) VALUES (:symbol, :original_price, :currency, parseDateTime64BestEffort(:timestamp, 9))")
+ .bind("symbol", stockUpdate.symbol())
+ .bind("original_price", stockUpdate.price())
+ .bind("currency", stockUpdate.currency())
+ .bind("timestamp", stockUpdate.timestamp().toString())
+ .execute())
+ .flatMap(Result::getRowsUpdated)
+ .reduce(Long::sum)
+ .map(count -> count > 0);
+
+ }
+
+ default Flux findMinuteAvgStockPrices(Instant from, Instant to) {
+ return create()
+ .flatMapMany(connection -> connection.createStatement("""
+ SELECT
+ symbol,
+ currency,
+ date_time,
+ avgMerge(avg_price) AS avg_price
+ FROM avg_stock_prices
+ WHERE date_time BETWEEN parseDateTime64BestEffort(:from, 9) AND parseDateTime64BestEffort(:to, 9)
+ GROUP BY symbol, currency, date_time;
+ """)
+ .bind("from", from.toString())
+ .bind("to", to.toString())
+ .execute())
+ .flatMap(result -> result.map((row, rowMetadata) -> new StockUpdate(
+ row.get("symbol", String.class),
+ row.get("avg_price", Double.class),
+ row.get("currency", String.class),
+ row.get("date_time", Instant.class)
+ )));
+ }
+}
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/resources/application.yml b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/resources/application.yml
new file mode 100644
index 000000000000..897d2339271d
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/resources/application.yml
@@ -0,0 +1,39 @@
+spring:
+ cloud:
+ stream:
+ default-binder: kafka # Specify kafka as the default binder
+ kafka:
+ binder:
+ brokers: ${KAFKA_BROKER}
+ bindings:
+ default:
+ content-type: application/json
+ processStockPrices-in-0:
+ destination: stock-prices-in
+ group: live-stock-consumers-x
+ processStockPrices-out-0:
+ destination: stock-prices-out
+ group: live-stock-consumers-y
+ producer:
+ useNativeEncoding: true
+
+ kafka:
+ consumer:
+ key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+ value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+ group-id: my-group
+ properties:
+ reactiveAutoCommit: true
+ producer:
+ key-serializer: org.apache.kafka.common.serialization.StringSerializer
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
+ properties:
+ spring:
+ json:
+ trusted:
+ packages: '*'
+
+clickhouse:
+ r2dbc:
+ url: r2dbc:clickhouse://localhost:8123/default
+
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/resources/init.sql b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/resources/init.sql
new file mode 100644
index 000000000000..2b9e3aa17967
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/resources/init.sql
@@ -0,0 +1,27 @@
+CREATE TABLE IF NOT EXISTS stock_prices (
+ symbol String,
+ original_price Float64,
+ currency String,
+ timestamp DateTime64(9, 'UTC')
+) ENGINE = MergeTree() ORDER BY (symbol, timestamp);
+
+CREATE TABLE IF NOT EXISTS avg_stock_prices (
+ date_time DateTime64(9, 'UTC'),
+ symbol String,
+ currency String,
+ avg_price AggregateFunction(avg, Float64)
+) ENGINE = AggregatingMergeTree()
+ORDER BY (symbol, currency, date_time);
+
+DROP TABLE IF EXISTS avg_stock_prices_mv;
+
+CREATE MATERIALIZED VIEW avg_stock_prices_mv TO avg_stock_prices AS
+SELECT
+ toStartOfMinute(timestamp) AS date_time,
+ symbol,
+ currency,
+ avgState(original_price) AS avg_price
+FROM stock_prices
+GROUP BY symbol, currency, date_time;
+
+
diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/test/java/com/baeldung/reactive/kafka/stream/binder/StockLiveTest.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/test/java/com/baeldung/reactive/kafka/stream/binder/StockLiveTest.java
new file mode 100644
index 000000000000..c9ca28442e96
--- /dev/null
+++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/test/java/com/baeldung/reactive/kafka/stream/binder/StockLiveTest.java
@@ -0,0 +1,136 @@
+package com.baeldung.reactive.kafka.stream.binder;
+
+import com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate;
+import com.baeldung.reactive.kafka.stream.binder.domain.currency.CurrencyRate;
+import com.baeldung.reactive.kafka.stream.binder.kafka.consumer.StockPriceConsumer;
+import com.baeldung.reactive.kafka.stream.binder.kafka.producer.StockPriceProducer;
+import com.baeldung.reactive.kafka.stream.binder.repository.ClickHouseRepository;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import org.testcontainers.clickhouse.ClickHouseContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.kafka.KafkaContainer;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+import java.util.Set;
+
+@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
+@Testcontainers(parallel = true)
+class StockLiveTest {
+
+ @TestConfiguration
+ public static class Configuration {
+
+ @Bean
+ @Primary
+ public CurrencyRate currencyRate() {
+ return (from, to, amount) -> Mono.just(amount * 1.2);
+ }
+
+ }
+
+ @Container
+ static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:24.3-alpine");
+
+ @Container
+ static KafkaContainer kafkaContainer = new KafkaContainer("apache/kafka:3.9.0")
+ .withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094")
+ .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true");
+
+ @DynamicPropertySource
+ static void registerDynamicProperties(DynamicPropertyRegistry registry) {
+ registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
+ registry.add("spring.cloud.stream.kafka.binder.brokers", kafkaContainer::getBootstrapServers);
+ registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
+ registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
+ registry.add("clickhouse.r2dbc.url",
+ () -> "r2dbc:clickhouse://%s:%s/default".formatted(clickHouseContainer.getHost(), clickHouseContainer.getMappedPort(8123)));
+ }
+
+
+ @Autowired
+ private StockPriceProducer producer;
+
+ @Autowired
+ private StockPriceConsumer consumer;
+
+ @Autowired
+ private ClickHouseRepository clickHouseRepository;
+
+ @Autowired
+ private WebTestClient webTestClient;
+
+ @BeforeEach
+ void setup() {
+ clickHouseRepository.initDatabase().block();
+ clickHouseRepository.runScript("DELETE FROM stock_prices WHERE symbol IS NOT NULL").block();
+ consumer.resetCount();
+ }
+
+ @Test
+ void givenTheKafkaProducer_whenSubmittingEvents_thenProcessAllEventUntilItArrivesToConsumers() {
+
+ var eventCount = 200;
+
+ producer.produceStockPrices(eventCount).subscribe();
+
+ Awaitility
+ .waitAtMost(Duration.ofSeconds(60))
+ .untilAsserted(() -> {
+ Assert.assertTrue(consumer.getCount() == eventCount);
+ });
+ }
+
+ @Test
+ void givenTheKafkaProducer_whenStreamConfigured_thenProcessAllEventsAndSaveThemInClickHouse() {
+ var eventCount = 200;
+
+ var start = Instant.now().truncatedTo(ChronoUnit.MINUTES);
+
+ producer.produceStockPrices(eventCount).subscribe();
+
+ Awaitility
+ .waitAtMost(Duration.ofSeconds(60))
+ .untilAsserted(() -> {
+ Assert.assertTrue(consumer.getCount() == eventCount);
+ });
+
+ var end = Instant.now().plusSeconds(60).truncatedTo(ChronoUnit.MINUTES);
+
+ List updates = webTestClient.get()
+ .uri("/stock-prices-out?from={from}&to={to}", start, end)
+ .exchange()
+ .expectStatus().isOk()
+ .expectBodyList(StockUpdate.class)
+ .returnResult()
+ .getResponseBody();
+
+ Assert.assertFalse(updates.isEmpty());
+
+ updates.forEach(update -> {
+ Assert.assertTrue(update.price() > 0);
+ Assert.assertTrue(Set.of(StockPriceProducer.STOCKS).contains(update.symbol()));
+ Assert.assertTrue(update.currency().equals("USD"));
+ Assert.assertTrue(isBetween(update.timestamp(), start, end));
+ });
+ }
+
+ public boolean isBetween(Instant target, Instant start, Instant end) {
+ return !target.isBefore(start) && !target.isAfter(end);
+ }
+}