Skip to content

Commit

Permalink
Working seek-to-end for spats
Browse files Browse the repository at this point in the history
  • Loading branch information
iyourshaw committed Dec 18, 2024
1 parent ed59057 commit ce9fd7b
Show file tree
Hide file tree
Showing 12 changed files with 464 additions and 405 deletions.
Original file line number Diff line number Diff line change
@@ -1,78 +1,78 @@
package us.dot.its.jpo.ode.api;

import java.util.List;


import com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Profile;

import org.springframework.stereotype.Controller;

import us.dot.its.jpo.ode.api.topologies.*;

import us.dot.its.jpo.ode.api.controllers.StompController;
import lombok.Getter;

/**
 * Constructs and holds the Kafka Streams topologies that forward
 * ProcessedSpat, ProcessedMap, and BSM messages to WebSocket clients
 * via the {@link StompController}.
 *
 * <p>Not active under the {@code test} profile, and waits for topic
 * creation ({@code createKafkaTopics}) before starting.</p>
 */
@Controller
@DependsOn("createKafkaTopics")
@Profile("!test")
public class APIServiceController {

    private static final Logger logger = LoggerFactory.getLogger(APIServiceController.class);

    // All socket-forwarding topologies, immutable once construction completes.
    @Getter
    final List<RestartableTopology> topologies;

    /**
     * Builds one socket-forward topology per message type (SPaT, MAP, BSM)
     * and collects them into {@link #topologies}.
     *
     * @param props           supplies per-topology Kafka Streams properties
     * @param stompController destination for broadcast messages
     */
    @Autowired
    public APIServiceController(
            ConflictMonitorApiProperties props,
            StompController stompController) {

        ImmutableList.Builder<RestartableTopology> topologyListBuilder = ImmutableList.builder();

        try {
            logger.info("Starting {}", this.getClass().getSimpleName());

            SpatSocketForwardTopology spatSocketForwardTopology = new SpatSocketForwardTopology(
                    "topic.ProcessedSpat",
                    stompController,
                    props.createStreamProperties("processedSpat")
            );
            topologyListBuilder.add(spatSocketForwardTopology);

            MapSocketForwardTopology mapSocketForwardTopology = new MapSocketForwardTopology(
                    "topic.ProcessedMap",
                    stompController,
                    props.createStreamProperties("processedMap")
            );
            topologyListBuilder.add(mapSocketForwardTopology);

            BsmSocketForwardTopology bsmSocketForwardTopology = new BsmSocketForwardTopology(
                    "topic.CmBsmIntersection",
                    stompController,
                    props.createStreamProperties("bsm")
            );
            topologyListBuilder.add(bsmSocketForwardTopology);

            logger.info("All Services Constructed {}", this.getClass().getSimpleName());
        } catch (Exception e) {
            // Deliberately best-effort: a failure constructing one topology
            // leaves the list partially built rather than failing startup.
            logger.error("Encountered issue with creating topologies", e);
        }

        topologies = topologyListBuilder.build();
    }
}
//package us.dot.its.jpo.ode.api;
//
//import java.util.List;
//
//
//import com.google.common.collect.ImmutableList;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.context.annotation.DependsOn;
//import org.springframework.context.annotation.Profile;
//
//import org.springframework.stereotype.Controller;
//
//import us.dot.its.jpo.ode.api.topologies.*;
//
//import us.dot.its.jpo.ode.api.controllers.StompController;
//import lombok.Getter;
//
///**
// * Launches ToGeoJsonFromJsonConverter service
// */
//@Controller
//@DependsOn("createKafkaTopics")
//@Profile("!test")
//public class APIServiceController {
//
// private static final Logger logger = LoggerFactory.getLogger(APIServiceController.class);
// org.apache.kafka.common.serialization.Serdes bas;
//
// // Collection of all the topologies
// @Getter
// final List<RestartableTopology> topologies;
//
// @Autowired
// public APIServiceController(
// ConflictMonitorApiProperties props,
// StompController stompController) {
//
// ImmutableList.Builder<RestartableTopology> topologyListBuilder = ImmutableList.builder();
//
// try {
//
// logger.info("Starting {}", this.getClass().getSimpleName());
//
//
// SpatSocketForwardTopology spatSocketForwardTopology = new SpatSocketForwardTopology(
// "topic.ProcessedSpat",
// stompController,
// props.createStreamProperties("processedSpat")
// );
// topologyListBuilder.add(spatSocketForwardTopology);
//
// MapSocketForwardTopology mapSocketForwardTopology = new MapSocketForwardTopology(
// "topic.ProcessedMap",
// stompController,
// props.createStreamProperties("processedMap")
// );
// topologyListBuilder.add(mapSocketForwardTopology);
//
// BsmSocketForwardTopology bsmSocketForwardTopology = new BsmSocketForwardTopology(
// "topic.CmBsmIntersection",
// stompController,
// props.createStreamProperties("bsm")
// );
// topologyListBuilder.add(bsmSocketForwardTopology);
//
// logger.info("All Services Constructed {}", this.getClass().getSimpleName());
// } catch (Exception e) {
// logger.error("Encountered issue with creating topologies", e);
// }
//
// topologies = topologyListBuilder.build();
// }
//
//
//
//}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,10 @@
import org.springframework.web.socket.messaging.AbstractSubProtocolEvent;
import org.springframework.web.socket.messaging.SessionConnectEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import us.dot.its.jpo.ode.api.APIServiceController;
import us.dot.its.jpo.ode.api.kafka.KafkaListenerControlService;
import us.dot.its.jpo.ode.api.topologies.RestartableTopology;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,14 @@
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionIdKey;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString;
import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes;
import us.dot.its.jpo.geojsonconverter.serialization.deserializers.JsonDeserializer;
import us.dot.its.jpo.geojsonconverter.serialization.deserializers.ProcessedMapDeserializer;
import us.dot.its.jpo.ode.model.OdeBsmData;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -30,6 +36,22 @@ public ConcurrentKafkaListenerContainerFactory<String, ProcessedSpat> spatListen
return factory;
}

