Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate TimDepositController to use Spring Kafka #139

Draft
wants to merge 42 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
136a654
Replaced usages of MessageProducer with KafkaTemplate in TimDepositCo…
dmccoystephenson Dec 29, 2024
73cf99d
Used `serialIdJ2735.getStreamId()` as key when sending data to topics…
dmccoystephenson Dec 30, 2024
b0a19a9
Revert "Used `serialIdJ2735.getStreamId()` as key when sending data t…
dmccoystephenson Dec 31, 2024
a452a51
Revert "Replaced usages of MessageProducer with KafkaTemplate in TimD…
dmccoystephenson Dec 31, 2024
221ce67
Refactored TimDepositControllerTest to use Spring annotations
dmccoystephenson Jan 2, 2025
2d2e743
Removed unnecessary `@Capturing` variables from TimDepositControllerT…
dmccoystephenson Jan 2, 2025
a65f63f
Renamed fields in TimDepositControllerTest
dmccoystephenson Jan 2, 2025
3e58ad7
Updated TimDepositControllerTest.messageWithNoRSUsOrSDWShouldReturnWa…
dmccoystephenson Jan 2, 2025
859fc18
Formatted TimDepositControllerTest
dmccoystephenson Jan 2, 2025
6d8b405
Updated TimDepositControllerTest.failedObjectNodeConversionShouldRetu…
dmccoystephenson Jan 2, 2025
d62fe37
Formatted TimDepositControllerTest
dmccoystephenson Jan 2, 2025
74acfc7
Added some TODOs
dmccoystephenson Jan 2, 2025
d97d5c9
Updated TimDepositControllerTest.failedXmlConversionShouldReturnConve…
dmccoystephenson Jan 2, 2025
6da510e
Cleaned up consumer initialization & added consumer cleanup in TimDep…
dmccoystephenson Jan 3, 2025
d1ea7f3
Formatted TimDepositControllerTest
dmccoystephenson Jan 3, 2025
36777bd
Updated TimDepositControllerTest to set unique topic names for each test
dmccoystephenson Jan 3, 2025
325132a
Formatted TimDepositControllerTest
dmccoystephenson Jan 3, 2025
e26aaf2
Started working on updating TimDepositControllerTest.testSuccessfulMe…
dmccoystephenson Jan 3, 2025
c55e95e
Finished updating TimDepositControllerTest.testSuccessfulMessageRetur…
dmccoystephenson Jan 3, 2025
e0d565e
Updated TimDepositControllerTest.testSuccessfulSdwRequestMessageRetur…
dmccoystephenson Jan 3, 2025
c28d736
Updated TimDepositControllerTest.testSuccessfulMessageReturnsSuccessM…
dmccoystephenson Jan 3, 2025
fabc8b7
Updated TimDepositControllerTest.testSuccessfulMessageReturnsSuccessM…
dmccoystephenson Jan 3, 2025
bdfcbbf
Updated TimDepositControllerTest.testDepositingTimWithExtraProperties…
dmccoystephenson Jan 3, 2025
cd75428
Updated TimDepositControllerTest.testSuccessfulTimIngestIsTracked to …
dmccoystephenson Jan 3, 2025
97f6fa8
Updated TimDepositControllerTest.testSuccessfulRsuMessageReturnsSucce…
dmccoystephenson Jan 3, 2025
0338908
Updated TimDepositControllerTest.messageWithNoRSUsOrSDWShouldReturnWa…
dmccoystephenson Jan 3, 2025
c1fa2f9
Updated TimDepositControllerTest.failedObjectNodeConversionShouldRetu…
dmccoystephenson Jan 3, 2025
6cefeaa
Updated TimDepositControllerTest.testSuccessfulMessageReturnsSuccessM…
dmccoystephenson Jan 3, 2025
3c13b71
Formatted TimDepositControllerTest
dmccoystephenson Jan 3, 2025
dd12c24
Updated TimDepositControllerTest.testSuccessfulSdwRequestMessageRetur…
dmccoystephenson Jan 3, 2025
828285c
Updated TimDepositControllerTest.testSuccessfulMessageReturnsSuccessM…
dmccoystephenson Jan 3, 2025
5b30ce6
Updated TimDepositControllerTest.testSuccessfulMessageReturnsSuccessM…
dmccoystephenson Jan 3, 2025
8e3f735
Updated TimDepositControllerTest.testDepositingTimWithExtraProperties…
dmccoystephenson Jan 3, 2025
d05eb01
Updated TimDepositControllerTest.testSuccessfulTimIngestIsTracked to …
dmccoystephenson Jan 3, 2025
ae3e8d9
Updated TimDepositControllerTest.testSuccessfulRsuMessageReturnsSucce…
dmccoystephenson Jan 3, 2025
fe64b58
Updated TimDepositControllerTest.failedXmlConversionShouldReturnConve…
dmccoystephenson Jan 3, 2025
9e7ab14
Formatted TimDepositControllerTest
dmccoystephenson Jan 3, 2025
bd0059c
Replaced usages of MessageProducer with KafkaTemplate in TimDepositCo…
dmccoystephenson Jan 3, 2025
35afd66
Used `serialIdJ2735.getStreamId()` as key when sending data to topics…
dmccoystephenson Jan 3, 2025
985cb41
Updated TimDepositController to support usage of KafkaTemplate instea…
dmccoystephenson Jan 3, 2025
adc036d
Formatted TimDepositControllerTest
dmccoystephenson Jan 3, 2025
202cd2c
Merge branch 'dev' into dmccoystephenson/spring-kafka/migrate-tim-dep…
dmccoystephenson Jan 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import us.dot.its.jpo.ode.coder.OdeTimDataCreatorHelper;
import us.dot.its.jpo.ode.kafka.OdeKafkaProperties;
import us.dot.its.jpo.ode.kafka.topics.Asn1CoderTopics;
import us.dot.its.jpo.ode.kafka.topics.JsonTopics;
import us.dot.its.jpo.ode.kafka.topics.PojoTopics;
Expand All @@ -57,8 +57,6 @@
import us.dot.its.jpo.ode.util.JsonUtils;
import us.dot.its.jpo.ode.util.JsonUtils.JsonUtilsException;
import us.dot.its.jpo.ode.util.XmlUtils;
import us.dot.its.jpo.ode.wrapper.MessageProducer;
import us.dot.its.jpo.ode.wrapper.serdes.OdeTimSerializer;

