-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Migrate Asn1EncodedDataRouter to use Spring Kafka #138
base: dev
Are you sure you want to change the base?
Changes from all commits
35f386b
f7000c9
62b34b3
d04010d
b51b14a
23821e4
1cb9d97
f4147e4
eed0a82
c43c3e8
785795a
80295bc
1011688
6a85fbf
3dce2a4
3898ad9
225960c
1aba9e0
e7e7bf1
7aa4920
7e69562
ab2a633
b4473a3
c1f1178
79e0e0c
24fbba7
c99bbe8
cdb14a2
da1b379
1c0e260
5060560
d5e8f1b
b57d766
34a9dc1
692d3b2
f08c0fd
f05adff
1bc84ab
bd4b72e
d898850
e995cd1
9a5ade1
5acc7d8
ca18d0a
fb9d43a
e5d4b26
edce8c7
6ffa17c
9a652e2
b82e513
70ebddf
5722486
4f4737d
2a8b541
975c3f2
1bb81ec
0fc4bdf
b85bbcb
130dd7b
f07f493
aebe535
17eb75f
b46b790
ac8771e
6d43329
072b74f
087a4e8
5ec266d
01f7333
9c3a3a3
6e4ca6b
963a3de
b002fbb
b19e328
da4f857
c7136d1
6578b37
60b89b9
84079e5
586f701
f0f7030
ee88d53
531ee0c
f250b8c
c128e7e
a09fbb1
9c04c3d
853ce65
c7b8938
3e414a1
2c01cb3
5999898
82af676
735dfa7
1b1c39c
977a2b7
fcc2e8c
7c8e68c
0eee335
08550c0
bcb6901
dbe49ea
44b73cc
2246c2d
b920745
5c5ba73
411c562
6d60ed2
0d30254
69ac11e
1e7ffbc
7a58bce
fd715ab
bcfcfc6
e757746
1018461
a765be7
285c880
1c75ab6
dc48020
70827e7
52209b4
789acc9
c63b92d
c8055ad
bd9f820
5b86319
95f154f
e47f528
4a8f4f9
48f703f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package us.dot.its.jpo.ode.http; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; | ||
import org.springframework.web.client.RestTemplate; | ||
|
||
/** | ||
* This class provides a configuration for creating and managing | ||
* a {@link RestTemplate} bean, which is used for making HTTP requests | ||
* to external services. | ||
* | ||
* <p><strong>NOTE:</strong> As of 5.0 the {@link RestTemplate} class is in maintenance mode, with | ||
* only minor requests for changes and bugs to be accepted going forward. Please, | ||
* consider using the {@code org.springframework.web.reactive.client.WebClient} | ||
* which has a more modern API and supports sync, async, and streaming scenarios. | ||
* Whenever we the time or resources to update our Spring version, | ||
* we should replace usages of RestTemplate with WebClient.</p> | ||
*/ | ||
@Configuration | ||
public class WebClientConfig { | ||
|
||
/** | ||
* Creates and configures a {@link RestTemplate} bean with a custom | ||
* {@link MappingJackson2HttpMessageConverter} to use the provided | ||
* {@link ObjectMapper} for JSON serialization and deserialization. | ||
* | ||
* @param mapper the {@link ObjectMapper} to be used for configuring | ||
* JSON message conversion. | ||
* @return a configured {@link RestTemplate} instance that includes | ||
* the custom JSON message converter. | ||
*/ | ||
@Bean | ||
public RestTemplate restTemplate(ObjectMapper mapper) { | ||
var template = new RestTemplate(); | ||
MappingJackson2HttpMessageConverter customConverter = new MappingJackson2HttpMessageConverter(); | ||
customConverter.setObjectMapper(mapper); | ||
template.getMessageConverters().add(customConverter); | ||
return template; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
package us.dot.its.jpo.ode.kafka; | ||
|
||
import org.springframework.beans.factory.annotation.Value; | ||
import org.springframework.context.annotation.Bean; | ||
import org.springframework.context.annotation.Configuration; | ||
import us.dot.its.jpo.ode.OdeTimJsonTopology; | ||
|
||
/** | ||
* KafkaStreamsConfig is a Spring configuration class that provides | ||
* beans related to Kafka Streams topology setup. | ||
*/ | ||
@Configuration | ||
public class KafkaStreamsConfig { | ||
|
||
@Bean | ||
public OdeTimJsonTopology odeTimJsonTopology( | ||
@Value("${ode.kafka.topics.json.tim}") String timTopic, | ||
OdeKafkaProperties odeKafkaProperties) { | ||
return new OdeTimJsonTopology(odeKafkaProperties, timTopic); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,7 @@ | ||
package us.dot.its.jpo.ode.kafka.listeners.asn1; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import joptsimple.internal.Strings; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.kafka.clients.consumer.ConsumerRecord; | ||
import org.json.JSONObject; | ||
|
@@ -30,8 +32,8 @@ | |
* processing and forwarding it to different topics based on specific criteria. | ||
* | ||
* <p>This listener is specifically designed to handle decoded data produced by the asn1_codec. | ||
* Upon receiving a payload, it transforms the payload and then determines the appropriate | ||
* Kafka topic to forward the processed data.</p> | ||
* Upon receiving a payload, it transforms the payload and then determines the appropriate Kafka | ||
* topic to forward the processed data.</p> | ||
* | ||
* <p>The class utilizes Spring Kafka's annotation-driven listener configuration, | ||
* allowing it to automatically consume messages from a configured Kafka topic.</p> | ||
|
@@ -68,7 +70,8 @@ public Asn1DecodedDataRouter(KafkaTemplate<String, String> kafkaTemplate, | |
id = "Asn1DecodedDataRouter", | ||
topics = "${ode.kafka.topics.asn1.decoder-output}" | ||
) | ||
public void listen(ConsumerRecord<String, String> consumerRecord) throws XmlUtilsException { | ||
public void listen(ConsumerRecord<String, String> consumerRecord) | ||
throws XmlUtilsException, JsonProcessingException { | ||
log.debug("Key: {} payload: {}", consumerRecord.key(), consumerRecord.value()); | ||
|
||
JSONObject consumed = XmlUtils.toJSONObject(consumerRecord.value()) | ||
|
@@ -80,16 +83,23 @@ public void listen(ConsumerRecord<String, String> consumerRecord) throws XmlUtil | |
.getInt("messageId") | ||
); | ||
|
||
var metadataJson = XmlUtils.toJSONObject(consumerRecord.value()) | ||
.getJSONObject(OdeAsn1Data.class.getSimpleName()) | ||
.getJSONObject(AppContext.METADATA_STRING); | ||
OdeLogMetadata.RecordType recordType = OdeLogMetadata.RecordType | ||
.valueOf(XmlUtils.toJSONObject(consumerRecord.value()) | ||
.getJSONObject(OdeAsn1Data.class.getSimpleName()) | ||
.getJSONObject(AppContext.METADATA_STRING) | ||
.getString("recordType") | ||
); | ||
.valueOf(metadataJson.getString("recordType")); | ||
|
||
String streamId; | ||
if (Strings.isNullOrEmpty(consumerRecord.key()) | ||
|| "null".equalsIgnoreCase(consumerRecord.key())) { | ||
streamId = metadataJson.getJSONObject("serialId").getString("streamId"); | ||
} else { | ||
streamId = consumerRecord.key(); | ||
} | ||
|
||
switch (messageId) { | ||
case BasicSafetyMessage -> routeBSM(consumerRecord, recordType); | ||
case TravelerInformation -> routeTIM(consumerRecord, recordType); | ||
case TravelerInformation -> routeTIM(consumerRecord, streamId, recordType); | ||
case SPATMessage -> routeSPAT(consumerRecord, recordType); | ||
case MAPMessage -> routeMAP(consumerRecord, recordType); | ||
case SSMMessage -> routeSSM(consumerRecord, recordType); | ||
|
@@ -156,17 +166,18 @@ private void routeMAP(ConsumerRecord<String, String> consumerRecord, RecordType | |
kafkaTemplate.send(jsonTopics.getMap(), odeMapData); | ||
} | ||
|
||
private void routeTIM(ConsumerRecord<String, String> consumerRecord, RecordType recordType) | ||
throws XmlUtilsException { | ||
/**
 * Routes decoded TIM data to the appropriate Kafka topics based on record type.
 *
 * <p>dnMsg and rxMsg records are additionally forwarded to their dedicated topics;
 * every TIM is also published to the general TIM JSON topic, keyed by streamId so
 * downstream consumers can look up the TIM JSON from the K-Table in
 * OdeTimJsonTopology. NOTE(review): the consumer record key can be empty when
 * consuming from the asn1 decoder output topic, which is why the caller passes a
 * separately-resolved streamId — TODO confirm against the listen() method.
 *
 * @param consumerRecord the consumed record containing the decoded TIM XML payload
 * @param streamId       the stream identifier used as the key when publishing to
 *                       the TIM JSON topic
 * @param type           the log record type that determines additional routing
 * @throws XmlUtilsException if the payload cannot be converted to ODE TIM JSON
 */
private void routeTIM(ConsumerRecord<String, String> consumerRecord,
    String streamId,
    RecordType type) throws XmlUtilsException {
  String odeTimData =
      OdeTimDataCreatorHelper.createOdeTimDataFromDecoded(consumerRecord.value()).toString();
  switch (type) {
    case dnMsg -> kafkaTemplate.send(jsonTopics.getDnMessage(), consumerRecord.key(), odeTimData);
    case rxMsg -> kafkaTemplate.send(jsonTopics.getRxTim(), consumerRecord.key(), odeTimData);
    default -> log.trace("Consumed TIM data with record type: {}", type);
  }
  // Send all TIMs also to OdeTimJson
  kafkaTemplate.send(jsonTopics.getTim(), streamId, odeTimData);
}
|
||
private void routeBSM(ConsumerRecord<String, String> consumerRecord, RecordType recordType) | ||
|
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this method is only used in tests for now, and in the tests, a state of `REBALANCING` would cause an error. Addressing this possible bug may be worth the effort in a follow-up PR or separate work item. Still, I don't believe the changes belong here, since I didn't change how the OdeTimJsonTopology works in this PR.