/**
 * Listener container factory for ProcessedMap messages, backed by
 * {@link #mapConsumerFactory()}.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ProcessedMap<LineString>> mapListenerContainerFactory() {
    var containerFactory = new ConcurrentKafkaListenerContainerFactory<String, ProcessedMap<LineString>>();
    containerFactory.setConsumerFactory(mapConsumerFactory());
    return containerFactory;
}

/**
 * Listener container factory for BSM messages keyed by intersection,
 * backed by {@link #bsmConsumerFactory()}.
 */
@Bean
public ConcurrentKafkaListenerContainerFactory<BsmIntersectionIdKey, OdeBsmData> bsmListenerContainerFactory() {
    var containerFactory = new ConcurrentKafkaListenerContainerFactory<BsmIntersectionIdKey, OdeBsmData>();
    containerFactory.setConsumerFactory(bsmConsumerFactory());
    return containerFactory;
}

@Bean
public DefaultKafkaConsumerFactory<String, ProcessedSpat> spatConsumerFactory() {
Map<String, Object> props = new HashMap<>();
Expand All @@ -41,4 +63,28 @@ public DefaultKafkaConsumerFactory<String, ProcessedSpat> spatConsumerFactory()
new StringDeserializer(),
new JsonDeserializer<>(ProcessedSpat.class));
}

/**
 * Consumer factory for ProcessedMap messages.
 *
 * <p>Deserializers are supplied as instances to the factory constructor, which
 * takes precedence over any {@code *_DESERIALIZER_CLASS_CONFIG} properties.
 * The original code also put deserializer <em>instances</em> into those
 * properties, but the configs accept only a Class or class-name String —
 * the entries were invalid-typed and redundant, so they are removed.</p>
 */
