Create BaseSeekToEndListener
iyourshaw committed Dec 18, 2024
1 parent ce9fd7b commit 1a4ee46
Showing 7 changed files with 94 additions and 52 deletions.
ConflictMonitorApiProperties.java
@@ -62,8 +62,8 @@ public class ConflictMonitorApiProperties {
private static final Logger logger = LoggerFactory.getLogger(ConflictMonitorApiProperties.class);

private boolean confluentCloudEnabled = false;
private String confluentKey = null;
private String confluentSecret = null;
@Getter private String confluentKey = null;
@Getter private String confluentSecret = null;

private String version;
public static final int OUTPUT_SCHEMA_VERSION = 6;
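Lombok's @Getter on these two fields generates getConfluentKey() and getConfluentSecret() accessors, which the refactored KafkaConsumerConfig below relies on. The generated methods are equivalent to writing by hand:

    public String getConfluentKey() {
        return confluentKey;
    }

    public String getConfluentSecret() {
        return confluentSecret;
    }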
BaseSeekToEndListener.java
@@ -0,0 +1,33 @@
package us.dot.its.jpo.ode.api.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import us.dot.its.jpo.ode.api.controllers.StompController;

import java.util.Map;

/**
* Base class for Kafka listeners that seek to the end of each assigned partition before consuming,
* so that on startup only newly produced messages are delivered.
* See <a href="https://docs.spring.io/spring-kafka/reference/kafka/seek.html">
* Spring Kafka: Seeking to a specific offset</a>
*/
@Slf4j
public class BaseSeekToEndListener extends AbstractConsumerSeekAware {

protected final StompController stompController;

public BaseSeekToEndListener(StompController stompController) {
this.stompController = stompController;
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
log.info("Seek to end for TopicPartitions {}", assignments.keySet());
callback.seekToEnd(assignments.keySet());
super.onPartitionsAssigned(assignments, callback);
}

}
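The intended pattern, mirrored by the three listeners below, is to extend BaseSeekToEndListener, pass the StompController to the base constructor, and declare a @KafkaListener method; the seek-to-end behavior is inherited from onPartitionsAssigned. A minimal sketch, with a placeholder listener id and topic (imports as in the real listeners below):

    @Component
    @Slf4j
    public class ExampleSocketForwardListener extends BaseSeekToEndListener {

        @Autowired
        public ExampleSocketForwardListener(StompController stompController) {
            super(stompController);
        }

        // "example" and "exampleTopic" are placeholders, not values from this commit
        @KafkaListener(id = "example", topics = "exampleTopic", autoStartup = "false")
        public void listen(ConsumerRecord<String, String> record) {
            log.trace("Received record with offset {}", record.offset());
        }
    }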
BsmSocketForwardListener.java
@@ -1,22 +1,21 @@
package us.dot.its.jpo.ode.api.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionIdKey;
import us.dot.its.jpo.ode.api.controllers.StompController;
import us.dot.its.jpo.ode.model.OdeBsmData;

@Component
public class BsmSocketForwardListener extends AbstractConsumerSeekAware {

final StompController stompController;
@Slf4j
public class BsmSocketForwardListener extends BaseSeekToEndListener {

@Autowired
public BsmSocketForwardListener(StompController stompController) {
this.stompController = stompController;
super(stompController);
}

@KafkaListener(
@@ -28,6 +27,7 @@ public BsmSocketForwardListener(StompController stompController) {
autoStartup = "false")
public void listen(ConsumerRecord<BsmIntersectionIdKey, OdeBsmData> record) {
stompController.broadcastBSM(record.key(), record.value());
log.trace("Received bsm with offset {}", record.offset());
}


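All of these listeners set autoStartup = "false", so their containers stay idle until started explicitly through Spring Kafka's KafkaListenerEndpointRegistry. A hypothetical starter, not part of this commit (it assumes the BSM listener id is ListenerIds.BSM, by analogy with the MAP and SPAT ids below):

    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.stereotype.Component;

    @Component
    public class ListenerStarter {

        private final KafkaListenerEndpointRegistry registry;

        public ListenerStarter(KafkaListenerEndpointRegistry registry) {
            this.registry = registry;
        }

        public void startBsmListener() {
            // Look up the container by its @KafkaListener id and start it;
            // BaseSeekToEndListener then seeks to the end once partitions are assigned.
            MessageListenerContainer container = registry.getListenerContainer(ListenerIds.BSM);
            if (container != null && !container.isRunning()) {
                container.start();
            }
        }
    }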
KafkaConsumerConfig.java
@@ -1,7 +1,10 @@
package us.dot.its.jpo.ode.api.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -12,9 +15,9 @@
import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes;
import us.dot.its.jpo.geojsonconverter.serialization.deserializers.JsonDeserializer;
import us.dot.its.jpo.geojsonconverter.serialization.deserializers.ProcessedMapDeserializer;
import us.dot.its.jpo.ode.api.ConflictMonitorApiProperties;
import us.dot.its.jpo.ode.model.OdeBsmData;

import java.util.HashMap;
@@ -23,11 +26,15 @@
// Ref. https://github.com/eugenp/tutorials/blob/master/spring-kafka-3/src/main/java/com/baeldung/spring/kafka/startstopconsumer/KafkaConsumerConfig.java
@EnableKafka
@Configuration
@Slf4j
public class KafkaConsumerConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Autowired
ConflictMonitorApiProperties properties;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ProcessedSpat> spatListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ProcessedSpat> factory =
@@ -54,37 +61,55 @@ public ConcurrentKafkaListenerContainerFactory<BsmIntersectionIdKey, OdeBsmData>

@Bean
public DefaultKafkaConsumerFactory<String, ProcessedSpat> spatConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new StringDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new JsonDeserializer<>(ProcessedSpat.class));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(props,
new StringDeserializer(),
new JsonDeserializer<>(ProcessedSpat.class));
return consumerFactory(new StringDeserializer(), new JsonDeserializer<>(ProcessedSpat.class));
}

@Bean
public DefaultKafkaConsumerFactory<String, ProcessedMap<LineString>> mapConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new StringDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new ProcessedMapDeserializer<>(LineString.class));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(props,
return consumerFactory(
new StringDeserializer(),
new ProcessedMapDeserializer<>(LineString.class));
}

@Bean
public DefaultKafkaConsumerFactory<BsmIntersectionIdKey, OdeBsmData> bsmConsumerFactory() {
return consumerFactory(
new JsonDeserializer<>(BsmIntersectionIdKey.class),
new JsonDeserializer<>(OdeBsmData.class));
}

private <TKey, TValue> DefaultKafkaConsumerFactory<TKey, TValue> consumerFactory(
Deserializer<TKey> keyDeserializer, Deserializer<TValue> valueDeserializer) {
Map<String, Object> props = commonProps();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
return new DefaultKafkaConsumerFactory<>(props,
keyDeserializer,
valueDeserializer);
}

private Map<String, Object> commonProps() {
Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new JsonDeserializer<>(BsmIntersectionIdKey.class));
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new JsonDeserializer<>(OdeBsmData.class));
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(BsmIntersectionIdKey.class),
new JsonDeserializer<>(OdeBsmData.class));
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);

if (properties.getConfluentCloudStatus()) {
props.put("ssl.endpoint.identification.algorithm", "https");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");

if (properties.getConfluentKey() != null && properties.getConfluentSecret() != null) {
String auth = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"" + properties.getConfluentKey() + "\" " +
"password=\"" + properties.getConfluentSecret() + "\";";
props.put("sasl.jaas.config", auth);
} else {
log.error(
"Environment variables CONFLUENT_KEY and CONFLUENT_SECRET are not set. Set these in the .env file to use Confluent Cloud");
}
}
return props;
}
}
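After this refactor the three consumer factories share commonProps() and differ only in their deserializers, so adding another typed factory is a single call to the generic helper. A sketch, where FooKey and FooValue are hypothetical JSON-deserializable types:

    @Bean
    public DefaultKafkaConsumerFactory<FooKey, FooValue> fooConsumerFactory() {
        // FooKey/FooValue are placeholders for any types with JSON deserializers
        return consumerFactory(
                new JsonDeserializer<>(FooKey.class),
                new JsonDeserializer<>(FooValue.class));
    }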
MapSocketForwardListener.java
@@ -1,22 +1,21 @@
package us.dot.its.jpo.ode.api.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
import us.dot.its.jpo.ode.api.controllers.StompController;

@Component
public class MapSocketForwardListener extends AbstractConsumerSeekAware {

final StompController stompController;
@Slf4j
public class MapSocketForwardListener extends BaseSeekToEndListener {

@Autowired
public MapSocketForwardListener(StompController stompController) {
this.stompController = stompController;
super(stompController);
}

@KafkaListener(id = ListenerIds.MAP,
@@ -27,5 +26,6 @@ public MapSocketForwardListener(StompController stompController) {
autoStartup = "false")
public void listen(ConsumerRecord<String, ProcessedMap<LineString>> record) {
stompController.broadcastMap(record.value());
log.trace("Received map with offset {}", record.offset());
}
}
SpatSocketForwardListener.java
@@ -2,37 +2,20 @@

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
import us.dot.its.jpo.ode.api.controllers.StompController;

import java.util.Map;

/**
* Kafka listener that can seek to the latest offset.
* See <a href="https://docs.spring.io/spring-kafka/reference/kafka/seek.html">
* Spring Kafka: Seeking to a specific offset</a>
*/
@Component
@Slf4j
public class SpatSocketForwardListener extends AbstractConsumerSeekAware {

final StompController stompController;
public class SpatSocketForwardListener extends BaseSeekToEndListener {

@Autowired
public SpatSocketForwardListener(StompController stompController) {
this.stompController = stompController;
}

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
log.info("Seek to end for TopicPartitions {}", assignments.keySet());
callback.seekToEnd(assignments.keySet());
super.onPartitionsAssigned(assignments, callback);
super(stompController);
}

@KafkaListener(id = ListenerIds.SPAT,
@@ -43,7 +26,7 @@ public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
autoStartup = "false")
public void listen(ConsumerRecord<String, ProcessedSpat> record) {
stompController.broadcastSpat(record.value());
log.debug("Received spat with offset {}", record.offset());
log.trace("Received spat with offset {}", record.offset());
}

}
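The onPartitionsAssigned override removed here now lives in BaseSeekToEndListener, so the BSM, MAP, and SPaT forward listeners share a single seek-to-end implementation instead of three copies.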
Logback logging configuration
@@ -10,6 +10,7 @@

<logger name="us.dot.its.jpo.ode" level="INFO"/>
<logger name="us.dot.its.jpo.ode.api" level="DEBUG"/>
<logger name="us.dot.its.jpo.ode.api" level="TRACE"/>
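Note: both logger definitions for us.dot.its.jpo.ode.api remain in the file; Logback applies them in document order, so the later TRACE entry takes effect, which is what surfaces the new log.trace statements in the listeners. Collapsing the two lines into a single TRACE entry would be equivalent.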

<root level="ERROR">
<appender-ref ref="STDOUT" />
