Skip to content

Commit

Permalink
Fix name
Browse files Browse the repository at this point in the history
  • Loading branch information
thiagohora committed Dec 16, 2024
1 parent e5b0852 commit 54a9ffc
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 17 deletions.
1 change: 1 addition & 0 deletions spring-reactive-modules/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<module>spring-reactive-exceptions</module>
<module>spring-reactor</module>
<module>spring-webflux-amqp</module>
<module>spring-reactive-kafka-stream-binder</module>

<!-- the following submodules are commented out as a workaround in order to use java 19+ and SB 3.2.x -->
<!-- <module>spring-reactive-performance</module>-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,11 @@
</build>

<properties>
<spring-cloud.version>2023.0.2</spring-cloud.version>
<spring-cloud.version>2023.0.4</spring-cloud.version>
<start-class>com.baeldung.webflux.kafka.stream.Application</start-class>
<java.version>17</java.version>
<maven.compiler.release>17</maven.compiler.release>
<maven.compiler.release>17</maven.compiler.release>
</properties>

</project>
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
package com.baeldung.reactive.kafka.stream.binder.domain;

import org.springframework.kafka.support.serializer.JsonSerde;

import java.time.Instant;

public record StockUpdate(String symbol, double price, String currency, Instant timestamp) {

// Required for Kafka serialization
public static class StockUpdateJsonSerde extends JsonSerde<StockUpdate> { }

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@Component
public class StockPriceProducer {

public static final String[] stocks = {"AAPL", "GOOG", "MSFT", "AMZN", "TSLA"};
public static final String[] STOCKS = {"AAPL", "GOOG", "MSFT", "AMZN", "TSLA"};
private static final String CURRENCY = "USD";

private final ReactiveKafkaProducerTemplate<String, StockUpdate> kafkaProducer;
Expand All @@ -38,7 +38,7 @@ public StockPriceProducer(@NonNull KafkaProperties properties, @Qualifier(TopicC
public Flux<SenderResult<Void>> produceStockPrices(int count) {
return Flux.range(0, count)
.map(i -> {
String stock = stocks[random.nextInt(stocks.length)];
String stock = STOCKS[random.nextInt(STOCKS.length)];
double price = 100 + (200 * random.nextDouble());
return MessageBuilder.withPayload(new StockUpdate(stock, price, CURRENCY, Instant.now()))
.setHeader(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ spring:
kafka:
binder:
brokers: ${KAFKA_BROKER}
function:
definition: processStockPrices
bindings:
default:
content-type: application/json
Expand All @@ -18,11 +16,7 @@ spring:
group: live-stock-consumers-y
producer:
useNativeEncoding: true
configuration:
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
value:
serializer: com.baeldung.reactive.kafka.stream.binder.domain.StockUpdate$StockUpdateJsonSerde

kafka:
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers(parallel = true)
class StockIntegrationTest {
class StockLiveTest {

@TestConfiguration
public static class Configuration {
Expand Down Expand Up @@ -124,7 +124,7 @@ void shouldGetAllPersistedEvents() {

updates.forEach(update -> {
Assert.assertTrue(update.price() > 0);
Assert.assertTrue(Set.of(StockPriceProducer.stocks).contains(update.symbol()));
Assert.assertTrue(Set.of(StockPriceProducer.STOCKS).contains(update.symbol()));
Assert.assertTrue(update.currency().equals("USD"));
Assert.assertTrue(isBetween(update.timestamp(), start, end));
});
Expand Down

0 comments on commit 54a9ffc

Please sign in to comment.