@Bean
public DefaultKafkaConsumerFactory<String, ProcessedMap<LineString>> mapConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Start at the latest offset so newly connected clients only see live data.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return new DefaultKafkaConsumerFactory<>(props,
            new StringDeserializer(),
            new ProcessedMapDeserializer<>(LineString.class));
}

/**
 * Consumer factory for BSM messages keyed by {@link BsmIntersectionIdKey}.
 *
 * <p>Deserializers are supplied as instances to the factory constructor, which
 * takes precedence over any {@code *_DESERIALIZER_CLASS_CONFIG} properties.
 * The original code also put deserializer <em>instances</em> into those
 * properties, but the configs accept only a Class or class-name String —
 * the entries were invalid-typed and redundant, so they are removed.</p>
 */
@Bean
public DefaultKafkaConsumerFactory<BsmIntersectionIdKey, OdeBsmData> bsmConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Start at the latest offset so newly connected clients only see live data.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return new DefaultKafkaConsumerFactory<>(props,
            new JsonDeserializer<>(BsmIntersectionIdKey.class),
            new JsonDeserializer<>(OdeBsmData.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public void startListeners() {
startListener(ListenerIds.MAP);
mapListener.seekToEnd();
startListener(ListenerIds.SPAT);
spatListener.seekToEnd();

//spatListener.seekToEnd();
startListener(ListenerIds.BSM);
bsmListener.seekToEnd();
}
Expand All @@ -49,13 +50,15 @@ public void stopListeners() {
/**
 * Starts the registered listener container for the given id, if one is
 * registered and it is not already running.
 */
private void startListener(String listenerId) {
    MessageListenerContainer container = registry.getListenerContainer(listenerId);
    if (container == null || container.isRunning()) {
        return;
    }
    log.info("Starting kafka listener: {}", listenerId);
    container.start();
}

/**
 * Stops the registered listener container for the given id, if one is
 * registered and it is currently running.
 */
private void stopListener(String listenerId) {
    MessageListenerContainer container = registry.getListenerContainer(listenerId);
    if (container == null || !container.isRunning()) {
        return;
    }
    log.info("Stopping kafka listener: {}", listenerId);
    container.stop();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package us.dot.its.jpo.ode.api.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;
import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat;
import us.dot.its.jpo.ode.api.controllers.StompController;

import java.util.Map;

/**
* Kafka listener that can seek to the latest offset.
* See <a href="https://docs.spring.io/spring-kafka/reference/kafka/seek.html">
* Spring Kafka: Seeking to a specific offset</a>
*/
@Component
@Slf4j
public class SpatSocketForwardListener extends AbstractConsumerSeekAware {

final StompController stompController;
Expand All @@ -23,6 +28,13 @@ public SpatSocketForwardListener(StompController stompController) {
this.stompController = stompController;
}

/**
 * On every partition assignment, seeks to the end of each assigned partition
 * so the listener forwards only live SPaT messages, skipping any backlog.
 * The seek is issued before delegating to the superclass, which records the
 * callback for later on-demand seeks (see AbstractConsumerSeekAware).
 */
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    log.info("Seek to end for TopicPartitions {}", assignments.keySet());
    callback.seekToEnd(assignments.keySet());
    super.onPartitionsAssigned(assignments, callback);
}

@KafkaListener(id = ListenerIds.SPAT,
groupId = ListenerIds.SPAT,
topics = "topic.ProcessedSpat",
Expand All @@ -31,6 +43,7 @@ public SpatSocketForwardListener(StompController stompController) {
autoStartup = "false")
// Forwards each received ProcessedSpat to connected STOMP WebSocket clients.
// NOTE(review): annotation config (@KafkaListener id/topics) is defined above;
// autoStartup is "false" — the listener is started by KafkaListenerControlService.
public void listen(ConsumerRecord<String, ProcessedSpat> record) {
    stompController.broadcastSpat(record.value());
    log.debug("Received spat with offset {}", record.offset());
}

}
Loading

0 comments on commit ce9fd7b

Please sign in to comment.