diff --git a/pom.xml b/pom.xml index e33d593a2eb7..a46c05f4a19c 100644 --- a/pom.xml +++ b/pom.xml @@ -605,6 +605,7 @@ + spring-reactive-modules/spring-reactive-kafka-stream-binder @@ -1000,6 +1001,7 @@ + spring-reactive-modules/spring-reactive-kafka-stream-binder diff --git a/spring-reactive-modules/pom.xml b/spring-reactive-modules/pom.xml index d9c301632f95..170e301b3b8f 100644 --- a/spring-reactive-modules/pom.xml +++ b/spring-reactive-modules/pom.xml @@ -31,6 +31,7 @@ spring-reactive-exceptions spring-reactor spring-webflux-amqp + spring-reactive-kafka-stream-binder diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/pom.xml b/spring-reactive-modules/spring-reactive-kafka-stream-binder/pom.xml new file mode 100644 index 000000000000..7d87ba055523 --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/pom.xml @@ -0,0 +1,145 @@ + + + 4.0.0 + + com.baeldung.webflux.kafka.stream + spring-reactive-kafka-stream-binder + 1.0.0-SNAPSHOT + jar + spring-reactive-kafka-stream-binder + WebFlux and Spring Cloud Stream Reactive Kafka Binder + + + com.baeldung + parent-boot-3 + 0.0.1-SNAPSHOT + ../../parent-boot-3 + + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + org.testcontainers + testcontainers-bom + ${testcontainers.version} + pom + import + + + + + + + + org.projectlombok + lombok + ${lombok.version} + provided + + + + + org.springframework.cloud + spring-cloud-starter-stream-kafka + + + org.springframework.cloud + spring-cloud-stream-binder-kafka-reactive + + + org.springframework.boot + spring-boot-starter-webflux + + + com.clickhouse + clickhouse-r2dbc + ${clickhouse.version} + + + com.clickhouse + clickhouse-jdbc + ${clickhouse.version} + test + + + org.springframework.boot + spring-boot-starter-test + test + + + org.testcontainers + testcontainers + + + org.slf4j + slf4j-api + + + junit + junit + + + test + + + org.testcontainers + clickhouse + test + + + io.projectreactor + reactor-test + test + + + org.testcontainers + kafka + test + + + org.testcontainers + junit-jupiter + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + + ${java.version} + ${java.version} + ${project.build.sourceEncoding} + true + + **/*IntegrationTest.java + **/*LiveTest.java + + + + + + + + 2023.0.4 + com.baeldung.webflux.kafka.stream.Application + 17 + 1.20.3 + 0.7.1 + 17 + 17 + + + \ No newline at end of file diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/Application.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/Application.java new file mode 100644 index 000000000000..d1aceb44ad6c --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/Application.java @@ -0,0 +1,12 @@ +package com.baeldung.reactive.kafka.stream.binder; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } +} + diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/api/StocksApi.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/api/StocksApi.java new file mode 100644 index 000000000000..8caaad7b46ac --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/api/StocksApi.java @@ -0,0 +1,35 @@ +package com.baeldung.reactive.kafka.stream.binder.api; + +import com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate; +import com.baeldung.reactive.kafka.stream.binder.repository.ClickHouseRepository; +import jakarta.validation.constraints.NotNull; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.server.ResponseStatusException; +import reactor.core.publisher.Flux; + +import java.time.Instant; + +@RestController +public class StocksApi { + + private final ClickHouseRepository repository; + + @Autowired + public StocksApi(ClickHouseRepository repository) { + this.repository = repository; + } + + @GetMapping("/stock-prices-out") + public Flux getAvgStockPrices(@RequestParam("from") @NotNull Instant from, @RequestParam("to") @NotNull Instant to) { + + if (from.isAfter(to)) { + throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "'from' must come before 'to'"); + } + + return repository.findMinuteAvgStockPrices(from, to); + } +} \ No newline at end of file diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/domain/StockUpdate.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/domain/StockUpdate.java new file mode 100644 index 000000000000..5cf20e25e448 --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/domain/StockUpdate.java @@ -0,0 +1,7 @@ +package com.baeldung.reactive.kafka.stream.binder.domain; + +import java.time.Instant; + +public record StockUpdate(String symbol, double price, String currency, Instant timestamp) { + +} diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/domain/currency/CurrencyRate.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/domain/currency/CurrencyRate.java new file mode 100644 index 000000000000..d982e489abe4 --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/domain/currency/CurrencyRate.java @@ -0,0 +1,7 @@ +package com.baeldung.reactive.kafka.stream.binder.domain.currency; + +import reactor.core.publisher.Mono; + +public interface CurrencyRate { + Mono convertRate(String from, String to, double amount); +} diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/TopicConfig.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/TopicConfig.java new file mode 100644 index 000000000000..20470f510acf --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/TopicConfig.java @@ -0,0 +1,38 @@ +package com.baeldung.reactive.kafka.stream.binder.kafka; + +import org.apache.kafka.clients.admin.NewTopic; +import org.springframework.cloud.stream.binder.kafka.BinderHeaderMapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.support.KafkaHeaderMapper; + +@Configuration +public class TopicConfig { + + public static final String STOCK_PRICES_OUT = "stock-prices-out"; + public static final String STOCK_PRICES_IN = "stock-prices-in"; + + @Bean(STOCK_PRICES_IN) + public NewTopic stockPricesIn() { + return TopicBuilder.name(STOCK_PRICES_IN) + .partitions(2) + .replicas(1) + .build(); + } + + @Bean(STOCK_PRICES_OUT) + public NewTopic stockPricesOut() { + return TopicBuilder.name(STOCK_PRICES_OUT) + .partitions(2) + .replicas(1) + .build(); + } + + @Bean + public KafkaHeaderMapper kafkaBinderHeaderMapper() { + BinderHeaderMapper headerMapper = new BinderHeaderMapper(); + headerMapper.setMapAllStringsOut(true); + return headerMapper; + } +} diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/consumer/StockPriceConsumer.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/consumer/StockPriceConsumer.java new file mode 100644 index 000000000000..9d91f5fe8879 --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/consumer/StockPriceConsumer.java @@ -0,0 +1,66 @@ +package com.baeldung.reactive.kafka.stream.binder.kafka.consumer; + +import com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate; +import com.baeldung.reactive.kafka.stream.binder.kafka.TopicConfig; +import jakarta.annotation.PostConstruct; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate; +import org.springframework.stereotype.Component; +import reactor.core.scheduler.Schedulers; +import reactor.kafka.receiver.ReceiverOptions; + +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; + +@Component +@Slf4j +public class StockPriceConsumer { + + private final ReactiveKafkaConsumerTemplate kafkaConsumerTemplate; + private final AtomicInteger count = new AtomicInteger(); + + @SuppressWarnings("all") + public StockPriceConsumer(@NonNull KafkaProperties properties, @Qualifier(TopicConfig.STOCK_PRICES_OUT) NewTopic topic) { + var receiverOptions = ReceiverOptions.create(properties.buildConsumerProperties()) + .subscription(List.of(topic.name())) + .assignment(IntStream.range(0, topic.numPartitions()).mapToObj(i -> new TopicPartition(topic.name(), i)).toList()) + .addAssignListener(partitions -> log.info("************** onPartitionsAssigned: {}", partitions)); + + this.kafkaConsumerTemplate = new ReactiveKafkaConsumerTemplate<>(receiverOptions);; + } + + @PostConstruct + public void consume() { + Schedulers.boundedElastic().schedule(() -> kafkaConsumerTemplate + .receiveAutoAck() + .doOnNext(consumerRecord -> { + // simulate processing + count.incrementAndGet(); + + log.info( + "received key={}, value={} from topic={}, offset={}, partition={}", consumerRecord.key(), + consumerRecord.value(), + consumerRecord.topic(), + consumerRecord.offset(), + consumerRecord.partition()); + }) + .doOnError(e -> log.error("Consumer error", e)) + .doOnComplete(() -> log.info("Consumed all messages")) + .subscribe()); + } + + public int getCount() { + return count.get(); + } + + public void resetCount() { + count.set(0); + } + +} \ No newline at end of file diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/processor/StockPriceProcessor.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/processor/StockPriceProcessor.java new file mode 100644 index 000000000000..c3842a6f0e2f --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/processor/StockPriceProcessor.java @@ -0,0 +1,41 @@ +package com.baeldung.reactive.kafka.stream.binder.kafka.processor; + +import com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate; +import com.baeldung.reactive.kafka.stream.binder.domain.currency.CurrencyRate; +import com.baeldung.reactive.kafka.stream.binder.repository.ClickHouseRepository; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.function.Function; + +@Configuration +public class StockPriceProcessor { + + private static final String USD = "USD"; + private static final String EUR = "EUR"; + + @Bean + public Function>, Flux>> processStockPrices(ClickHouseRepository repository, CurrencyRate currencyRate) { + return stockPrices -> stockPrices.flatMapSequential(message -> { + StockUpdate stockUpdate = message.getPayload(); + return repository.saveStockPrice(stockUpdate) + .flatMap(success -> Boolean.TRUE.equals(success) ? Mono.just(stockUpdate) : Mono.empty()) + .flatMap(stock -> currencyRate.convertRate(USD, EUR, stock.price())) + .map(newPrice -> convertPrice(stockUpdate, newPrice)) + .map(priceInEuro -> MessageBuilder.withPayload(priceInEuro) + .setHeader(KafkaHeaders.KEY, stockUpdate.symbol()) + .copyHeaders(message.getHeaders()) + .build()); + }); + } + + private StockUpdate convertPrice(StockUpdate stockUpdate, double newPrice) { + return new StockUpdate(stockUpdate.symbol(), newPrice, EUR, stockUpdate.timestamp()); + } + +} diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/producer/StockPriceProducer.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/producer/StockPriceProducer.java new file mode 100644 index 000000000000..e30a7c8c11c8 --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/kafka/producer/StockPriceProducer.java @@ -0,0 +1,55 @@ +package com.baeldung.reactive.kafka.stream.binder.kafka.producer; + +import com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate; +import com.baeldung.reactive.kafka.stream.binder.kafka.TopicConfig; +import lombok.NonNull; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.http.MediaType; +import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.kafka.sender.SenderOptions; +import reactor.kafka.sender.SenderResult; + +import java.time.Instant; +import java.util.Random; + +@Component +public class StockPriceProducer { + + public static final String[] STOCKS = {"AAPL", "GOOG", "MSFT", "AMZN", "TSLA"}; + private static final String CURRENCY = "USD"; + + private final ReactiveKafkaProducerTemplate kafkaProducer; + private final NewTopic topic; + private final Random random = new Random(); + + @SuppressWarnings("all") + public StockPriceProducer(@NonNull KafkaProperties properties, @Qualifier(TopicConfig.STOCK_PRICES_IN) NewTopic topic) { + this.kafkaProducer = new ReactiveKafkaProducerTemplate<>(SenderOptions.create(properties.buildProducerProperties())); + this.topic = topic; + } + + public Flux> produceStockPrices(int count) { + return Flux.range(0, count) + .map(i -> { + String stock = STOCKS[random.nextInt(STOCKS.length)]; + double price = 100 + (200 * random.nextDouble()); + return MessageBuilder.withPayload(new StockUpdate(stock, price, CURRENCY, Instant.now())) + .setHeader(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) + .build(); + }) + .flatMap(stock -> { + var newRecord = new ProducerRecord<>(topic.name(), stock.getPayload().symbol(), stock.getPayload()); + + stock.getHeaders().forEach((key, value) -> newRecord.headers().add(key, value.toString().getBytes())); + + return kafkaProducer.send(newRecord); + }); + } +} diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/repository/ClickHouseConfig.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/repository/ClickHouseConfig.java new file mode 100644 index 000000000000..08bd64434462 --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/repository/ClickHouseConfig.java @@ -0,0 +1,25 @@ +package com.baeldung.reactive.kafka.stream.binder.repository; + +import io.r2dbc.spi.ConnectionFactories; +import io.r2dbc.spi.ConnectionFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class ClickHouseConfig { + + @Value("${clickhouse.r2dbc.url}") + private String url; + + @Bean + public ConnectionFactory connectionFactory() { + return ConnectionFactories.get(url); + } + + @Bean + public ClickHouseRepository clickHouseRepository(ConnectionFactory connectionFactory) { + return ClickHouseRepository.create(connectionFactory); + } + +} diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/repository/ClickHouseRepository.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/repository/ClickHouseRepository.java new file mode 100644 index 000000000000..671cb25868cc --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/java/com/baeldung/reactive/kafka/stream/binder/repository/ClickHouseRepository.java @@ -0,0 +1,97 @@ +package com.baeldung.reactive.kafka.stream.binder.repository; + +import com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate; +import io.r2dbc.spi.Connection; +import io.r2dbc.spi.ConnectionFactory; +import io.r2dbc.spi.Result; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Arrays; + +public interface ClickHouseRepository { + + static ClickHouseRepository create(ConnectionFactory connectionFactory) { + return () -> Mono.from(connectionFactory.create()); + } + + Mono create(); + + default Mono initDatabase() { + Resource resource = new ClassPathResource("init.sql"); + return readFile(resource).flatMap(this::runScript); + } + + private Mono readFile(Resource resource) { + final var dataBufferFlux = DataBufferUtils.read( + resource, + new DefaultDataBufferFactory(), + 1024 + ); + + return dataBufferFlux.reduce("", (data, buffer) -> { + String content = buffer.toString(StandardCharsets.UTF_8); + DataBufferUtils.release(buffer); + return data + content; + }); + } + + default Mono runScript(String scriptContent) { + return Mono.usingWhen( + create(), + connection -> executeScript(connection, scriptContent), + Connection::close + ); + } + + private Mono executeScript(Connection connection, String scriptContent) { + return Flux.fromIterable(Arrays.stream(scriptContent.split(";")).toList()) + .map(String::trim) + .filter(statement -> !statement.isEmpty()) + .concatMap(statement -> connection.createStatement(statement).execute()) + .then(); + } + + default Mono saveStockPrice(StockUpdate stockUpdate) { + return create() + .flatMapMany(connection -> connection.createStatement("INSERT INTO stock_prices (symbol, original_price, currency, timestamp) VALUES (:symbol, :original_price, :currency, parseDateTime64BestEffort(:timestamp, 9))") + .bind("symbol", stockUpdate.symbol()) + .bind("original_price", stockUpdate.price()) + .bind("currency", stockUpdate.currency()) + .bind("timestamp", stockUpdate.timestamp().toString()) + .execute()) + .flatMap(Result::getRowsUpdated) + .reduce(Long::sum) + .map(count -> count > 0); + + } + + default Flux findMinuteAvgStockPrices(Instant from, Instant to) { + return create() + .flatMapMany(connection -> connection.createStatement(""" + SELECT + symbol, + currency, + date_time, + avgMerge(avg_price) AS avg_price + FROM avg_stock_prices + WHERE date_time BETWEEN parseDateTime64BestEffort(:from, 9) AND parseDateTime64BestEffort(:to, 9) + GROUP BY symbol, currency, date_time; + """) + .bind("from", from.toString()) + .bind("to", to.toString()) + .execute()) + .flatMap(result -> result.map((row, rowMetadata) -> new StockUpdate( + row.get("symbol", String.class), + row.get("avg_price", Double.class), + row.get("currency", String.class), + row.get("date_time", Instant.class) + ))); + } +} diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/resources/application.yml b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/resources/application.yml new file mode 100644 index 000000000000..897d2339271d --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/resources/application.yml @@ -0,0 +1,39 @@ +spring: + cloud: + stream: + default-binder: kafka # Specify kafka as the default binder + kafka: + binder: + brokers: ${KAFKA_BROKER} + bindings: + default: + content-type: application/json + processStockPrices-in-0: + destination: stock-prices-in + group: live-stock-consumers-x + processStockPrices-out-0: + destination: stock-prices-out + group: live-stock-consumers-y + producer: + useNativeEncoding: true + + kafka: + consumer: + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + group-id: my-group + properties: + reactiveAutoCommit: true + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + properties: + spring: + json: + trusted: + packages: '*' + +clickhouse: + r2dbc: + url: r2dbc:clickhouse://localhost:8123/default + diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/resources/init.sql b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/resources/init.sql new file mode 100644 index 000000000000..2b9e3aa17967 --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/main/resources/init.sql @@ -0,0 +1,27 @@ +CREATE TABLE IF NOT EXISTS stock_prices ( + symbol String, + original_price Float64, + currency String, + timestamp DateTime64(9, 'UTC') +) ENGINE = MergeTree() ORDER BY (symbol, timestamp); + +CREATE TABLE IF NOT EXISTS avg_stock_prices ( + date_time DateTime64(9, 'UTC'), + symbol String, + currency String, + avg_price AggregateFunction(avg, Float64) +) ENGINE = AggregatingMergeTree() +ORDER BY (symbol, currency, date_time); + +DROP TABLE IF EXISTS avg_stock_prices_mv; + +CREATE MATERIALIZED VIEW avg_stock_prices_mv TO avg_stock_prices AS +SELECT + toStartOfMinute(timestamp) AS date_time, + symbol, + currency, + avgState(original_price) AS avg_price +FROM stock_prices +GROUP BY symbol, currency, date_time; + + diff --git a/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/test/java/com/baeldung/reactive/kafka/stream/binder/StockLiveTest.java b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/test/java/com/baeldung/reactive/kafka/stream/binder/StockLiveTest.java new file mode 100644 index 000000000000..c9ca28442e96 --- /dev/null +++ b/spring-reactive-modules/spring-reactive-kafka-stream-binder/src/test/java/com/baeldung/reactive/kafka/stream/binder/StockLiveTest.java @@ -0,0 +1,136 @@ +package com.baeldung.reactive.kafka.stream.binder; + +import com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate; +import com.baeldung.reactive.kafka.stream.binder.domain.currency.CurrencyRate; +import com.baeldung.reactive.kafka.stream.binder.kafka.consumer.StockPriceConsumer; +import com.baeldung.reactive.kafka.stream.binder.kafka.producer.StockPriceProducer; +import com.baeldung.reactive.kafka.stream.binder.repository.ClickHouseRepository; +import org.awaitility.Awaitility; +import org.junit.Assert; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; +import org.springframework.test.web.reactive.server.WebTestClient; +import org.testcontainers.clickhouse.ClickHouseContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.kafka.KafkaContainer; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Set; + +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@Testcontainers(parallel = true) +class StockLiveTest { + + @TestConfiguration + public static class Configuration { + + @Bean + @Primary + public CurrencyRate currencyRate() { + return (from, to, amount) -> Mono.just(amount * 1.2); + } + + } + + @Container + static ClickHouseContainer clickHouseContainer = new ClickHouseContainer("clickhouse/clickhouse-server:24.3-alpine"); + + @Container + static KafkaContainer kafkaContainer = new KafkaContainer("apache/kafka:3.9.0") + .withEnv("KAFKA_LISTENERS", "PLAINTEXT://:9092,BROKER://:9093,CONTROLLER://:9094") + .withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true"); + + @DynamicPropertySource + static void registerDynamicProperties(DynamicPropertyRegistry registry) { + registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers); + registry.add("spring.cloud.stream.kafka.binder.brokers", kafkaContainer::getBootstrapServers); + registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers); + registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers); + registry.add("clickhouse.r2dbc.url", + () -> "r2dbc:clickhouse://%s:%s/default".formatted(clickHouseContainer.getHost(), clickHouseContainer.getMappedPort(8123))); + } + + + @Autowired + private StockPriceProducer producer; + + @Autowired + private StockPriceConsumer consumer; + + @Autowired + private ClickHouseRepository clickHouseRepository; + + @Autowired + private WebTestClient webTestClient; + + @BeforeEach + void setup() { + clickHouseRepository.initDatabase().block(); + clickHouseRepository.runScript("DELETE FROM stock_prices WHERE symbol IS NOT NULL").block(); + consumer.resetCount(); + } + + @Test + void givenTheKafkaProducer_whenSubmittingEvents_thenProcessAllEventUntilItArrivesToConsumers() { + + var eventCount = 200; + + producer.produceStockPrices(eventCount).subscribe(); + + Awaitility + .waitAtMost(Duration.ofSeconds(60)) + .untilAsserted(() -> { + Assert.assertTrue(consumer.getCount() == eventCount); + }); + } + + @Test + void givenTheKafkaProducer_whenStreamConfigured_thenProcessAllEventsAndSaveThemInClickHouse() { + var eventCount = 200; + + var start = Instant.now().truncatedTo(ChronoUnit.MINUTES); + + producer.produceStockPrices(eventCount).subscribe(); + + Awaitility + .waitAtMost(Duration.ofSeconds(60)) + .untilAsserted(() -> { + Assert.assertTrue(consumer.getCount() == eventCount); + }); + + var end = Instant.now().plusSeconds(60).truncatedTo(ChronoUnit.MINUTES); + + List updates = webTestClient.get() + .uri("/stock-prices-out?from={from}&to={to}", start, end) + .exchange() + .expectStatus().isOk() + .expectBodyList(StockUpdate.class) + .returnResult() + .getResponseBody(); + + Assert.assertFalse(updates.isEmpty()); + + updates.forEach(update -> { + Assert.assertTrue(update.price() > 0); + Assert.assertTrue(Set.of(StockPriceProducer.STOCKS).contains(update.symbol())); + Assert.assertTrue(update.currency().equals("USD")); + Assert.assertTrue(isBetween(update.timestamp(), start, end)); + }); + } + + public boolean isBetween(Instant target, Instant start, Instant end) { + return !target.isBefore(start) && !target.isAfter(end); + } +}