From ce9fd7b6d058722b4194a8e329352a41dc422a4c Mon Sep 17 00:00:00 2001 From: Ivan Yourshaw <39739503+iyourshaw@users.noreply.github.com> Date: Tue, 17 Dec 2024 19:43:19 -0700 Subject: [PATCH] Working seek-to-end for spats --- .../its/jpo/ode/api/APIServiceController.java | 156 +++++++++--------- .../controllers/StompSessionController.java | 3 - .../ode/api/kafka/KafkaConsumerConfig.java | 46 ++++++ .../kafka/KafkaListenerControlService.java | 5 +- .../api/kafka/SpatSocketForwardListener.java | 13 ++ .../jpo/ode/api/topologies/BaseTopology.java | 128 +++++++------- .../topologies/BsmSocketForwardTopology.java | 100 +++++------ .../api/topologies/DataLoaderTopology.java | 100 +++++------ .../jpo/ode/api/topologies/EmailTopology.java | 112 ++++++------- .../topologies/MapSocketForwardTopology.java | 96 +++++------ .../api/topologies/RestartableTopology.java | 20 +-- .../topologies/SpatSocketForwardTopology.java | 90 +++++----- 12 files changed, 464 insertions(+), 405 deletions(-) diff --git a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/APIServiceController.java b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/APIServiceController.java index c9db7c94..35cc4498 100644 --- a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/APIServiceController.java +++ b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/APIServiceController.java @@ -1,78 +1,78 @@ -package us.dot.its.jpo.ode.api; - -import java.util.List; - - -import com.google.common.collect.ImmutableList; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.DependsOn; -import org.springframework.context.annotation.Profile; - -import org.springframework.stereotype.Controller; - -import us.dot.its.jpo.ode.api.topologies.*; - -import us.dot.its.jpo.ode.api.controllers.StompController; -import lombok.Getter; - -/** - * Launches ToGeoJsonFromJsonConverter service - */ -@Controller -@DependsOn("createKafkaTopics") -@Profile("!test") -public class APIServiceController { - - private static final Logger logger = LoggerFactory.getLogger(APIServiceController.class); - org.apache.kafka.common.serialization.Serdes bas; - - // Collection of all the topologies - @Getter - final List topologies; - - @Autowired - public APIServiceController( - ConflictMonitorApiProperties props, - StompController stompController) { - - ImmutableList.Builder topologyListBuilder = ImmutableList.builder(); - - try { - - logger.info("Starting {}", this.getClass().getSimpleName()); - - - SpatSocketForwardTopology spatSocketForwardTopology = new SpatSocketForwardTopology( - "topic.ProcessedSpat", - stompController, - props.createStreamProperties("processedSpat") - ); - topologyListBuilder.add(spatSocketForwardTopology); - - MapSocketForwardTopology mapSocketForwardTopology = new MapSocketForwardTopology( - "topic.ProcessedMap", - stompController, - props.createStreamProperties("processedMap") - ); - topologyListBuilder.add(mapSocketForwardTopology); - - BsmSocketForwardTopology bsmSocketForwardTopology = new BsmSocketForwardTopology( - "topic.CmBsmIntersection", - stompController, - props.createStreamProperties("bsm") - ); - topologyListBuilder.add(bsmSocketForwardTopology); - - logger.info("All Services Constructed {}", this.getClass().getSimpleName()); - } catch (Exception e) { - logger.error("Encountered issue with creating topologies", e); - } - - topologies = topologyListBuilder.build(); - } - - - -} \ No newline at end of file +//package us.dot.its.jpo.ode.api; +// +//import java.util.List; +// +// +//import com.google.common.collect.ImmutableList; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.context.annotation.DependsOn; +//import org.springframework.context.annotation.Profile; +// +//import org.springframework.stereotype.Controller; +// +//import us.dot.its.jpo.ode.api.topologies.*; +// +//import us.dot.its.jpo.ode.api.controllers.StompController; +//import lombok.Getter; +// +///** +// * Launches ToGeoJsonFromJsonConverter service +// */ +//@Controller +//@DependsOn("createKafkaTopics") +//@Profile("!test") +//public class APIServiceController { +// +// private static final Logger logger = LoggerFactory.getLogger(APIServiceController.class); +// org.apache.kafka.common.serialization.Serdes bas; +// +// // Collection of all the topologies +// @Getter +// final List topologies; +// +// @Autowired +// public APIServiceController( +// ConflictMonitorApiProperties props, +// StompController stompController) { +// +// ImmutableList.Builder topologyListBuilder = ImmutableList.builder(); +// +// try { +// +// logger.info("Starting {}", this.getClass().getSimpleName()); +// +// +// SpatSocketForwardTopology spatSocketForwardTopology = new SpatSocketForwardTopology( +// "topic.ProcessedSpat", +// stompController, +// props.createStreamProperties("processedSpat") +// ); +// topologyListBuilder.add(spatSocketForwardTopology); +// +// MapSocketForwardTopology mapSocketForwardTopology = new MapSocketForwardTopology( +// "topic.ProcessedMap", +// stompController, +// props.createStreamProperties("processedMap") +// ); +// topologyListBuilder.add(mapSocketForwardTopology); +// +// BsmSocketForwardTopology bsmSocketForwardTopology = new BsmSocketForwardTopology( +// "topic.CmBsmIntersection", +// stompController, +// props.createStreamProperties("bsm") +// ); +// topologyListBuilder.add(bsmSocketForwardTopology); +// +// logger.info("All Services Constructed {}", this.getClass().getSimpleName()); +// } catch (Exception e) { +// logger.error("Encountered issue with creating topologies", e); +// } +// +// topologies = topologyListBuilder.build(); +// } +// +// +// +//} \ No newline at end of file diff --git a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/controllers/StompSessionController.java b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/controllers/StompSessionController.java index f238c832..7c8134c0 100644 --- a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/controllers/StompSessionController.java +++ b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/controllers/StompSessionController.java @@ -8,13 +8,10 @@ import org.springframework.web.socket.messaging.AbstractSubProtocolEvent; import org.springframework.web.socket.messaging.SessionConnectEvent; import org.springframework.web.socket.messaging.SessionDisconnectEvent; -import us.dot.its.jpo.ode.api.APIServiceController; import us.dot.its.jpo.ode.api.kafka.KafkaListenerControlService; -import us.dot.its.jpo.ode.api.topologies.RestartableTopology; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Set; diff --git a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/KafkaConsumerConfig.java b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/KafkaConsumerConfig.java index b3b83f66..ca176ca5 100644 --- a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/KafkaConsumerConfig.java +++ b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/KafkaConsumerConfig.java @@ -8,8 +8,14 @@ import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionIdKey; +import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString; +import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap; import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat; +import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes; import us.dot.its.jpo.geojsonconverter.serialization.deserializers.JsonDeserializer; +import us.dot.its.jpo.geojsonconverter.serialization.deserializers.ProcessedMapDeserializer; +import us.dot.its.jpo.ode.model.OdeBsmData; import java.util.HashMap; import java.util.Map; @@ -30,6 +36,22 @@ public ConcurrentKafkaListenerContainerFactory spatListen return factory; } + @Bean + public ConcurrentKafkaListenerContainerFactory> mapListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory> factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(mapConsumerFactory()); + return factory; + } + + @Bean + public ConcurrentKafkaListenerContainerFactory bsmListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = + new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(bsmConsumerFactory()); + return factory; + } + @Bean public DefaultKafkaConsumerFactory spatConsumerFactory() { Map props = new HashMap<>(); @@ -41,4 +63,28 @@ public DefaultKafkaConsumerFactory spatConsumerFactory() new StringDeserializer(), new JsonDeserializer<>(ProcessedSpat.class)); } + + @Bean + public DefaultKafkaConsumerFactory> mapConsumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new StringDeserializer()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new ProcessedMapDeserializer<>(LineString.class)); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + return new DefaultKafkaConsumerFactory<>(props, + new StringDeserializer(), + new ProcessedMapDeserializer<>(LineString.class)); + } + + @Bean + public DefaultKafkaConsumerFactory bsmConsumerFactory() { + Map props = new HashMap<>(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, new JsonDeserializer<>(BsmIntersectionIdKey.class)); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new JsonDeserializer<>(OdeBsmData.class)); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + return new DefaultKafkaConsumerFactory<>(props, + new JsonDeserializer<>(BsmIntersectionIdKey.class), + new JsonDeserializer<>(OdeBsmData.class)); + } } diff --git a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/KafkaListenerControlService.java b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/KafkaListenerControlService.java index bdc5b625..a0fbdf56 100644 --- a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/KafkaListenerControlService.java +++ b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/KafkaListenerControlService.java @@ -35,7 +35,8 @@ public void startListeners() { startListener(ListenerIds.MAP); mapListener.seekToEnd(); startListener(ListenerIds.SPAT); - spatListener.seekToEnd(); + + //spatListener.seekToEnd(); startListener(ListenerIds.BSM); bsmListener.seekToEnd(); } @@ -49,6 +50,7 @@ public void stopListeners() { private void startListener(String listenerId) { MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); if (listenerContainer != null && !listenerContainer.isRunning()) { + log.info("Starting kafka listener: {}", listenerId); listenerContainer.start(); } } @@ -56,6 +58,7 @@ private void startListener(String listenerId) { private void stopListener(String listenerId) { MessageListenerContainer listenerContainer = registry.getListenerContainer(listenerId); if (listenerContainer != null && listenerContainer.isRunning()) { + log.info("Stopping kafka listener: {}", listenerId); listenerContainer.stop(); } } diff --git a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/SpatSocketForwardListener.java b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/SpatSocketForwardListener.java index e597f52f..1ea78ad5 100644 --- a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/SpatSocketForwardListener.java +++ b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/kafka/SpatSocketForwardListener.java @@ -1,6 +1,8 @@ package us.dot.its.jpo.ode.api.kafka; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.listener.AbstractConsumerSeekAware; @@ -8,12 +10,15 @@ import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat; import us.dot.its.jpo.ode.api.controllers.StompController; +import java.util.Map; + /** * Kafka listener that can seek to the latest offset. * See * Spring Kafka: Seeking to a specific offset */ @Component +@Slf4j public class SpatSocketForwardListener extends AbstractConsumerSeekAware { final StompController stompController; @@ -23,6 +28,13 @@ public SpatSocketForwardListener(StompController stompController) { this.stompController = stompController; } + @Override + public void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) { + log.info("Seek to end for TopicPartitions {}", assignments.keySet()); + callback.seekToEnd(assignments.keySet()); + super.onPartitionsAssigned(assignments, callback); + } + @KafkaListener(id = ListenerIds.SPAT, groupId = ListenerIds.SPAT, topics = "topic.ProcessedSpat", @@ -31,6 +43,7 @@ public SpatSocketForwardListener(StompController stompController) { autoStartup = "false") public void listen(ConsumerRecord record) { stompController.broadcastSpat(record.value()); + log.debug("Received spat with offset {}", record.offset()); } } diff --git a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BaseTopology.java b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BaseTopology.java index 618ae2d7..5aa074e4 100644 --- a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BaseTopology.java +++ b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BaseTopology.java @@ -1,64 +1,64 @@ -package us.dot.its.jpo.ode.api.topologies; - -import lombok.Getter; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; -import org.slf4j.Logger; - -import java.time.Duration; -import java.util.Properties; - -/** - * Default implementation of common functionality for topologies - */ -public abstract class BaseTopology implements RestartableTopology{ - - protected abstract Logger getLogger(); - protected abstract Topology buildTopology(); - - protected Topology topology; - protected KafkaStreams streams; - - @Getter - protected final String topicName; - - protected final Properties streamsProperties; - - public BaseTopology(String topicName, Properties streamsProperties) { - this.topicName = topicName; - this.streamsProperties = streamsProperties; - } - - @Override - public void start() { - if (streams != null && streams.state().isRunningOrRebalancing()) { - throw new IllegalStateException("Start called while streams is already running."); - } - topology = buildTopology(); - streams = new KafkaStreams(topology, streamsProperties); - if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler); - if (stateListener != null) streams.setStateListener(stateListener); - streams.start(); - } - - @Override - public void stop() { - getLogger().info("Stopping topology for {}", topicName); - if (streams != null) { - // Trigger streams to shut down without blocking - streams.close(Duration.ZERO); - } - } - - KafkaStreams.StateListener stateListener; - public void registerStateListener(KafkaStreams.StateListener stateListener) { - this.stateListener = stateListener; - } - - StreamsUncaughtExceptionHandler exceptionHandler; - public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) { - this.exceptionHandler = exceptionHandler; - } - -} +//package us.dot.its.jpo.ode.api.topologies; +// +//import lombok.Getter; +//import org.apache.kafka.streams.KafkaStreams; +//import org.apache.kafka.streams.Topology; +//import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler; +//import org.slf4j.Logger; +// +//import java.time.Duration; +//import java.util.Properties; +// +///** +// * Default implementation of common functionality for topologies +// */ +//public abstract class BaseTopology implements RestartableTopology{ +// +// protected abstract Logger getLogger(); +// protected abstract Topology buildTopology(); +// +// protected Topology topology; +// protected KafkaStreams streams; +// +// @Getter +// protected final String topicName; +// +// protected final Properties streamsProperties; +// +// public BaseTopology(String topicName, Properties streamsProperties) { +// this.topicName = topicName; +// this.streamsProperties = streamsProperties; +// } +// +// @Override +// public void start() { +// if (streams != null && streams.state().isRunningOrRebalancing()) { +// throw new IllegalStateException("Start called while streams is already running."); +// } +// topology = buildTopology(); +// streams = new KafkaStreams(topology, streamsProperties); +// if (exceptionHandler != null) streams.setUncaughtExceptionHandler(exceptionHandler); +// if (stateListener != null) streams.setStateListener(stateListener); +// streams.start(); +// } +// +// @Override +// public void stop() { +// getLogger().info("Stopping topology for {}", topicName); +// if (streams != null) { +// // Trigger streams to shut down without blocking +// streams.close(Duration.ZERO); +// } +// } +// +// KafkaStreams.StateListener stateListener; +// public void registerStateListener(KafkaStreams.StateListener stateListener) { +// this.stateListener = stateListener; +// } +// +// StreamsUncaughtExceptionHandler exceptionHandler; +// public void registerUncaughtExceptionHandler(StreamsUncaughtExceptionHandler exceptionHandler) { +// this.exceptionHandler = exceptionHandler; +// } +// +//} diff --git a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BsmSocketForwardTopology.java b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BsmSocketForwardTopology.java index 59f3b360..834cb0e9 100644 --- a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BsmSocketForwardTopology.java +++ b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/BsmSocketForwardTopology.java @@ -1,50 +1,50 @@ -package us.dot.its.jpo.ode.api.topologies; - -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.KStream; - -import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionIdKey; -import us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes; -import us.dot.its.jpo.ode.api.controllers.StompController; -import us.dot.its.jpo.ode.model.OdeBsmData; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.Properties; - -public class BsmSocketForwardTopology extends BaseTopology { - - protected final StompController controller; - protected final ObjectMapper objectMapper; - - private static final Logger logger = LoggerFactory.getLogger(BsmSocketForwardTopology.class); - - public BsmSocketForwardTopology(String topicName, StompController controller, Properties streamsProperties){ - super(topicName, streamsProperties); - this.controller = controller; - this.objectMapper = new ObjectMapper(); - } - - @Override - public Topology buildTopology() { - StreamsBuilder builder = new StreamsBuilder(); - - KStream inputStream = builder.stream(topicName, Consumed.with(JsonSerdes.BsmIntersectionIdKey(), JsonSerdes.OdeBsm())); - - inputStream.foreach((key, value) -> { - controller.broadcastBSM(key, value); - }); - - return builder.build(); - - } - - @Override - protected Logger getLogger() { - return logger; - } -} +//package us.dot.its.jpo.ode.api.topologies; +// +//import org.apache.kafka.streams.StreamsBuilder; +//import org.apache.kafka.streams.Topology; +//import org.apache.kafka.streams.kstream.Consumed; +//import org.apache.kafka.streams.kstream.KStream; +// +//import us.dot.its.jpo.conflictmonitor.monitor.models.bsm.BsmIntersectionIdKey; +//import us.dot.its.jpo.conflictmonitor.monitor.serialization.JsonSerdes; +//import us.dot.its.jpo.ode.api.controllers.StompController; +//import us.dot.its.jpo.ode.model.OdeBsmData; +// +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +//import com.fasterxml.jackson.databind.ObjectMapper; +//import java.util.Properties; +// +//public class BsmSocketForwardTopology extends BaseTopology { +// +// protected final StompController controller; +// protected final ObjectMapper objectMapper; +// +// private static final Logger logger = LoggerFactory.getLogger(BsmSocketForwardTopology.class); +// +// public BsmSocketForwardTopology(String topicName, StompController controller, Properties streamsProperties){ +// super(topicName, streamsProperties); +// this.controller = controller; +// this.objectMapper = new ObjectMapper(); +// } +// +// @Override +// public Topology buildTopology() { +// StreamsBuilder builder = new StreamsBuilder(); +// +// KStream inputStream = builder.stream(topicName, Consumed.with(JsonSerdes.BsmIntersectionIdKey(), JsonSerdes.OdeBsm())); +// +// inputStream.foreach((key, value) -> { +// controller.broadcastBSM(key, value); +// }); +// +// return builder.build(); +// +// } +// +// @Override +// protected Logger getLogger() { +// return logger; +// } +//} diff --git a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/DataLoaderTopology.java b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/DataLoaderTopology.java index 78a6a6f9..0f585b58 100644 --- a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/DataLoaderTopology.java +++ b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/DataLoaderTopology.java @@ -1,50 +1,50 @@ -package us.dot.its.jpo.ode.api.topologies; - -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.KStream; - -import us.dot.its.jpo.ode.api.models.DataLoader; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -import java.util.Properties; - -public class DataLoaderTopology extends BaseTopology { - - private static final Logger logger = LoggerFactory.getLogger(DataLoaderTopology.class); - - Serde consumerSerde; - DataLoader dataLoader; - - public DataLoaderTopology(String topicName, Serde consumerSerde, DataLoader dataLoader, Properties streamsProperties){ - super(topicName, streamsProperties); - this.consumerSerde = consumerSerde; - this.dataLoader = dataLoader; - } - - @Override - protected Logger getLogger() { - return logger; - } - - @Override - public Topology buildTopology() { - StreamsBuilder builder = new StreamsBuilder(); - - KStream inputStream = builder.stream(topicName, Consumed.with(Serdes.String(), consumerSerde)); - - inputStream.foreach((key, value) -> { - dataLoader.add(value); - }); - - return builder.build(); - - } - -} +//package us.dot.its.jpo.ode.api.topologies; +// +//import org.apache.kafka.common.serialization.Serde; +//import org.apache.kafka.common.serialization.Serdes; +//import org.apache.kafka.streams.StreamsBuilder; +//import org.apache.kafka.streams.Topology; +//import org.apache.kafka.streams.kstream.Consumed; +//import org.apache.kafka.streams.kstream.KStream; +// +//import us.dot.its.jpo.ode.api.models.DataLoader; +// +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +// +//import java.util.Properties; +// +//public class DataLoaderTopology extends BaseTopology { +// +// private static final Logger logger = LoggerFactory.getLogger(DataLoaderTopology.class); +// +// Serde consumerSerde; +// DataLoader dataLoader; +// +// public DataLoaderTopology(String topicName, Serde consumerSerde, DataLoader dataLoader, Properties streamsProperties){ +// super(topicName, streamsProperties); +// this.consumerSerde = consumerSerde; +// this.dataLoader = dataLoader; +// } +// +// @Override +// protected Logger getLogger() { +// return logger; +// } +// +// @Override +// public Topology buildTopology() { +// StreamsBuilder builder = new StreamsBuilder(); +// +// KStream inputStream = builder.stream(topicName, Consumed.with(Serdes.String(), consumerSerde)); +// +// inputStream.foreach((key, value) -> { +// dataLoader.add(value); +// }); +// +// return builder.build(); +// +// } +// +//} diff --git a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/EmailTopology.java b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/EmailTopology.java index bd25e471..8a711c13 100644 --- a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/EmailTopology.java +++ b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/EmailTopology.java @@ -1,56 +1,56 @@ -package us.dot.its.jpo.ode.api.topologies; - - -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.KStream; - -import us.dot.its.jpo.ode.api.models.DataLoader; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -import java.util.Properties; - - - -public class EmailTopology extends BaseTopology { - - private static final Logger logger = LoggerFactory.getLogger(EmailTopology.class); - - - - Serde consumerSerde; - DataLoader dataLoader; - - - public EmailTopology(String topicName, Serde consumerSerde, DataLoader dataLoader, Properties streamsProperties){ - super(topicName, streamsProperties); - this.consumerSerde = consumerSerde; - this.dataLoader = dataLoader; - } - - @Override - protected Logger getLogger() { - return logger; - } - - public Topology buildTopology() { - StreamsBuilder builder = new StreamsBuilder(); - - KStream inputStream = builder.stream(topicName, Consumed.with(Serdes.String(), consumerSerde)); - - inputStream.foreach((key, value) -> { - dataLoader.add(value); - }); - - return builder.build(); - - } - - -} +//package us.dot.its.jpo.ode.api.topologies; +// +// +//import org.apache.kafka.common.serialization.Serde; +//import org.apache.kafka.common.serialization.Serdes; +//import org.apache.kafka.streams.StreamsBuilder; +//import org.apache.kafka.streams.Topology; +//import org.apache.kafka.streams.kstream.Consumed; +//import org.apache.kafka.streams.kstream.KStream; +// +//import us.dot.its.jpo.ode.api.models.DataLoader; +// +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +// +//import java.util.Properties; +// +// +// +//public class EmailTopology extends BaseTopology { +// +// private static final Logger logger = LoggerFactory.getLogger(EmailTopology.class); +// +// +// +// Serde consumerSerde; +// DataLoader dataLoader; +// +// +// public EmailTopology(String topicName, Serde consumerSerde, DataLoader dataLoader, Properties streamsProperties){ +// super(topicName, streamsProperties); +// this.consumerSerde = consumerSerde; +// this.dataLoader = dataLoader; +// } +// +// @Override +// protected Logger getLogger() { +// return logger; +// } +// +// public Topology buildTopology() { +// StreamsBuilder builder = new StreamsBuilder(); +// +// KStream inputStream = builder.stream(topicName, Consumed.with(Serdes.String(), consumerSerde)); +// +// inputStream.foreach((key, value) -> { +// dataLoader.add(value); +// }); +// +// return builder.build(); +// +// } +// +// +//} diff --git a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/MapSocketForwardTopology.java b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/MapSocketForwardTopology.java index 230131df..46c30196 100644 --- a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/MapSocketForwardTopology.java +++ b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/MapSocketForwardTopology.java @@ -1,48 +1,48 @@ -package us.dot.its.jpo.ode.api.topologies; - -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.KStream; - -import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString; -import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap; -import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes; -import us.dot.its.jpo.ode.api.controllers.StompController; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -import java.util.Properties; - -public class MapSocketForwardTopology extends BaseTopology { - - private static final Logger logger = LoggerFactory.getLogger(MapSocketForwardTopology.class); - - StompController controller; - - public MapSocketForwardTopology(String topicName, StompController controller, Properties streamsProperties){ - super(topicName, streamsProperties); - this.controller = controller; - } - - @Override - protected Logger getLogger() { - return logger; - } - - public Topology buildTopology() { - StreamsBuilder builder = new StreamsBuilder(); - - KStream> inputStream - = builder.stream(topicName, Consumed.with(Serdes.String(), JsonSerdes.ProcessedMapGeoJson())); - - inputStream.foreach((key, value) -> { - controller.broadcastMap(value); - }); - - return builder.build(); - } - -} +//package us.dot.its.jpo.ode.api.topologies; +// +//import org.apache.kafka.common.serialization.Serdes; +//import org.apache.kafka.streams.StreamsBuilder; +//import org.apache.kafka.streams.Topology; +//import org.apache.kafka.streams.kstream.Consumed; +//import org.apache.kafka.streams.kstream.KStream; +// +//import us.dot.its.jpo.geojsonconverter.pojos.geojson.LineString; +//import us.dot.its.jpo.geojsonconverter.pojos.geojson.map.ProcessedMap; +//import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes; +//import us.dot.its.jpo.ode.api.controllers.StompController; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +// +// +//import java.util.Properties; +// +//public class MapSocketForwardTopology extends BaseTopology { +// +// private static final Logger logger = LoggerFactory.getLogger(MapSocketForwardTopology.class); +// +// StompController controller; +// +// public MapSocketForwardTopology(String topicName, StompController controller, Properties streamsProperties){ +// super(topicName, streamsProperties); +// this.controller = controller; +// } +// +// @Override +// protected Logger getLogger() { +// return logger; +// } +// +// public Topology buildTopology() { +// StreamsBuilder builder = new StreamsBuilder(); +// +// KStream> inputStream +// = builder.stream(topicName, Consumed.with(Serdes.String(), JsonSerdes.ProcessedMapGeoJson())); +// +// inputStream.foreach((key, value) -> { +// controller.broadcastMap(value); +// }); +// +// return builder.build(); +// } +// +//} diff --git a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/RestartableTopology.java b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/RestartableTopology.java index 3cc281be..d2188b99 100644 --- a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/RestartableTopology.java +++ b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/RestartableTopology.java @@ -1,10 +1,10 @@ -package us.dot.its.jpo.ode.api.topologies; - -/** - * Interface for a Kafka Streams topology that can be stopped and restarted - */ -public interface RestartableTopology { - void start(); - void stop(); - String getTopicName(); -} +//package us.dot.its.jpo.ode.api.topologies; +// +///** +// * Interface for a Kafka Streams topology that can be stopped and restarted +// */ +//public interface RestartableTopology { +// void start(); +// void stop(); +// String getTopicName(); +//} diff --git a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/SpatSocketForwardTopology.java b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/SpatSocketForwardTopology.java index 0f6bf8cf..10262fa4 100644 --- a/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/SpatSocketForwardTopology.java +++ b/api/jpo-conflictvisualizer-api/src/main/java/us/dot/its/jpo/ode/api/topologies/SpatSocketForwardTopology.java @@ -1,45 +1,45 @@ -package us.dot.its.jpo.ode.api.topologies; - -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.KStream; - -import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat; -import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes; -import us.dot.its.jpo.ode.api.controllers.StompController; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.Properties; - -public class SpatSocketForwardTopology extends BaseTopology { - - private static final Logger logger = LoggerFactory.getLogger(SpatSocketForwardTopology.class); - - StompController controller; - - public SpatSocketForwardTopology(String topicName, StompController controller, Properties streamsProperties) { - super(topicName, streamsProperties); - this.controller = controller; - } - - @Override - protected Logger getLogger() { - return logger; - } - - public Topology buildTopology() { - StreamsBuilder builder = new StreamsBuilder(); - - KStream inputStream = builder.stream(topicName, Consumed.with(Serdes.String(), JsonSerdes.ProcessedSpat())); - - inputStream.foreach((key, value) -> { - controller.broadcastSpat(value); - }); - - return builder.build(); - - } - -} +//package us.dot.its.jpo.ode.api.topologies; +// +//import org.apache.kafka.common.serialization.Serdes; +//import org.apache.kafka.streams.StreamsBuilder; +//import org.apache.kafka.streams.Topology; +//import org.apache.kafka.streams.kstream.Consumed; +//import org.apache.kafka.streams.kstream.KStream; +// +//import us.dot.its.jpo.geojsonconverter.pojos.spat.ProcessedSpat; +//import us.dot.its.jpo.geojsonconverter.serialization.JsonSerdes; +//import us.dot.its.jpo.ode.api.controllers.StompController; +//import org.slf4j.Logger; +//import org.slf4j.LoggerFactory; +//import java.util.Properties; +// +//public class SpatSocketForwardTopology extends BaseTopology { +// +// private static final Logger logger = LoggerFactory.getLogger(SpatSocketForwardTopology.class); +// +// StompController controller; +// +// public SpatSocketForwardTopology(String topicName, StompController controller, Properties streamsProperties) { +// super(topicName, streamsProperties); +// this.controller = controller; +// } +// +// @Override +// protected Logger getLogger() { +// return logger; +// } +// +// public Topology buildTopology() { +// StreamsBuilder builder = new StreamsBuilder(); +// +// KStream inputStream = builder.stream(topicName, Consumed.with(Serdes.String(), JsonSerdes.ProcessedSpat())); +// +// inputStream.foreach((key, value) -> { +// controller.broadcastSpat(value); +// }); +// +// return builder.build(); +// +// } +// +//}