/**
* The REST controller for handling TIM creation requests.
Expand All @@ -80,8 +78,8 @@ public class TimDepositController {
private final SerialId serialIdJ2735;
private final SerialId serialIdOde;

private final MessageProducer<String, String> stringMsgProducer;
private final MessageProducer<String, OdeObject> timProducer;
private final KafkaTemplate<String, String> kafkaTemplate;
private final KafkaTemplate<String, OdeObject> timDataKafkaTemplate;

private final boolean dataSigningEnabledSDW;

Expand All @@ -102,12 +100,13 @@ public TimDepositControllerException(String errMsg) {
* Spring Autowired constructor for the REST controller to properly initialize.
*/
@Autowired
public TimDepositController(OdeKafkaProperties odeKafkaProperties,
Asn1CoderTopics asn1CoderTopics,
public TimDepositController(Asn1CoderTopics asn1CoderTopics,
PojoTopics pojoTopics,
JsonTopics jsonTopics,
TimIngestTrackerProperties ingestTrackerProperties,
SecurityServicesProperties securityServicesProperties) {
SecurityServicesProperties securityServicesProperties,
KafkaTemplate<String, String> kafkaTemplate,
KafkaTemplate<String, OdeObject> timDataKafkaTemplate) {
super();

this.asn1CoderTopics = asn1CoderTopics;
Expand All @@ -116,12 +115,8 @@ public TimDepositController(OdeKafkaProperties odeKafkaProperties,
this.serialIdJ2735 = new SerialId();
this.serialIdOde = new SerialId();

this.stringMsgProducer =
MessageProducer.defaultStringMessageProducer(odeKafkaProperties.getBrokers(),
odeKafkaProperties.getKafkaType(), odeKafkaProperties.getDisabledTopics());
this.timProducer = new MessageProducer<>(odeKafkaProperties.getBrokers(),
odeKafkaProperties.getKafkaType(), null,
OdeTimSerializer.class.getName(), odeKafkaProperties.getDisabledTopics());
this.kafkaTemplate = kafkaTemplate;
this.timDataKafkaTemplate = timDataKafkaTemplate;

this.dataSigningEnabledSDW = securityServicesProperties.getIsSdwSigningEnabled();

Expand Down Expand Up @@ -247,10 +242,10 @@ public synchronized ResponseEntity<String> depositTim(String jsonString, Request
}

OdeTimData odeTimData = new OdeTimData(timMetadata, timDataPayload);
timProducer.send(pojoTopics.getTimBroadcast(), null, odeTimData);
timDataKafkaTemplate.send(pojoTopics.getTimBroadcast(), null, odeTimData);

String obfuscatedTimData = TimTransmogrifier.obfuscateRsuPassword(odeTimData.toJson());
stringMsgProducer.send(jsonTopics.getTimBroadcast(), null, obfuscatedTimData);
kafkaTemplate.send(jsonTopics.getTimBroadcast(), serialIdJ2735.getStreamId(), obfuscatedTimData);

// Now that the message has been published to OdeBroadcastTim topic, it should
// be
Expand Down Expand Up @@ -311,13 +306,13 @@ public synchronized ResponseEntity<String> depositTim(String jsonString, Request

String obfuscatedJ2735Tim = TimTransmogrifier.obfuscateRsuPassword(j2735Tim);
// publish Broadcast TIM to a J2735 compliant topic.
stringMsgProducer.send(jsonTopics.getJ2735TimBroadcast(), null, obfuscatedJ2735Tim);
kafkaTemplate.send(jsonTopics.getJ2735TimBroadcast(), serialIdJ2735.getStreamId(), obfuscatedJ2735Tim);
// publish J2735 TIM also to general un-filtered TIM topic with streamID as key
stringMsgProducer.send(jsonTopics.getTim(), serialIdJ2735.getStreamId(),
obfuscatedJ2735Tim); // Write XML to the encoder input topic at the end to ensure the correct order
kafkaTemplate.send(jsonTopics.getTim(), serialIdJ2735.getStreamId(), obfuscatedJ2735Tim);
// Write XML to the encoder input topic at the end to ensure the correct order
// of operations to pair
// each message to an OdeTimJson streamId key
stringMsgProducer.send(asn1CoderTopics.getEncoderInput(), null, xmlMsg);
kafkaTemplate.send(asn1CoderTopics.getEncoderInput(), serialIdJ2735.getStreamId(), xmlMsg);
}

serialIdOde.increment();
Expand Down
Loading
Loading