respEntity = template.postForEntity(signatureUri, entity, String.class);
-
- log.debug("Security services module response: {}", respEntity);
-
- return respEntity.getBody();
- }
-
- public String packageSignedTimIntoAsd(ServiceRequest request, String signedMsg) {
-
- SDW sdw = request.getSdw();
- SNMP snmp = request.getSnmp();
- DdsAdvisorySituationData asd;
-
- byte sendToRsu = request.getRsus() != null ? DdsAdvisorySituationData.RSU : DdsAdvisorySituationData.NONE;
- byte distroType = (byte) (DdsAdvisorySituationData.IP | sendToRsu);
-
- String outputXml = null;
- try {
- if (null != snmp) {
-
- asd = new DdsAdvisorySituationData()
- .setAsdmDetails(snmp.getDeliverystart(), snmp.getDeliverystop(), distroType, null)
- .setServiceRegion(GeoRegionBuilder.ddsGeoRegion(sdw.getServiceRegion())).setTimeToLive(sdw.getTtl())
- .setGroupID(sdw.getGroupID()).setRecordID(sdw.getRecordId());
- } else {
- asd = new DdsAdvisorySituationData()
- .setAsdmDetails(sdw.getDeliverystart(), sdw.getDeliverystop(), distroType, null)
- .setServiceRegion(GeoRegionBuilder.ddsGeoRegion(sdw.getServiceRegion())).setTimeToLive(sdw.getTtl())
- .setGroupID(sdw.getGroupID()).setRecordID(sdw.getRecordId());
- }
-
- OdeMsgPayload payload;
-
- ObjectNode dataBodyObj = JsonUtils.newNode();
- ObjectNode asdObj = JsonUtils.toObjectNode(asd.toJson());
- ObjectNode admDetailsObj = (ObjectNode) asdObj.findValue("asdmDetails");
- admDetailsObj.remove("advisoryMessage");
- admDetailsObj.put("advisoryMessage", signedMsg);
-
- dataBodyObj.set(ADVISORY_SITUATION_DATA_STRING, asdObj);
-
- payload = new OdeAsdPayload(asd);
-
- ObjectNode payloadObj = JsonUtils.toObjectNode(payload.toJson());
- payloadObj.set(AppContext.DATA_STRING, dataBodyObj);
-
- OdeMsgMetadata metadata = new OdeMsgMetadata(payload);
- ObjectNode metaObject = JsonUtils.toObjectNode(metadata.toJson());
-
- ObjectNode requestObj = JsonUtils.toObjectNode(JsonUtils.toJson(request, false));
-
- requestObj.remove("tim");
-
- metaObject.set("request", requestObj);
-
- ArrayNode encodings = buildEncodings();
- ObjectNode enc = XmlUtils.createEmbeddedJsonArrayForXmlConversion(AppContext.ENCODINGS_STRING, encodings);
- metaObject.set(AppContext.ENCODINGS_STRING, enc);
-
- ObjectNode message = JsonUtils.newNode();
- message.set(AppContext.METADATA_STRING, metaObject);
- message.set(AppContext.PAYLOAD_STRING, payloadObj);
-
- ObjectNode root = JsonUtils.newNode();
- root.set(AppContext.ODE_ASN1_DATA, message);
-
- outputXml = XmlUtils.toXmlStatic(root);
-
- // remove the surrounding
- outputXml = outputXml.replace("", "");
- outputXml = outputXml.replace("", "");
-
- } catch (ParseException | JsonUtilsException | XmlUtilsException e) {
- log.error("Parsing exception thrown while populating ASD structure: ", e);
- }
-
- log.debug("Fully crafted ASD to be encoded: {}", outputXml);
-
- return outputXml;
- }
-
- public static ArrayNode buildEncodings() throws JsonUtilsException {
- ArrayNode encodings = JsonUtils.newArrayNode();
- encodings.add(TimTransmogrifier.buildEncodingNode(ADVISORY_SITUATION_DATA_STRING, ADVISORY_SITUATION_DATA_STRING,
- EncodingRule.UPER));
- return encodings;
- }
-}
diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/Asn1EncodedDataRouter.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/Asn1EncodedDataRouter.java
index 297dcaade..53b9bcc7b 100644
--- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/Asn1EncodedDataRouter.java
+++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/Asn1EncodedDataRouter.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*=============================================================================
* Copyright 2018 572682
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
@@ -16,51 +16,67 @@
package us.dot.its.jpo.ode.services.asn1;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.dataformat.xml.XmlMapper;
+import java.text.ParseException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
import lombok.extern.slf4j.Slf4j;
-import org.json.JSONArray;
-import org.json.JSONException;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.json.JSONObject;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
import us.dot.its.jpo.ode.OdeTimJsonTopology;
import us.dot.its.jpo.ode.context.AppContext;
-import us.dot.its.jpo.ode.eventlog.EventLogger;
-import us.dot.its.jpo.ode.kafka.OdeKafkaProperties;
import us.dot.its.jpo.ode.kafka.topics.Asn1CoderTopics;
import us.dot.its.jpo.ode.kafka.topics.JsonTopics;
-import us.dot.its.jpo.ode.kafka.topics.SDXDepositorTopics;
+import us.dot.its.jpo.ode.model.Asn1Encoding;
+import us.dot.its.jpo.ode.model.Asn1Encoding.EncodingRule;
+import us.dot.its.jpo.ode.model.OdeAsdPayload;
import us.dot.its.jpo.ode.model.OdeAsn1Data;
+import us.dot.its.jpo.ode.model.OdeMsgMetadata;
+import us.dot.its.jpo.ode.model.OdeMsgPayload;
+import us.dot.its.jpo.ode.plugin.SNMP;
import us.dot.its.jpo.ode.plugin.ServiceRequest;
-import us.dot.its.jpo.ode.rsu.RsuProperties;
+import us.dot.its.jpo.ode.plugin.SituationDataWarehouse.SDW;
+import us.dot.its.jpo.ode.plugin.j2735.DdsAdvisorySituationData;
+import us.dot.its.jpo.ode.plugin.j2735.builders.GeoRegionBuilder;
+import us.dot.its.jpo.ode.rsu.RsuDepositor;
+import us.dot.its.jpo.ode.security.SecurityServicesClient;
import us.dot.its.jpo.ode.security.SecurityServicesProperties;
-import us.dot.its.jpo.ode.services.asn1.Asn1CommandManager.Asn1CommandManagerException;
+import us.dot.its.jpo.ode.security.models.SignatureResultModel;
import us.dot.its.jpo.ode.traveler.TimTransmogrifier;
+import us.dot.its.jpo.ode.uper.SupportedMessageType;
import us.dot.its.jpo.ode.util.CodecUtils;
-import us.dot.its.jpo.ode.util.JsonUtils;
-import us.dot.its.jpo.ode.util.JsonUtils.JsonUtilsException;
import us.dot.its.jpo.ode.util.XmlUtils;
-import us.dot.its.jpo.ode.wrapper.AbstractSubscriberProcessor;
-import us.dot.its.jpo.ode.wrapper.MessageProducer;
+import us.dot.its.jpo.ode.util.XmlUtils.XmlUtilsException;
/**
- * The Asn1EncodedDataRouter is responsible for routing encoded TIM messages
- * that are consumed from the Kafka topic.Asn1EncoderOutput topic and decide
- * whether to route to the SDX or an RSU.
+ * The Asn1EncodedDataRouter is responsible for routing encoded TIM messages that are consumed from
+ * the Kafka topic.Asn1EncoderOutput topic and decide whether to route to the SDX or an RSU.
**/
+@Component
@Slf4j
-public class Asn1EncodedDataRouter extends AbstractSubscriberProcessor {
+public class Asn1EncodedDataRouter {
private static final String BYTES = "bytes";
private static final String MESSAGE_FRAME = "MessageFrame";
- private static final String ERROR_ON_SDX_DEPOSIT = "Error on SDX deposit.";
+ private static final String ADVISORY_SITUATION_DATA_STRING = "AdvisorySituationData";
+ private final KafkaTemplate kafkaTemplate;
+ private final XmlMapper xmlMapper;
/**
* Exception for Asn1EncodedDataRouter specific failures.
*/
public static class Asn1EncodedDataRouterException extends Exception {
- private static final long serialVersionUID = 1L;
-
public Asn1EncodedDataRouterException(String string) {
super(string);
}
@@ -68,435 +84,354 @@ public Asn1EncodedDataRouterException(String string) {
private final Asn1CoderTopics asn1CoderTopics;
private final JsonTopics jsonTopics;
+ private final String sdxDepositTopic;
+ private final SecurityServicesClient securityServicesClient;
+ private final ObjectMapper mapper;
- private final MessageProducer stringMsgProducer;
private final OdeTimJsonTopology odeTimJsonTopology;
- private final Asn1CommandManager asn1CommandManager;
+ private final RsuDepositor rsuDepositor;
private final boolean dataSigningEnabledSDW;
private final boolean dataSigningEnabledRSU;
/**
- * Instantiates the Asn1EncodedDataRouter to actively consume from Kafka and route the
- * the encoded TIM messages to the SDX and RSUs.
+ * Instantiates the Asn1EncodedDataRouter to actively consume from Kafka and route the encoded TIM
+ * messages to the SDX and RSUs.
*
- * @param odeKafkaProperties The Kafka properties used to consume and produce to Kafka
- * @param asn1CoderTopics The specified ASN1 Coder topics
- * @param jsonTopics The specified JSON topics to write to
- * @param sdxDepositorTopics The SDX depositor topics to write to
- * @param rsuProperties The RSU properties to use
+ * @param asn1CoderTopics The specified ASN1 Coder topics
+ * @param jsonTopics The specified JSON topics to write to
* @param securityServicesProperties The security services properties to use
+ * @param mapper The ObjectMapper used for serialization/deserialization
**/
- public Asn1EncodedDataRouter(OdeKafkaProperties odeKafkaProperties,
- Asn1CoderTopics asn1CoderTopics,
- JsonTopics jsonTopics,
- SDXDepositorTopics sdxDepositorTopics,
- RsuProperties rsuProperties,
- SecurityServicesProperties securityServicesProperties) {
+ public Asn1EncodedDataRouter(Asn1CoderTopics asn1CoderTopics,
+ JsonTopics jsonTopics,
+ SecurityServicesProperties securityServicesProperties,
+ OdeTimJsonTopology odeTimJsonTopology,
+ RsuDepositor rsuDepositor,
+ SecurityServicesClient securityServicesClient,
+ KafkaTemplate kafkaTemplate,
+ @Value("${ode.kafka.topics.sdx-depositor.input}") String sdxDepositTopic,
+ ObjectMapper mapper,
+ XmlMapper xmlMapper) {
super();
this.asn1CoderTopics = asn1CoderTopics;
this.jsonTopics = jsonTopics;
+ this.sdxDepositTopic = sdxDepositTopic;
+ this.securityServicesClient = securityServicesClient;
- this.stringMsgProducer = MessageProducer.defaultStringMessageProducer(
- odeKafkaProperties.getBrokers(),
- odeKafkaProperties.getKafkaType(),
- odeKafkaProperties.getDisabledTopics());
+ this.kafkaTemplate = kafkaTemplate;
- this.asn1CommandManager = new Asn1CommandManager(
- odeKafkaProperties,
- sdxDepositorTopics,
- rsuProperties,
- securityServicesProperties);
+ this.rsuDepositor = rsuDepositor;
this.dataSigningEnabledSDW = securityServicesProperties.getIsSdwSigningEnabled();
this.dataSigningEnabledRSU = securityServicesProperties.getIsRsuSigningEnabled();
- odeTimJsonTopology = new OdeTimJsonTopology(odeKafkaProperties, jsonTopics.getTim());
- }
-
- @Override
- public Object process(String consumedData) {
- try {
- log.debug("Consumed: {}", consumedData);
- JSONObject consumedObj = XmlUtils.toJSONObject(consumedData).getJSONObject(
- OdeAsn1Data.class.getSimpleName());
-
- /*
- * When receiving the 'rsus' in xml, since there is only one 'rsu' and
- * there is no construct for array in xml, the rsus does not translate
- * to an array of 1 element. The following workaround, resolves this
- * issue.
- */
- JSONObject metadata = consumedObj.getJSONObject(AppContext.METADATA_STRING);
-
- if (metadata.has(TimTransmogrifier.REQUEST_STRING)) {
- JSONObject request = metadata.getJSONObject(TimTransmogrifier.REQUEST_STRING);
- if (request.has(TimTransmogrifier.RSUS_STRING)) {
- Object rsus = request.get(TimTransmogrifier.RSUS_STRING);
- if (rsus instanceof JSONObject) {
- JSONObject rsusIn = (JSONObject) request.get(TimTransmogrifier.RSUS_STRING);
- if (rsusIn.has(TimTransmogrifier.RSUS_STRING)) {
- Object rsu = rsusIn.get(TimTransmogrifier.RSUS_STRING);
- JSONArray rsusOut = new JSONArray();
- if (rsu instanceof JSONArray) {
- log.debug("Multiple RSUs exist in the request: {}", request);
- JSONArray rsusInArray = (JSONArray) rsu;
- for (int i = 0; i < rsusInArray.length(); i++) {
- rsusOut.put(rsusInArray.get(i));
- }
- request.put(TimTransmogrifier.RSUS_STRING, rsusOut);
- } else if (rsu instanceof JSONObject) {
- log.debug("Single RSU exists in the request: {}", request);
- rsusOut.put(rsu);
- request.put(TimTransmogrifier.RSUS_STRING, rsusOut);
- } else {
- log.debug("No RSUs exist in the request: {}", request);
- request.remove(TimTransmogrifier.RSUS_STRING);
- }
- }
- }
- }
-
- // Convert JSON to POJO
- ServiceRequest servicerequest = getServicerequest(consumedObj);
-
- processEncodedTim(servicerequest, consumedObj);
- } else {
- throw new Asn1EncodedDataRouterException("Invalid or missing '"
- + TimTransmogrifier.REQUEST_STRING + "' object in the encoder response");
- }
- } catch (Exception e) {
- String msg = "Error in processing received message from ASN.1 Encoder module: "
- + consumedData;
- if (log.isDebugEnabled()) {
- // print error message and stack trace
- EventLogger.logger.error(msg, e);
- log.error(msg, e);
- } else {
- // print error message only
- EventLogger.logger.error(msg);
- log.error(msg);
- }
- }
- return null;
+ this.odeTimJsonTopology = odeTimJsonTopology;
+ this.mapper = mapper;
+ this.xmlMapper = xmlMapper;
}
/**
- * Gets the service request based on the consumed JSONObject.
+ * Listens for messages from the specified Kafka topic and processes them.
*
- * @param consumedObj The object to retrieve the service request for
- * @return The service request
+ * @param consumerRecord The Kafka consumer record containing the key and value of the consumed
+ * message.
*/
- public ServiceRequest getServicerequest(JSONObject consumedObj) {
- String sr = consumedObj.getJSONObject(AppContext.METADATA_STRING).getJSONObject(
- TimTransmogrifier.REQUEST_STRING).toString();
- log.debug("ServiceRequest: {}", sr);
+ @KafkaListener(id = "Asn1EncodedDataRouter", topics = "${ode.kafka.topics.asn1.encoder-output}")
+ public void listen(ConsumerRecord consumerRecord)
+ throws XmlUtilsException, JsonProcessingException, Asn1EncodedDataRouterException {
+ JSONObject consumedObj = XmlUtils.toJSONObject(consumerRecord.value())
+ .getJSONObject(OdeAsn1Data.class.getSimpleName());
+
+ JSONObject metadata = consumedObj.getJSONObject(AppContext.METADATA_STRING);
+
+ if (!metadata.has(TimTransmogrifier.REQUEST_STRING)) {
+ throw new Asn1EncodedDataRouterException(
+ String.format("Invalid or missing '%s' object in the encoder response. Unable to process record with key '%s'",
+ TimTransmogrifier.REQUEST_STRING,
+ consumerRecord.key())
+ );
+ }
- // Convert JSON to POJO
- ServiceRequest serviceRequest = null;
- try {
- serviceRequest = (ServiceRequest) JsonUtils.fromJson(sr, ServiceRequest.class);
+ JSONObject payloadData = consumedObj.getJSONObject(AppContext.PAYLOAD_STRING).getJSONObject(AppContext.DATA_STRING);
+ JSONObject metadataJson = consumedObj.getJSONObject(AppContext.METADATA_STRING);
+ ServiceRequest request = getServiceRequest(metadataJson);
+ log.debug("Mapped to object ServiceRequest: {}", request);
- } catch (Exception e) {
- String errMsg = "Malformed JSON.";
- EventLogger.logger.error(errMsg, e);
- log.error(errMsg, e);
+ if (!payloadData.has(ADVISORY_SITUATION_DATA_STRING)) {
+ processUnsignedMessage(request, metadataJson, payloadData);
+ } else if (dataSigningEnabledSDW && request.getSdw() != null) {
+ processSignedMessage(request, payloadData);
+ } else {
+ processEncodedTimUnsigned(request, metadataJson, payloadData);
}
+ }
- return serviceRequest;
+ private ServiceRequest getServiceRequest(JSONObject metadataJson) throws JsonProcessingException {
+ String serviceRequestJson = metadataJson.getJSONObject(TimTransmogrifier.REQUEST_STRING).toString();
+ log.debug("ServiceRequest: {}", serviceRequestJson);
+ return mapper.readValue(serviceRequestJson, ServiceRequest.class);
}
- /**
- * Process the signed encoded TIM message.
- *
- * @param request The service request
- * @param consumedObj The consumed JSON object
- */
- public void processEncodedTim(ServiceRequest request, JSONObject consumedObj) {
-
- JSONObject dataObj = consumedObj.getJSONObject(AppContext.PAYLOAD_STRING).getJSONObject(
- AppContext.DATA_STRING);
- JSONObject metadataObj = consumedObj.getJSONObject(AppContext.METADATA_STRING);
-
- // CASE 1: no SDW in metadata (SNMP deposit only)
- // - sign MF
- // - send to RSU
- // CASE 2: SDW in metadata but no ASD in body (send back for another
- // encoding)
- // - sign MF
- // - send to RSU
- // - craft ASD object
- // - publish back to encoder stream
- // CASE 3: If SDW in metadata and ASD in body (double encoding complete)
- // - send to SDX
-
- if (!dataObj.has(Asn1CommandManager.ADVISORY_SITUATION_DATA_STRING)) {
- log.debug("Unsigned message received");
- // We don't have ASD, therefore it must be just a MessageFrame that needs to be
- // signed
- // No support for unsecured MessageFrame only payload.
- // Cases 1 & 2
- // Sign and send to RSUs
-
- JSONObject mfObj = dataObj.getJSONObject(MESSAGE_FRAME);
-
- String hexEncodedTim = mfObj.getString(BYTES);
- log.debug("Encoded message - phase 1: {}", hexEncodedTim);
- // use Asnc1 library to decode the encoded tim returned from ASNC1; another
- // class two blockers: decode the tim and decode the message-sign
-
- // Case 1: SNMP-deposit
- if (dataSigningEnabledRSU && request.getRsus() != null) {
- hexEncodedTim = signTIMAndProduceToExpireTopic(hexEncodedTim, consumedObj);
- } else {
- // if header is present, strip it
- if (isHeaderPresent(hexEncodedTim)) {
- String header = hexEncodedTim.substring(0, hexEncodedTim.indexOf("001F") + 4);
- log.debug("Stripping header from unsigned message: {}", header);
- hexEncodedTim = stripHeader(hexEncodedTim);
- mfObj.remove(BYTES);
- mfObj.put(BYTES, hexEncodedTim);
- dataObj.remove(MESSAGE_FRAME);
- dataObj.put(MESSAGE_FRAME, mfObj);
- consumedObj.remove(AppContext.PAYLOAD_STRING);
- consumedObj.put(AppContext.PAYLOAD_STRING, dataObj);
- }
- }
+ // If SDW in metadata and ASD in body (double encoding complete) -> send to SDX
+ private void processSignedMessage(ServiceRequest request, JSONObject dataObj) {
- if (null != request.getSnmp() && null != request.getRsus() && null != hexEncodedTim) {
- log.info("Sending message to RSUs...");
- asn1CommandManager.sendToRsus(request, hexEncodedTim);
- }
+ JSONObject asdObj = dataObj.getJSONObject(ADVISORY_SITUATION_DATA_STRING);
+
+ JSONObject deposit = new JSONObject();
+ deposit.put("estimatedRemovalDate", request.getSdw().getEstimatedRemovalDate());
+ deposit.put("encodedMsg", asdObj.getString(BYTES));
+ kafkaTemplate.send(this.sdxDepositTopic, deposit.toString());
+ }
- hexEncodedTim = mfObj.getString(BYTES);
+ // no SDW in metadata (SNMP deposit only) -> sign MF -> send to RSU
+ private void processUnsignedMessage(ServiceRequest request,
+ JSONObject metadataJson,
+ JSONObject payloadJson) {
+ log.info("Processing unsigned message.");
+ JSONObject messageFrameJson = payloadJson.getJSONObject(MESSAGE_FRAME);
+ var hexEncodedTimBytes = messageFrameJson.getString(BYTES);
+ String bytesToSend;
+ if ((dataSigningEnabledRSU || dataSigningEnabledSDW) && (request.getSdw() != null || request.getRsus() != null)) {
+ log.debug("Signing encoded TIM message...");
+ String base64EncodedTim = CodecUtils.toBase64(CodecUtils.fromHex(hexEncodedTimBytes));
+
+ // get max duration time and convert from minutes to milliseconds
+ // (unsigned integer valid 0 to 2^32-1 in units of milliseconds) from metadata
+ int maxDurationTime = Integer.parseInt(metadataJson.get("maxDurationTime").toString())
+ * 60 * 1000;
+ var signedResponse = securityServicesClient.signMessage(base64EncodedTim, maxDurationTime);
+ depositToTimCertExpirationTopic(metadataJson, signedResponse, maxDurationTime);
+ bytesToSend = signedResponse.getResult().getHexEncodedMessageSigned();
+ } else {
+ log.debug("Signing not enabled or no SDW or RSU data detected. Sending encoded TIM message without signing...");
+ bytesToSend = hexEncodedTimBytes;
+ }
- // Case 2: SDX-deposit
- if (dataSigningEnabledSDW && request.getSdw() != null) {
- hexEncodedTim = signTIMAndProduceToExpireTopic(hexEncodedTim, consumedObj);
- }
+ log.debug("Encoded message - phase 1: {}", bytesToSend);
+ var encodedTimWithoutHeaders = stripHeader(bytesToSend);
- // Deposit encoded & signed TIM to TMC-filtered topic if TMC-generated
- depositToFilteredTopic(metadataObj, hexEncodedTim);
- if (request.getSdw() != null) {
- // Case 2 only
+ sendToRsus(request, encodedTimWithoutHeaders);
+ depositToFilteredTopic(metadataJson, encodedTimWithoutHeaders);
+ publishForSecondEncoding(request, encodedTimWithoutHeaders);
+ }
- log.debug("Publishing message for round 2 encoding!");
- String xmlizedMessage = asn1CommandManager.packageSignedTimIntoAsd(request, hexEncodedTim);
+ private void depositToTimCertExpirationTopic(JSONObject metadataJson, SignatureResultModel signedResponse, int maxDurationTime) {
+ String packetId = metadataJson.getString("odePacketID");
+ String timStartDateTime = metadataJson.getString("odeTimStartDateTime");
+ JSONObject timWithExpiration = new JSONObject();
+ timWithExpiration.put("packetID", packetId);
+ timWithExpiration.put("startDateTime", timStartDateTime);
- stringMsgProducer.send(asn1CoderTopics.getEncoderInput(), null, xmlizedMessage);
- }
+ var dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+ setExpiryDate(signedResponse, timWithExpiration, dateFormat);
+ setRequiredExpiryDate(dateFormat, timStartDateTime, maxDurationTime, timWithExpiration);
- } else {
- // We have encoded ASD. It could be either UNSECURED or secured.
- if (dataSigningEnabledSDW && request.getSdw() != null) {
- log.debug("Signed message received. Depositing it to SDW.");
- // We have a ASD with signed MessageFrame
- // Case 3
- JSONObject asdObj = dataObj.getJSONObject(
- Asn1CommandManager.ADVISORY_SITUATION_DATA_STRING);
- try {
- JSONObject deposit = new JSONObject();
- deposit.put("estimatedRemovalDate", request.getSdw().getEstimatedRemovalDate());
- deposit.put("encodedMsg", asdObj.getString(BYTES));
- asn1CommandManager.depositToSdw(deposit.toString());
- } catch (JSONException | Asn1CommandManagerException e) {
- String msg = ERROR_ON_SDX_DEPOSIT;
- log.error(msg, e);
- }
- } else {
- log.debug("Unsigned ASD received. Depositing it to SDW.");
- // We have ASD with UNSECURED MessageFrame
- processEncodedTimUnsecured(request, consumedObj);
- }
- }
+ kafkaTemplate.send(jsonTopics.getTimCertExpiration(), timWithExpiration.toString());
}
- /**
- * Process the unsigned encoded TIM message.
- *
- * @param request The service request
- * @param consumedObj The consumed JSON object
- */
- public void processEncodedTimUnsecured(ServiceRequest request, JSONObject consumedObj) {
- // Send TIMs and record results
- HashMap responseList = new HashMap<>();
- JSONObject metadataObj = consumedObj.getJSONObject(AppContext.METADATA_STRING);
-
- JSONObject dataObj = consumedObj
- .getJSONObject(AppContext.PAYLOAD_STRING)
- .getJSONObject(AppContext.DATA_STRING);
+ // SDW in metadata but no ASD in body (send back for another encoding) -> sign MessageFrame
+ // -> send to RSU -> craft ASD object -> publish back to encoder stream
+ private void processEncodedTimUnsigned(ServiceRequest request, JSONObject metadataJson, JSONObject payloadJson) {
+ log.debug("Unsigned ASD received. Depositing it to SDW.");
if (null != request.getSdw()) {
- JSONObject asdObj = null;
- if (dataObj.has(Asn1CommandManager.ADVISORY_SITUATION_DATA_STRING)) {
- asdObj = dataObj.getJSONObject(Asn1CommandManager.ADVISORY_SITUATION_DATA_STRING);
- } else {
- log.error("ASD structure present in metadata but not in JSONObject!");
- }
-
+ var asdObj = payloadJson.getJSONObject(ADVISORY_SITUATION_DATA_STRING);
if (null != asdObj) {
- String asdBytes = asdObj.getString(BYTES);
-
- try {
- JSONObject deposit = new JSONObject();
- deposit.put("estimatedRemovalDate", request.getSdw().getEstimatedRemovalDate());
- deposit.put("encodedMsg", asdBytes);
- asn1CommandManager.depositToSdw(deposit.toString());
- log.info("SDX deposit successful.");
- } catch (Exception e) {
- String msg = ERROR_ON_SDX_DEPOSIT;
- log.error(msg, e);
- EventLogger.logger.error(msg, e);
- }
-
- } else if (log.isErrorEnabled()) {
- // Added to avoid Sonar's "Invoke method(s) only conditionally." code smell
- String msg = "ASN.1 Encoder did not return ASD encoding {}";
- EventLogger.logger.error(msg, consumedObj);
- log.error(msg, consumedObj);
+ depositToSdx(request, asdObj.getString(BYTES));
+ } else {
+ log.error("ASN.1 Encoder did not return ASD encoding {}", payloadJson);
}
}
- if (dataObj.has(MESSAGE_FRAME)) {
- JSONObject mfObj = dataObj.getJSONObject(MESSAGE_FRAME);
+ if (payloadJson.has(MESSAGE_FRAME)) {
+ JSONObject mfObj = payloadJson.getJSONObject(MESSAGE_FRAME);
String encodedTim = mfObj.getString(BYTES);
- // Deposit encoded TIM to TMC-filtered topic if TMC-generated
- depositToFilteredTopic(metadataObj, encodedTim);
-
- // if header is present, strip it
- if (isHeaderPresent(encodedTim)) {
- String header = encodedTim.substring(0, encodedTim.indexOf("001F") + 4);
- log.debug("Stripping header from unsigned message: {}", header);
- encodedTim = stripHeader(encodedTim);
- mfObj.remove(BYTES);
- mfObj.put(BYTES, encodedTim);
- dataObj.remove(MESSAGE_FRAME);
- dataObj.put(MESSAGE_FRAME, mfObj);
- consumedObj.remove(AppContext.PAYLOAD_STRING);
- consumedObj.put(AppContext.PAYLOAD_STRING, dataObj);
- }
+ depositToFilteredTopic(metadataJson, encodedTim);
- log.debug("Encoded message - phase 2: {}", encodedTim);
+ var encodedTimWithoutHeader = stripHeader(encodedTim);
+ log.debug("Encoded message - phase 2: {}", encodedTimWithoutHeader);
- // only send message to rsu if snmp, rsus, and message frame fields are present
- if (null != request.getSnmp() && null != request.getRsus() && null != encodedTim) {
- log.debug("Encoded message phase 3: {}", encodedTim);
- asn1CommandManager.sendToRsus(request, encodedTim);
- }
+ sendToRsus(request, encodedTimWithoutHeader);
}
+ }
- log.info("TIM deposit response {}", responseList);
+ private void depositToSdx(ServiceRequest request, String asdBytes) {
+ try {
+ JSONObject deposit = new JSONObject();
+ deposit.put("estimatedRemovalDate", request.getSdw().getEstimatedRemovalDate());
+ deposit.put("encodedMsg", asdBytes);
+ kafkaTemplate.send(this.sdxDepositTopic, deposit.toString());
+ log.info("SDX deposit successful.");
+ } catch (Exception e) {
+ log.error("Failed to deposit to SDX", e);
+ }
}
/**
- * Sign the encoded TIM message and write to Kafka with an expiration time.
+ * Constructs an XML representation of an Advisory Situation Data (ASD) message containing a
+ * signed Traveler Information Message (TIM). Processes the provided service request and signed
+ * message to create and structure the ASD before converting it to an XML string output.
+ *
+ * @param request the service request object containing meta information, service region,
+ * delivery time, and other necessary data for ASD creation.
+ * @param signedMsg the signed Traveler Information Message (TIM) to be included in the ASD.
*
- * @param encodedTIM The encoded TIM message to be signed
- * @param consumedObj The JSON object to be consumed
- * @return The String representation of the encodedTim payload
+ * @return a String containing the fully crafted ASD message in XML format. Returns null if the
+ * message could not be constructed due to exceptions.
*/
- public String signTIMAndProduceToExpireTopic(String encodedTIM, JSONObject consumedObj) {
- log.debug("Sending message for signature! ");
- String base64EncodedTim = CodecUtils.toBase64(
- CodecUtils.fromHex(encodedTIM));
- JSONObject metadataObjs = consumedObj.getJSONObject(AppContext.METADATA_STRING);
- // get max duration time and convert from minutes to milliseconds (unsigned
- // integer valid 0 to 2^32-1 in units of
- // milliseconds.) from metadata
- int maxDurationTime = Integer.valueOf(metadataObjs.get("maxDurationTime").toString())
- * 60 * 1000;
- String timpacketID = metadataObjs.getString("odePacketID");
- String timStartDateTime = metadataObjs.getString("odeTimStartDateTime");
- String signedResponse = asn1CommandManager.sendForSignature(base64EncodedTim, maxDurationTime);
- try {
- final String hexEncodedTim = CodecUtils.toHex(
- CodecUtils.fromBase64(
- JsonUtils.toJSONObject(JsonUtils.toJSONObject(signedResponse).getString("result"))
- .getString("message-signed")));
-
- JSONObject timWithExpiration = new JSONObject();
- timWithExpiration.put("packetID", timpacketID);
- timWithExpiration.put("startDateTime", timStartDateTime);
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
- try {
- JSONObject jsonResult = JsonUtils
- .toJSONObject((JsonUtils.toJSONObject(signedResponse).getString("result")));
- // messageExpiry uses unit of seconds
- long messageExpiry = Long.parseLong(jsonResult.getString("message-expiry"));
- timWithExpiration.put("expirationDate", dateFormat.format(new Date(messageExpiry * 1000)));
- } catch (Exception e) {
- log.error("Unable to get expiration date from signed messages response ", e);
- timWithExpiration.put("expirationDate", "null");
- }
+ private String packageSignedTimIntoAsd(ServiceRequest request, String signedMsg) throws JsonProcessingException, ParseException {
+ SDW sdw = request.getSdw();
+ SNMP snmp = request.getSnmp();
+ DdsAdvisorySituationData asd;
+
+ byte sendToRsu =
+ request.getRsus() != null ? DdsAdvisorySituationData.RSU : DdsAdvisorySituationData.NONE;
+ byte distroType = (byte) (DdsAdvisorySituationData.IP | sendToRsu);
+
+ asd = new DdsAdvisorySituationData()
+ .setServiceRegion(GeoRegionBuilder.ddsGeoRegion(sdw.getServiceRegion()))
+ .setTimeToLive(sdw.getTtl())
+ .setGroupID(sdw.getGroupID()).setRecordID(sdw.getRecordId());
+
+ if (null != snmp) {
+ asd.setAsdmDetails(snmp.getDeliverystart(), snmp.getDeliverystop(), distroType, null);
+ } else {
+ asd.setAsdmDetails(sdw.getDeliverystart(), sdw.getDeliverystop(), distroType, null);
+ }
- try {
- Date parsedtimTimeStamp = dateFormat.parse(timStartDateTime);
- Date requiredExpirationDate = new Date();
- requiredExpirationDate.setTime(parsedtimTimeStamp.getTime() + maxDurationTime);
- timWithExpiration.put("requiredExpirationDate", dateFormat.format(requiredExpirationDate));
- } catch (Exception e) {
- log.error("Unable to parse requiredExpirationDate ", e);
- timWithExpiration.put("requiredExpirationDate", "null");
- }
- // publish to Tim expiration kafka
- stringMsgProducer.send(jsonTopics.getTimCertExpiration(), null,
- timWithExpiration.toString());
- return hexEncodedTim;
+ var asdJson = (ObjectNode) mapper.readTree(asd.toJson());
+
+ var admDetailsObj = (ObjectNode) asdJson.findValue("asdmDetails");
+ admDetailsObj.remove("advisoryMessage");
+ admDetailsObj.put("advisoryMessage", signedMsg);
+
+ asdJson.set("asdmDetails", admDetailsObj);
+
+ ObjectNode advisorySituationDataNode = mapper.createObjectNode();
+ advisorySituationDataNode.set(ADVISORY_SITUATION_DATA_STRING, asdJson);
+
+ OdeMsgPayload payload = new OdeAsdPayload(asd);
+
+ var payloadNode = (ObjectNode) mapper.readTree(payload.toJson());
+ payloadNode.set(AppContext.DATA_STRING, advisorySituationDataNode);
+
+ OdeMsgMetadata metadata = new OdeMsgMetadata(payload);
+ var metadataNode = (ObjectNode) mapper.readTree(metadata.toJson());
+
+ metadataNode.set("request", mapper.readTree(request.toJson()));
+
+ ArrayNode encodings = buildEncodings();
+ var embeddedEncodings = xmlMapper.createObjectNode();
+ embeddedEncodings.set(AppContext.ENCODINGS_STRING, encodings);
+
+ metadataNode.set(AppContext.ENCODINGS_STRING, embeddedEncodings);
+
+ ObjectNode message = mapper.createObjectNode();
+ message.set(AppContext.METADATA_STRING, metadataNode);
+ message.set(AppContext.PAYLOAD_STRING, payloadNode);
+
+ ObjectNode root = mapper.createObjectNode();
+ root.set(AppContext.ODE_ASN1_DATA, message);
+
+ var outputXml = xmlMapper.writeValueAsString(root)
+ .replace("", "")
+ .replace("", "");
+ log.debug("Fully crafted ASD to be encoded: {}", outputXml);
+ return outputXml;
+ }
+
+ private ArrayNode buildEncodings() throws JsonProcessingException {
+ ArrayNode encodings = mapper.createArrayNode();
+ var encoding = new Asn1Encoding(ADVISORY_SITUATION_DATA_STRING, ADVISORY_SITUATION_DATA_STRING, EncodingRule.UPER);
+ encodings.add(mapper.readTree(mapper.writeValueAsString(encoding)));
+ return encodings;
+ }
- } catch (JsonUtilsException e1) {
- log.error("Unable to parse signed message response ", e1);
+ private void sendToRsus(ServiceRequest request, String encodedMsg) {
+ if (null == request.getSnmp() || null == request.getRsus()) {
+ log.debug("No RSUs or SNMP provided. Not sending to RSUs.");
+ return;
}
- return encodedTIM;
+ log.info("Sending message to RSUs...");
+ rsuDepositor.deposit(request, encodedMsg);
}
- /**
- * Checks if header is present in encoded message.
- */
- private boolean isHeaderPresent(String encodedTim) {
- return encodedTim.indexOf("001F") > 0;
+ private void setRequiredExpiryDate(DateTimeFormatter dateFormat, String timStartDateTime,
+ int maxDurationTime, JSONObject timWithExpiration) {
+ try {
+ var timStartLocalDate = LocalDateTime.ofInstant(Instant.parse(timStartDateTime), ZoneId.of("UTC"));
+ var expiryDate = timStartLocalDate.plus(maxDurationTime, ChronoUnit.MILLIS);
+ timWithExpiration.put("requiredExpirationDate", expiryDate.format(dateFormat));
+ } catch (Exception e) {
+ log.error("Unable to parse requiredExpirationDate. Setting requiredExpirationDate to 'null'", e);
+ timWithExpiration.put("requiredExpirationDate", "null");
+ }
+ }
+
+ private void setExpiryDate(SignatureResultModel signedResponse,
+ JSONObject timWithExpiration,
+ DateTimeFormatter dateFormat) {
+ try {
+ var messageExpiryMillis = signedResponse.getResult().getMessageExpiry() * 1000;
+ var expiryDate = LocalDateTime.ofInstant(Instant.ofEpochMilli(messageExpiryMillis), ZoneId.of("UTC"));
+ timWithExpiration.put("expirationDate", expiryDate.format(dateFormat));
+ } catch (Exception e) {
+ log.error("Unable to get expiration date from signed messages response. Setting expirationData to 'null'", e);
+ timWithExpiration.put("expirationDate", "null");
+ }
}
/**
* Strips header from unsigned message (all bytes before 001F hex value).
*/
private String stripHeader(String encodedUnsignedTim) {
- String toReturn = "";
// find 001F hex value
- int index = encodedUnsignedTim.indexOf("001F");
+ int index = encodedUnsignedTim.indexOf(SupportedMessageType.TIM.getStartFlag().toUpperCase());
if (index == -1) {
- log.warn("No '001F' hex value found in encoded message");
+ log.warn("No {} hex value found in encoded message", SupportedMessageType.TIM.getStartFlag());
return encodedUnsignedTim;
}
// strip everything before 001F
- toReturn = encodedUnsignedTim.substring(index);
- return toReturn;
+ return encodedUnsignedTim.substring(index);
}
private void depositToFilteredTopic(JSONObject metadataObj, String hexEncodedTim) {
- try {
- String generatedBy = metadataObj.getString("recordGeneratedBy");
- String streamId = metadataObj.getJSONObject("serialId").getString("streamId");
- if (!generatedBy.equalsIgnoreCase("TMC")) {
- log.debug("Not a TMC-generated TIM. Skipping deposit to TMC-filtered topic.");
- return;
- }
+ String generatedBy = metadataObj.getString("recordGeneratedBy");
+ String streamId = metadataObj.getJSONObject("serialId").getString("streamId");
+ if (!generatedBy.equalsIgnoreCase("TMC")) {
+ log.debug("Not a TMC-generated TIM. Skipping deposit to TMC-filtered topic.");
+ return;
+ }
- String timString = odeTimJsonTopology.query(streamId);
- if (timString != null) {
- // Set ASN1 data in TIM metadata
- JSONObject timJSON = new JSONObject(timString);
- JSONObject metadataJSON = timJSON.getJSONObject("metadata");
- metadataJSON.put("asn1", hexEncodedTim);
- timJSON.put("metadata", metadataJSON);
+ String timString = odeTimJsonTopology.query(streamId);
+ if (timString != null) {
+ // Set ASN1 data in TIM metadata
+ JSONObject timJSON = new JSONObject(timString);
+ JSONObject metadataJSON = timJSON.getJSONObject("metadata");
+ metadataJSON.put("asn1", hexEncodedTim);
+ timJSON.put("metadata", metadataJSON);
- // Send the message w/ asn1 data to the TMC-filtered topic
- stringMsgProducer.send(jsonTopics.getTimTmcFiltered(), null, timJSON.toString());
- }
- } catch (JSONException e) {
- log.error("Error while fetching recordGeneratedBy field: {}", e.getMessage());
+ // Send the message w/ asn1 data to the TMC-filtered topic
+ kafkaTemplate.send(jsonTopics.getTimTmcFiltered(), timJSON.toString());
+ } else {
+ log.debug("TIM not found in k-table. Skipping deposit to TMC-filtered topic.");
+ }
+ }
+
+ private void publishForSecondEncoding(ServiceRequest request, String encodedTimWithoutHeaders) {
+ if (request.getSdw() == null) {
+ log.debug("SDW not present. No second encoding required.");
+ return;
+ }
+
+ try {
+ log.debug("Publishing message for round 2 encoding");
+ String asdPackagedTim = packageSignedTimIntoAsd(request, encodedTimWithoutHeaders);
+ kafkaTemplate.send(asn1CoderTopics.getEncoderInput(), asdPackagedTim);
} catch (Exception e) {
- log.error("Error while updating TIM: {}", e.getMessage());
+ log.error("Error packaging ASD for round 2 encoding", e);
}
}
}
diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/AsnCodecRouterServiceController.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/AsnCodecRouterServiceController.java
deleted file mode 100644
index 7734b2115..000000000
--- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/services/asn1/AsnCodecRouterServiceController.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*******************************************************************************
- * Copyright 2018 572682
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy
- * of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-package us.dot.its.jpo.ode.services.asn1;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Controller;
-import us.dot.its.jpo.ode.kafka.OdeKafkaProperties;
-import us.dot.its.jpo.ode.kafka.topics.Asn1CoderTopics;
-import us.dot.its.jpo.ode.kafka.topics.JsonTopics;
-import us.dot.its.jpo.ode.kafka.topics.SDXDepositorTopics;
-import us.dot.its.jpo.ode.rsu.RsuProperties;
-import us.dot.its.jpo.ode.security.SecurityServicesProperties;
-import us.dot.its.jpo.ode.wrapper.MessageConsumer;
-
-/**
- * Launches ToJsonConverter service
- */
-@Controller
-@Slf4j
-public class AsnCodecRouterServiceController {
-
- @Autowired
- public AsnCodecRouterServiceController(OdeKafkaProperties odeKafkaProperties,
- JsonTopics jsonTopics,
- Asn1CoderTopics asn1CoderTopics,
- SDXDepositorTopics sdxDepositorTopics,
- RsuProperties rsuProperties,
- SecurityServicesProperties securityServicesProperties) {
- super();
-
- log.info("Starting {}", this.getClass().getSimpleName());
-
- // asn1_codec Encoder Routing
- log.info("Routing ENCODED data received ASN.1 Encoder");
-
- Asn1EncodedDataRouter encoderRouter = new Asn1EncodedDataRouter(
- odeKafkaProperties,
- asn1CoderTopics,
- jsonTopics,
- sdxDepositorTopics,
- rsuProperties,
- securityServicesProperties);
-
- MessageConsumer encoderConsumer = MessageConsumer.defaultStringMessageConsumer(
- odeKafkaProperties.getBrokers(), this.getClass().getSimpleName(), encoderRouter);
-
- encoderConsumer.setName("Asn1EncoderConsumer");
- encoderRouter.start(encoderConsumer, asn1CoderTopics.getEncoderOutput());
- }
-}
diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/snmp/SnmpSession.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/snmp/SnmpSession.java
index 9fdeeede0..86a55c41c 100644
--- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/snmp/SnmpSession.java
+++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/snmp/SnmpSession.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*============================================================================
* Copyright 2018 572682
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
@@ -13,8 +13,11 @@
* License for the specific language governing permissions and limitations under
* the License.
******************************************************************************/
+
package us.dot.its.jpo.ode.snmp;
+import java.io.IOException;
+import java.text.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.snmp4j.PDU;
@@ -38,386 +41,397 @@
import org.snmp4j.smi.OctetString;
import org.snmp4j.smi.VariableBinding;
import org.snmp4j.transport.DefaultUdpTransportMapping;
-import us.dot.its.jpo.ode.eventlog.EventLogger;
import us.dot.its.jpo.ode.plugin.RoadSideUnit.RSU;
import us.dot.its.jpo.ode.plugin.SNMP;
import us.dot.its.jpo.ode.plugin.ServiceRequest;
import us.dot.its.jpo.ode.plugin.ServiceRequest.OdeInternal.RequestVerb;
import us.dot.its.jpo.ode.plugin.SnmpProtocol;
-import java.io.IOException;
-import java.text.ParseException;
-
/**
- * This object is used to abstract away the complexities of SNMP calls and allow
- * a user to more quickly and easily send SNMP requests. Note that the
- * "connection" aspect of this class is an abstraction meant to reinforce that
- * these objects correspond 1-to-1 with a destination server, while SNMP is sent
- * over UDP and is actually connection-less.
+ * This object is used to abstract away the complexities of SNMP calls and allow a user to more
+ * quickly and easily send SNMP requests. Note that the "connection" aspect of this class is an
+ * abstraction meant to reinforce that these objects correspond 1-to-1 with a destination server,
+ * while SNMP is sent over UDP and is actually connection-less.
*/
public class SnmpSession {
- private static final Logger logger = LoggerFactory.getLogger(SnmpSession.class);
-
- private Snmp snmp;
- private TransportMapping transport;
- private UserTarget target;
-
- private boolean ready = false;
- private boolean listening;
-
- /**
- * Constructor for SnmpSession
- *
- * @param props SnmpProperties for the session (target address, retries,
- * timeout, etc)
- * @throws IOException
- */
- public SnmpSession(RSU rsu) throws IOException {
- Address addr = GenericAddress.parse(rsu.getRsuTarget() + "/161");
-
- // Create a "target" to which a request is sent
- target = new UserTarget();
- target.setAddress(addr);
- target.setRetries(rsu.getRsuRetries());
- target.setTimeout(rsu.getRsuTimeout());
- target.setVersion(SnmpConstants.version3);
- if (rsu.getRsuUsername() != null) {
- target.setSecurityLevel(SecurityLevel.AUTH_PRIV);
- target.setSecurityName(new OctetString(rsu.getRsuUsername()));
- } else {
- target.setSecurityLevel(SecurityLevel.NOAUTH_NOPRIV);
- }
-
- // Set up the UDP transport mapping over which requests are sent
- transport = null;
- try {
- transport = new DefaultUdpTransportMapping();
- } catch (IOException e) {
- throw new IOException("Failed to create UDP transport mapping: {}", e);
- }
-
- // Instantiate the SNMP instance
- snmp = new Snmp(transport);
-
- // Register the security options and create an SNMP "user"
- USM usm = new USM(SecurityProtocols.getInstance(), new OctetString(MPv3.createLocalEngineID()), 0);
- SecurityModels.getInstance().addSecurityModel(usm);
- if (rsu.getRsuUsername() != null) {
- snmp.getUSM().addUser(new OctetString(rsu.getRsuUsername()), new UsmUser(new OctetString(rsu.getRsuUsername()),
- AuthSHA.ID, new OctetString(rsu.getRsuPassword()), PrivAES128.ID, new OctetString(rsu.getRsuPassword())));
- }
-
- // Assert the ready flag so the user can begin sending messages
- ready = true;
- }
-
- /**
- * Sends a SET-type PDU to the target specified by the constructor.
- *
- * @param pdu The message content to be sent to the target
- * @return ResponseEvent
- * @throws IOException
- */
- public ResponseEvent set(PDU pdu, Snmp snmpob, UserTarget targetob, Boolean keepOpen) throws IOException {
-
- // Ensure the object has been instantiated
- if (!ready) {
- throw new IOException("Tried to send PDU before SNMP sending service is ready.");
- }
-
- if (!listening) {
- startListen();
- }
-
- // Try to send the SNMP request (synchronously)
- ResponseEvent responseEvent;
- try {
- // attempt to discover & set authoritative engine ID
- byte[] authEngineID = snmpob.discoverAuthoritativeEngineID(targetob.getAddress(), 1000);
- if (authEngineID != null && authEngineID.length > 0) {
- targetob.setAuthoritativeEngineID(authEngineID);
- }
-
- // send request
- responseEvent = snmpob.set(pdu, targetob);
- if (!keepOpen) {
- snmpob.close();
- }
-
- // if RSU responded and we didn't get an authEngineID, log a warning
- if (responseEvent != null && responseEvent.getResponse() != null && authEngineID == null) {
- logger.warn("Failed to discover authoritative engine ID for SNMP target: {}", targetob.getAddress());
- }
- } catch (IOException e) {
- throw new IOException("Failed to send SNMP request: " + e);
- }
-
- return responseEvent;
+ private static final Logger logger = LoggerFactory.getLogger(SnmpSession.class);
+
+ private Snmp snmp;
+ private TransportMapping transport;
+ private UserTarget target;
+
+ private boolean ready = false;
+ private boolean listening;
+
+ /**
+ * Constructor for SnmpSession.
+ *
+ * @throws IOException when failing to create the snmp Transport
+ */
+ public SnmpSession(RSU rsu) throws IOException {
+ Address addr = GenericAddress.parse(rsu.getRsuTarget() + "/161");
+
+ // Create a "target" to which a request is sent
+ target = new UserTarget();
+ target.setAddress(addr);
+ target.setRetries(rsu.getRsuRetries());
+ target.setTimeout(rsu.getRsuTimeout());
+ target.setVersion(SnmpConstants.version3);
+ if (rsu.getRsuUsername() != null) {
+ target.setSecurityLevel(SecurityLevel.AUTH_PRIV);
+ target.setSecurityName(new OctetString(rsu.getRsuUsername()));
+ } else {
+ target.setSecurityLevel(SecurityLevel.NOAUTH_NOPRIV);
}
- /**
- * Sends a SET-type PDU to the target specified by the constructor.
- *
- * @param pdu The message content to be sent to the target
- * @return ResponseEvent
- * @throws IOException
- */
- public ResponseEvent get(PDU pdu, Snmp snmpob, UserTarget targetob, Boolean keepOpen) throws IOException {
-
- // Ensure the object has been instantiated
- if (!ready) {
- throw new IOException("Tried to send PDU before SNMP sending service is ready.");
- }
-
- // Start listening on UDP
- if (!listening) {
- startListen();
- }
-
- // Try to send the SNMP request (synchronously)
- ResponseEvent responseEvent = null;
- try {
- responseEvent = snmpob.get(pdu, targetob);
- if (!keepOpen) {
- snmpob.close();
- }
- } catch (IOException e) {
- throw new IOException("Failed to send SNMP request: " + e);
- }
-
- return responseEvent;
+ // Set up the UDP transport mapping over which requests are sent
+ transport = new DefaultUdpTransportMapping();
+
+ // Instantiate the SNMP instance
+ snmp = new Snmp(transport);
+
+ // Register the security options and create an SNMP "user"
+ USM usm =
+ new USM(SecurityProtocols.getInstance(), new OctetString(MPv3.createLocalEngineID()), 0);
+ SecurityModels.getInstance().addSecurityModel(usm);
+ if (rsu.getRsuUsername() != null) {
+ snmp.getUSM().addUser(new OctetString(rsu.getRsuUsername()),
+ new UsmUser(new OctetString(rsu.getRsuUsername()),
+ AuthSHA.ID, new OctetString(rsu.getRsuPassword()), PrivAES128.ID,
+ new OctetString(rsu.getRsuPassword())));
}
- /**
- * Start listening for responses
- *
- * @throws IOException
- */
- public void startListen() throws IOException {
- transport.listen();
- listening = true;
+ // Assert the ready flag so the user can begin sending messages
+ ready = true;
+ }
+
+ /**
+ * Sends a SET-type PDU to the target specified by the constructor.
+ *
+ * @param pdu The message content to be sent to the target
+ * @return ResponseEvent
+ * @throws IOException when calling this method before the session is ready
+ */
+ public ResponseEvent set(PDU pdu, Snmp snmpob, UserTarget targetob, Boolean keepOpen)
+ throws IOException {
+
+ // Ensure the object has been instantiated
+ if (!ready) {
+ throw new IOException("Tried to send PDU before SNMP sending service is ready.");
}
- /**
- * Create an SNMP session given the values in
- *
- * @param tim - The TIM parameters (payload, channel, mode, etc)
- * @param props - The SNMP properties (ip, username, password, etc)
- * @return ResponseEvent
- * @throws TimPduCreatorException
- * @throws IOException
- * @throws ParseException
- */
- public static ResponseEvent createAndSend(SNMP snmp, RSU rsu, String payload, RequestVerb requestVerb, boolean dataSigningEnabledRSU)
- throws ParseException, IOException {
-
- SnmpSession session = new SnmpSession(rsu);
-
- // Send the PDU
- ResponseEvent response = null;
- ScopedPDU pdu = SnmpSession.createPDU(snmp, payload, rsu.getRsuIndex(), requestVerb, rsu.getSnmpProtocol(), dataSigningEnabledRSU);
- response = session.set(pdu, session.getSnmp(), session.getTarget(), false);
- String msg = "Message Sent to {}, index {}: {}";
- EventLogger.logger.debug(msg, rsu.getRsuTarget(), rsu.getRsuIndex(), payload);
- logger.debug(msg, rsu.getRsuTarget(), rsu.getRsuIndex(), payload);
- return response;
+ if (!listening) {
+ startListen();
}
- public Snmp getSnmp() {
- return snmp;
+ // Try to send the SNMP request (synchronously)
+ ResponseEvent responseEvent;
+ try {
+ // attempt to discover & set authoritative engine ID
+ byte[] authEngineID = snmpob.discoverAuthoritativeEngineID(targetob.getAddress(), 1000);
+ if (authEngineID != null && authEngineID.length > 0) {
+ targetob.setAuthoritativeEngineID(authEngineID);
+ }
+
+ // send request
+ responseEvent = snmpob.set(pdu, targetob);
+ if (!keepOpen) {
+ snmpob.close();
+ }
+
+ // if RSU responded and we didn't get an authEngineID, log a warning
+ if (responseEvent != null && responseEvent.getResponse() != null && authEngineID == null) {
+ logger.warn("Failed to discover authoritative engine ID for SNMP target: {}",
+ targetob.getAddress());
+ }
+ } catch (IOException e) {
+ throw new IOException("Failed to send SNMP request: " + e);
}
- public void setSnmp(Snmp snmp) {
- this.snmp = snmp;
+ return responseEvent;
+ }
+
+ /**
+ * Sends a SET-type PDU to the target specified by the constructor.
+ *
+ * @param pdu The message content to be sent to the target
+ * @return ResponseEvent
+ * @throws IOException when calling this method before the session is ready
+ */
+ public ResponseEvent get(PDU pdu, Snmp snmpob, UserTarget targetob, Boolean keepOpen)
+ throws IOException {
+
+ // Ensure the object has been instantiated
+ if (!ready) {
+ throw new IOException("Tried to send PDU before SNMP sending service is ready.");
}
- public TransportMapping getTransport() {
- return transport;
+ // Start listening on UDP
+ if (!listening) {
+ startListen();
}
- public void setTransport(TransportMapping transport) {
- this.transport = transport;
+ // Try to send the SNMP request (synchronously)
+ ResponseEvent responseEvent = null;
+ try {
+ responseEvent = snmpob.get(pdu, targetob);
+ if (!keepOpen) {
+ snmpob.close();
+ }
+ } catch (IOException e) {
+ throw new IOException("Failed to send SNMP request: " + e);
}
- public UserTarget getTarget() {
- return target;
+ return responseEvent;
+ }
+
+ /**
+ * Start listening for responses.
+ *
+ * @throws IOException when listening failed
+ */
+ public void startListen() throws IOException {
+ transport.listen();
+ listening = true;
+ }
+
+
+ /**
+ * Creates and sends a PDU to a specific RSU (Road Side Unit) using the provided SNMP session. The
+ * request is built with specified payload, request verb, and data signing option.
+ *
+ * @param snmp The SNMP object containing configuration and metadata for
+ * constructing the request.
+ * @param rsu The RSU object that provides target and protocol information for
+ * the request destination.
+ * @param payload The payload string to be included in the PDU of the request.
+ * @param requestVerb The type of SNMP request verb (e.g., GET, SET) for the operation.
+ * @param dataSigningEnabledRSU A boolean flag indicating if data signing is enabled for the RSU.
+ * @return A ResponseEvent representing the response received from the RSU.
+ * @throws ParseException If there is an error in parsing the payload or PDU creation.
+ * @throws IOException If there is an error in establishing or using the SNMP transport.
+ */
+ public static ResponseEvent createAndSend(SNMP snmp, RSU rsu, String payload,
+ RequestVerb requestVerb, boolean dataSigningEnabledRSU) throws ParseException, IOException {
+
+ SnmpSession session = new SnmpSession(rsu);
+
+ // Send the PDU
+ ScopedPDU pdu = SnmpSession.createPDU(snmp, payload, rsu.getRsuIndex(), requestVerb,
+ rsu.getSnmpProtocol(), dataSigningEnabledRSU);
+ ResponseEvent response =
+ session.set(pdu, session.getSnmp(), session.getTarget(), false);
+ logger.debug("Message Sent to {}, index {}: {}", rsu.getRsuTarget(), rsu.getRsuIndex(),
+ payload);
+ return response;
+ }
+
+ public Snmp getSnmp() {
+ return snmp;
+ }
+
+ public void setSnmp(Snmp snmp) {
+ this.snmp = snmp;
+ }
+
+ public TransportMapping getTransport() {
+ return transport;
+ }
+
+ public void setTransport(TransportMapping transport) {
+ this.transport = transport;
+ }
+
+ public UserTarget getTarget() {
+ return target;
+ }
+
+ public void setTarget(UserTarget target) {
+ this.target = target;
+ }
+
+ public void endSession() throws IOException {
+ this.snmp.close();
+ }
+
+ /**
+ * Creates a ScopedPDU object configured based on the specified SNMP protocol.
+ *
+ * @param snmp The SNMP object containing configuration and metadata for
+ * constructing the PDU.
+ * @param payload The payload string to be included in the PDU.
+ * @param index The index value associated with the PDU construction.
+ * @param verb The request verb (e.g., GET, SET) for the PDU.
+ * @param snmpProtocol The SNMP protocol version or type used for constructing the PDU.
+ * @param dataSigningEnabledRSU A boolean flag indicating whether data signing is enabled for the
+ * RSU.
+ * @return A ScopedPDU object constructed based on the provided parameters, or null if the
+ * protocol is unknown.
+ * @throws ParseException If there is an error in parsing the payload or during PDU creation.
+ */
+ public static ScopedPDU createPDU(SNMP snmp, String payload, int index, RequestVerb verb,
+ SnmpProtocol snmpProtocol, boolean dataSigningEnabledRSU) throws ParseException {
+ switch (snmpProtocol) {
+ case FOURDOT1:
+ return createPDUWithFourDot1Protocol(snmp, payload, index, verb);
+ case NTCIP1218:
+ return createPDUWithNTCIP1218Protocol(snmp, payload, index, verb, dataSigningEnabledRSU);
+ default:
+ logger.error("Unknown SNMP protocol: {}", snmpProtocol);
+ return null;
}
-
- public void setTarget(UserTarget target) {
- this.target = target;
+ }
+
+ /**
+ * Encodes the given value into an SNMP variable binding using specific encoding rules
+ * based on the value's range.
+ *
+ * @param oid The Object Identifier (OID) as a string, representing the SNMP object ID.
+ * @param val The value to be encoded, provided as a hexadecimal string.
+ * @return A VariableBinding object that contains the OID and the encoded value, or null
+ * if the value does not fall within any of the predefined ranges.
+ */
+ public static VariableBinding getPEncodedVariableBinding(String oid, String val) {
+ Integer intVal = Integer.parseInt(val, 16);
+ Integer additionValue = null;
+
+ if (intVal >= 0 && intVal <= 127) {
+ // P = V
+ // here we must instantiate the OctetString directly with the hex string to
+ // avoid inadvertently creating the ASCII character codes
+ // for instance OctetString.fromString("20", 16) produces the space character ("
+ // ") rather than hex 20
+ return new VariableBinding(new OID(oid), new OctetString(Integer.toHexString(intVal)));
+ } else if (intVal >= 128 && intVal <= 16511) {
+ // P = V + 0x7F80
+ additionValue = 0x7F80;
+ } else if (intVal >= 016512 && intVal <= 2113663) {
+ // P = V + 0xBFBF80
+ additionValue = 0xBFBF80;
+ } else if (intVal >= 2113664 && intVal <= 270549119) {
+ // P = V + 0xDFDFBF80
+ additionValue = 0xDFDFBF80;
}
- public void endSession() throws IOException {
- this.snmp.close();
+ if (additionValue != null) {
+ return new VariableBinding(new OID(oid),
+ OctetString.fromString(Integer.toHexString(intVal + additionValue), 16));
}
-
- /**
- * Assembles the various RSU elements of a TimParameters object into a usable
- * PDU.
- *
- * @param index Storage index on the RSU
- * @param params TimParameters POJO that stores status, channel, payload, etc.
- * @return PDU
- * @throws ParseException
- */
- public static ScopedPDU createPDU(SNMP snmp, String payload, int index, RequestVerb verb, SnmpProtocol snmpProtocol, boolean dataSigningEnabledRSU) throws ParseException {
- switch (snmpProtocol) {
- case FOURDOT1:
- return createPDUWithFourDot1Protocol(snmp, payload, index, verb);
- case NTCIP1218:
- return createPDUWithNTCIP1218Protocol(snmp, payload, index, verb, dataSigningEnabledRSU);
- default:
- logger.error("Unknown SNMP protocol: {}", snmpProtocol);
- return null;
- }
+ return null;
+ }
+
+ private static ScopedPDU createPDUWithFourDot1Protocol(SNMP snmp, String payload, int index,
+ RequestVerb verb) throws ParseException {
+ //////////////////////////////
+ // - OID examples - //
+ //////////////////////////////
+ // rsuSRMStatus.3 = 4
+ // --> 1.4.1.11.3 = 4
+ // rsuSRMTxChannel.3 = 3
+ // --> 1.4.1.5.3 = 178
+ // rsuSRMTxMode.3 = 1
+ // --> 1.4.1.4.3 = 1
+ // rsuSRMPsid.3 x "8003"
+ // --> 1.4.1.2.3 x "8003"
+ // rsuSRMDsrcMsgId.3 = 31
+ // --> 1.4.1.3.3 = 31
+ // rsuSRMTxInterval.3 = 10
+ // --> 1.4.1.6.3 = 10
+ // rsuSRMDeliveryStart.3 x "07e7051f0c000000"
+ // --> 1.4.1.7.3 = "07e7051f0c000000"
+ // rsuSRMDeliveryStop.3 x "07e7060f0c000000"
+ // --> 1.4.1.8.3 = "07e7060f0c000000"
+ // rsuSRMPayload.3 x "001f6020100000000000de8f834082729de80d80734d37862d2187864fc2099f1f4028407e53bd01b00e69a6f0c5a409c46c3c300118e69a26fa77a0104b8e69a2e86779e21981414e39a68fd29de697d804fb38e69a50e27796151013d81080020290"
+ // --> 1.4.1.9.3 = "001f6020100000000000de8f834082729de80d80734d37862d2187864fc2099f1f4028407e53bd01b00e69a6f0c5a409c46c3c300118e69a26fa77a0104b8e69a2e86779e21981414e39a68fd29de697d804fb38e69a50e27796151013d81080020290"
+ // rsuSRMEnable.3 = 1
+ // --> 1.4.1.10.3 = 1
+ //////////////////////////////
+
+ VariableBinding rsuSRMPsid = SnmpFourDot1Protocol.getVbRsuSrmPsid(index, snmp.getRsuid());
+ VariableBinding rsuSRMDsrcMsgId =
+ SnmpFourDot1Protocol.getVbRsuSrmDsrcMsgId(index, snmp.getMsgid());
+ VariableBinding rsuSRMTxMode = SnmpFourDot1Protocol.getVbRsuSrmTxMode(index, snmp.getMode());
+ VariableBinding rsuSRMTxChannel =
+ SnmpFourDot1Protocol.getVbRsuSrmTxChannel(index, snmp.getChannel());
+ VariableBinding rsuSRMTxInterval =
+ SnmpFourDot1Protocol.getVbRsuSrmTxInterval(index, snmp.getInterval());
+ VariableBinding rsuSRMDeliveryStart =
+ SnmpFourDot1Protocol.getVbRsuSrmDeliveryStart(index, snmp.getDeliverystart());
+ VariableBinding rsuSRMDeliveryStop =
+ SnmpFourDot1Protocol.getVbRsuSrmDeliveryStop(index, snmp.getDeliverystop());
+ VariableBinding rsuSRMPayload = SnmpFourDot1Protocol.getVbRsuSrmPayload(index, payload);
+ VariableBinding rsuSRMEnable = SnmpFourDot1Protocol.getVbRsuSrmEnable(index, snmp.getEnable());
+
+ ScopedPDU pdu = new ScopedPDU();
+ pdu.add(rsuSRMPsid);
+ pdu.add(rsuSRMDsrcMsgId);
+ pdu.add(rsuSRMTxMode);
+ pdu.add(rsuSRMTxChannel);
+ pdu.add(rsuSRMTxInterval);
+ pdu.add(rsuSRMDeliveryStart);
+ pdu.add(rsuSRMDeliveryStop);
+ pdu.add(rsuSRMPayload);
+ pdu.add(rsuSRMEnable);
+ if (verb == ServiceRequest.OdeInternal.RequestVerb.POST) {
+ VariableBinding rsuSRMStatus = SnmpFourDot1Protocol.getVbRsuSrmStatus(index, snmp.getStatus());
+ pdu.add(rsuSRMStatus);
}
-
- public static VariableBinding getPEncodedVariableBinding(String oid, String val) {
- Integer intVal = Integer.parseInt(val, 16);
- Integer additionValue = null;
-
- if (intVal >= 0 && intVal <= 127) {
- // P = V
- // here we must instantiate the OctetString directly with the hex string to
- // avoid inadvertently creating the ASCII character codes
- // for instance OctetString.fromString("20", 16) produces the space character ("
- // ") rather than hex 20
- return new VariableBinding(new OID(oid), new OctetString(Integer.toHexString(intVal)));
- } else if (intVal >= 128 && intVal <= 16511) {
- // P = V + 0x7F80
- additionValue = 0x7F80;
- } else if (intVal >= 016512 && intVal <= 2113663) {
- // P = V + 0xBFBF80
- additionValue = 0xBFBF80;
- } else if (intVal >= 2113664 && intVal <= 270549119) {
- // P = V + 0xDFDFBF80
- additionValue = 0xDFDFBF80;
- }
-
- if (additionValue != null) {
- return new VariableBinding(new OID(oid),
- OctetString.fromString(Integer.toHexString(intVal + additionValue), 16));
- }
- return null;
+ pdu.setType(PDU.SET);
+
+ return pdu;
+ }
+
+ private static ScopedPDU createPDUWithNTCIP1218Protocol(SNMP snmp, String payload, int index,
+ RequestVerb verb, boolean dataSigningEnabledRSU) throws ParseException {
+ //////////////////////////////
+ // - OID examples - //
+ //////////////////////////////
+ // rsuMsgRepeatPsid.3 x "8003"
+ // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.2.3 x "8003"
+ // rsuMsgRepeatTxChannel.3 = 3
+ // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.3.3 = 183
+ // rsuMsgRepeatTxInterval.3 = 10
+ // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.4.3 = 10
+ // rsuMsgRepeatDeliveryStart.3 x "07e7051f0c000000"
+ // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.5.3 = "07e7051f0c000000"
+ // rsuMsgRepeatDeliveryStop.3 x "07e7060f0c000000"
+ // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.6.3 = "07e7060f0c000000"
+ // rsuMsgRepeatPayload.3 x "001f6020100000000000de8f834082729de80d80734d37862d2187864fc2099f1f4028407e53bd01b00e69a6f0c5a409c46c3c300118e69a26fa77a0104b8e69a2e86779e21981414e39a68fd29de697d804fb38e69a50e27796151013d81080020290"
+ // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.7.3 = "001f6020100000000000de8f834082729de80d80734d37862d2187864fc2099f1f4028407e53bd01b00e69a6f0c5a409c46c3c300118e69a26fa77a0104b8e69a2e86779e21981414e39a68fd29de697d804fb38e69a50e27796151013d81080020290"
+ // rsuMsgRepeatEnable.3 = 1
+ // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.8.3 = 1
+ // rsuMsgRepeatStatus.3 = 4
+ // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.9.3 = 4
+ // rsuMsgRepeatPriority.3 = 6
+ // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.10.3 = 6
+ // rsuMsgRepeatOptions.3 = "00"
+ // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.11.3 = "00"
+ //////////////////////////////
+
+ // note: dsrc_msg_id is not in NTCIP 1218
+ // note: tx_mode is not in NTCIP 1218
+ ScopedPDU pdu = new ScopedPDU();
+ pdu.add(SnmpNTCIP1218Protocol.getVbRsuMsgRepeatPsid(index, snmp.getRsuid()));
+ pdu.add(SnmpNTCIP1218Protocol.getVbRsuMsgRepeatTxChannel(index, snmp.getChannel()));
+ pdu.add(SnmpNTCIP1218Protocol.getVbRsuMsgRepeatTxInterval(index, snmp.getInterval()));
+ pdu.add(SnmpNTCIP1218Protocol.getVbRsuMsgRepeatDeliveryStart(index, snmp.getDeliverystart()));
+ pdu.add(SnmpNTCIP1218Protocol.getVbRsuMsgRepeatDeliveryStop(index, snmp.getDeliverystop()));
+ pdu.add(SnmpNTCIP1218Protocol.getVbRsuMsgRepeatPayload(index, payload));
+ pdu.add(SnmpNTCIP1218Protocol.getVbRsuMsgRepeatEnable(index, snmp.getEnable()));
+ if (verb == ServiceRequest.OdeInternal.RequestVerb.POST) {
+ pdu.add(SnmpNTCIP1218Protocol.getVbRsuMsgRepeatStatus(index, snmp.getStatus()));
}
-
- private static ScopedPDU createPDUWithFourDot1Protocol(SNMP snmp, String payload, int index, RequestVerb verb) throws ParseException {
- //////////////////////////////
- // - OID examples - //
- //////////////////////////////
- // rsuSRMStatus.3 = 4
- // --> 1.4.1.11.3 = 4
- // rsuSRMTxChannel.3 = 3
- // --> 1.4.1.5.3 = 178
- // rsuSRMTxMode.3 = 1
- // --> 1.4.1.4.3 = 1
- // rsuSRMPsid.3 x "8003"
- // --> 1.4.1.2.3 x "8003"
- // rsuSRMDsrcMsgId.3 = 31
- // --> 1.4.1.3.3 = 31
- // rsuSRMTxInterval.3 = 10
- // --> 1.4.1.6.3 = 10
- // rsuSRMDeliveryStart.3 x "07e7051f0c000000"
- // --> 1.4.1.7.3 = "07e7051f0c000000"
- // rsuSRMDeliveryStop.3 x "07e7060f0c000000"
- // --> 1.4.1.8.3 = "07e7060f0c000000"
- // rsuSRMPayload.3 x "001f6020100000000000de8f834082729de80d80734d37862d2187864fc2099f1f4028407e53bd01b00e69a6f0c5a409c46c3c300118e69a26fa77a0104b8e69a2e86779e21981414e39a68fd29de697d804fb38e69a50e27796151013d81080020290"
- // --> 1.4.1.9.3 = "001f6020100000000000de8f834082729de80d80734d37862d2187864fc2099f1f4028407e53bd01b00e69a6f0c5a409c46c3c300118e69a26fa77a0104b8e69a2e86779e21981414e39a68fd29de697d804fb38e69a50e27796151013d81080020290"
- // rsuSRMEnable.3 = 1
- // --> 1.4.1.10.3 = 1
- //////////////////////////////
-
- VariableBinding rsuSRMPsid = SnmpFourDot1Protocol.getVbRsuSrmPsid(index, snmp.getRsuid());
- VariableBinding rsuSRMDsrcMsgId = SnmpFourDot1Protocol.getVbRsuSrmDsrcMsgId(index, snmp.getMsgid());
- VariableBinding rsuSRMTxMode = SnmpFourDot1Protocol.getVbRsuSrmTxMode(index, snmp.getMode());
- VariableBinding rsuSRMTxChannel = SnmpFourDot1Protocol.getVbRsuSrmTxChannel(index, snmp.getChannel());
- VariableBinding rsuSRMTxInterval = SnmpFourDot1Protocol.getVbRsuSrmTxInterval(index, snmp.getInterval());
- VariableBinding rsuSRMDeliveryStart = SnmpFourDot1Protocol.getVbRsuSrmDeliveryStart(index, snmp.getDeliverystart());
- VariableBinding rsuSRMDeliveryStop = SnmpFourDot1Protocol.getVbRsuSrmDeliveryStop(index, snmp.getDeliverystop());
- VariableBinding rsuSRMPayload = SnmpFourDot1Protocol.getVbRsuSrmPayload(index, payload);
- VariableBinding rsuSRMEnable = SnmpFourDot1Protocol.getVbRsuSrmEnable(index, snmp.getEnable());
- VariableBinding rsuSRMStatus = SnmpFourDot1Protocol.getVbRsuSrmStatus(index, snmp.getStatus());
-
- ScopedPDU pdu = new ScopedPDU();
- pdu.add(rsuSRMPsid);
- pdu.add(rsuSRMDsrcMsgId);
- pdu.add(rsuSRMTxMode);
- pdu.add(rsuSRMTxChannel);
- pdu.add(rsuSRMTxInterval);
- pdu.add(rsuSRMDeliveryStart);
- pdu.add(rsuSRMDeliveryStop);
- pdu.add(rsuSRMPayload);
- pdu.add(rsuSRMEnable);
- if (verb == ServiceRequest.OdeInternal.RequestVerb.POST) {
- pdu.add(rsuSRMStatus);
- }
- pdu.setType(PDU.SET);
-
- return pdu;
+ pdu.add(SnmpNTCIP1218Protocol.getVbRsuMsgRepeatPriority(index));
+ if (dataSigningEnabledRSU) {
+ // set options to 0x00 to tell RSU to broadcast message without signing or attaching a 1609.2 header
+ pdu.add(SnmpNTCIP1218Protocol.getVbRsuMsgRepeatOptions(index, 0x00));
+ } else {
+ // set options to 0x80 to tell RSU to sign & attach a 1609.2 header before broadcasting
+ pdu.add(SnmpNTCIP1218Protocol.getVbRsuMsgRepeatOptions(index, 0x80));
}
+ pdu.setType(PDU.SET);
- private static ScopedPDU createPDUWithNTCIP1218Protocol(SNMP snmp, String payload, int index, RequestVerb verb, boolean dataSigningEnabledRSU) throws ParseException {
- //////////////////////////////
- // - OID examples - //
- //////////////////////////////
- // rsuMsgRepeatPsid.3 x "8003"
- // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.2.3 x "8003"
- // rsuMsgRepeatTxChannel.3 = 3
- // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.3.3 = 183
- // rsuMsgRepeatTxInterval.3 = 10
- // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.4.3 = 10
- // rsuMsgRepeatDeliveryStart.3 x "07e7051f0c000000"
- // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.5.3 = "07e7051f0c000000"
- // rsuMsgRepeatDeliveryStop.3 x "07e7060f0c000000"
- // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.6.3 = "07e7060f0c000000"
- // rsuMsgRepeatPayload.3 x "001f6020100000000000de8f834082729de80d80734d37862d2187864fc2099f1f4028407e53bd01b00e69a6f0c5a409c46c3c300118e69a26fa77a0104b8e69a2e86779e21981414e39a68fd29de697d804fb38e69a50e27796151013d81080020290"
- // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.7.3 = "001f6020100000000000de8f834082729de80d80734d37862d2187864fc2099f1f4028407e53bd01b00e69a6f0c5a409c46c3c300118e69a26fa77a0104b8e69a2e86779e21981414e39a68fd29de697d804fb38e69a50e27796151013d81080020290"
- // rsuMsgRepeatEnable.3 = 1
- // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.8.3 = 1
- // rsuMsgRepeatStatus.3 = 4
- // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.9.3 = 4
- // rsuMsgRepeatPriority.3 = 6
- // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.10.3 = 6
- // rsuMsgRepeatOptions.3 = "00"
- // --> 1.3.6.1.4.1.1206.4.2.18.3.2.1.11.3 = "00"
- //////////////////////////////
-
- VariableBinding rsuMsgRepeatPsid = SnmpNTCIP1218Protocol.getVbRsuMsgRepeatPsid(index, snmp.getRsuid());
- // note: dsrc_msg_id is not in NTCIP 1218
- // note: tx_mode is not in NTCIP 1218
- VariableBinding rsuMsgRepeatTxChannel = SnmpNTCIP1218Protocol.getVbRsuMsgRepeatTxChannel(index, snmp.getChannel());
- VariableBinding rsuMsgRepeatTxInterval = SnmpNTCIP1218Protocol.getVbRsuMsgRepeatTxInterval(index, snmp.getInterval());
- VariableBinding rsuMsgRepeatDeliveryStart = SnmpNTCIP1218Protocol.getVbRsuMsgRepeatDeliveryStart(index, snmp.getDeliverystart());
- VariableBinding rsuMsgRepeatDeliveryStop = SnmpNTCIP1218Protocol.getVbRsuMsgRepeatDeliveryStop(index, snmp.getDeliverystop());
- VariableBinding rsuMsgRepeatPayload = SnmpNTCIP1218Protocol.getVbRsuMsgRepeatPayload(index, payload);
- VariableBinding rsuMsgRepeatEnable = SnmpNTCIP1218Protocol.getVbRsuMsgRepeatEnable(index, snmp.getEnable());
- VariableBinding rsuMsgRepeatStatus = SnmpNTCIP1218Protocol.getVbRsuMsgRepeatStatus(index, snmp.getStatus());
- VariableBinding rsuMsgRepeatPriority = SnmpNTCIP1218Protocol.getVbRsuMsgRepeatPriority(index);
- VariableBinding rsuMsgRepeatOptions;
- if (dataSigningEnabledRSU) {
- // set options to 0x00 to tell RSU to broadcast message without signing or attaching a 1609.2 header
- rsuMsgRepeatOptions = SnmpNTCIP1218Protocol.getVbRsuMsgRepeatOptions(index, 0x00);
- } else {
- // set options to 0x80 to tell RSU to sign & attach a 1609.2 header before broadcasting
- rsuMsgRepeatOptions = SnmpNTCIP1218Protocol.getVbRsuMsgRepeatOptions(index, 0x80);
- }
-
- ScopedPDU pdu = new ScopedPDU();
- pdu.add(rsuMsgRepeatPsid);
- pdu.add(rsuMsgRepeatTxChannel);
- pdu.add(rsuMsgRepeatTxInterval);
- pdu.add(rsuMsgRepeatDeliveryStart);
- pdu.add(rsuMsgRepeatDeliveryStop);
- pdu.add(rsuMsgRepeatPayload);
- pdu.add(rsuMsgRepeatEnable);
- if (verb == ServiceRequest.OdeInternal.RequestVerb.POST) {
- pdu.add(rsuMsgRepeatStatus);
- }
- pdu.add(rsuMsgRepeatPriority);
- pdu.add(rsuMsgRepeatOptions);
- pdu.setType(PDU.SET);
-
- return pdu;
- }
+ return pdu;
+ }
}
\ No newline at end of file
diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java
index 4e26744fd..bdf485070 100644
--- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java
+++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/UdpHexDecoder.java
@@ -142,14 +142,14 @@ public static String buildJsonSpatFromPacket(DatagramPacket packet)
}
/**
- * Converts the data from the given {@link DatagramPacket} into a JSON string representing a TIM
- * message. It extracts metadata and payload, then structures them into a JSON format.
+ * Converts the data from the given {@link DatagramPacket} into a TIM
+ * message. It extracts metadata and payload, then structures them into an {@link OdeAsn1Data} object
*
* @param packet the DatagramPacket containing the TIM data
- * @return a JSON string representing the TIM message
+ * @return an {@link OdeAsn1Data} object representing the TIM message
* @throws InvalidPayloadException if the payload extraction fails
*/
- public static String buildJsonTimFromPacket(DatagramPacket packet)
+ public static OdeAsn1Data buildTimFromPacket(DatagramPacket packet)
throws InvalidPayloadException {
String senderIp = packet.getAddress().getHostAddress();
int senderPort = packet.getPort();
@@ -166,7 +166,7 @@ public static String buildJsonTimFromPacket(DatagramPacket packet)
timMetadata.setRecordType(RecordType.timMsg);
timMetadata.setRecordGeneratedBy(GeneratedBy.RSU);
timMetadata.setSecurityResultCode(SecurityResultCode.success);
- return JsonUtils.toJson(new OdeAsn1Data(timMetadata, timPayload), false);
+ return new OdeAsn1Data(timMetadata, timPayload);
}
/**
diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java
index f2810860d..c56d2a858 100644
--- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java
+++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/generic/GenericReceiver.java
@@ -11,6 +11,7 @@
import us.dot.its.jpo.ode.udp.UdpHexDecoder;
import us.dot.its.jpo.ode.udp.controller.UDPReceiverProperties.ReceiverProperties;
import us.dot.its.jpo.ode.uper.UperUtil;
+import us.dot.its.jpo.ode.util.JsonUtils;
/**
* GenericReceiver is a class that listens for UDP packets and processes them based on the
@@ -102,9 +103,13 @@ private void routeMessageByMessageType(
}
}
case "TIM" -> {
- String timJson = UdpHexDecoder.buildJsonTimFromPacket(packet);
+ var tim = UdpHexDecoder.buildTimFromPacket(packet);
+ var timJson = JsonUtils.toJson(tim, false);
if (timJson != null) {
- publisher.send(rawEncodedJsonTopics.getTim(), timJson);
+ // We need to include the serialID as the key when publishing TIMs. Otherwise, the
+ // OdeTimJsonTopology won't work as intended.
+ publisher.send(rawEncodedJsonTopics.getTim(), tim.getMetadata().getSerialId().toString(),
+ timJson);
}
}
case "BSM" -> {
diff --git a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java
index 26dcddebb..85a1033cc 100644
--- a/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java
+++ b/jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/udp/tim/TimReceiver.java
@@ -52,9 +52,10 @@ public void run() {
socket.receive(packet);
if (packet.getLength() > 0) {
- String timJson = UdpHexDecoder.buildJsonTimFromPacket(packet);
+ var tim = UdpHexDecoder.buildTimFromPacket(packet);
+ var timJson = tim.toJson();
if (timJson != null) {
- timPublisher.send(publishTopic, timJson);
+ timPublisher.send(publishTopic, tim.getMetadata().getSerialId().toString(), timJson);
}
}
} catch (InvalidPayloadException e) {
diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/OdeTimJsonTopologyTest.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/OdeTimJsonTopologyTest.java
deleted file mode 100644
index 32017ef36..000000000
--- a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/OdeTimJsonTopologyTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package us.dot.its.jpo.ode;
-
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-import us.dot.its.jpo.ode.kafka.OdeKafkaProperties;
-
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@ExtendWith(SpringExtension.class)
-@ContextConfiguration(initializers = ConfigDataApplicationContextInitializer.class)
-@EnableConfigurationProperties(value = OdeKafkaProperties.class)
-class OdeTimJsonTopologyTest {
-
- @Autowired
- private OdeKafkaProperties odeKafkaProperties;
-
- @Value("${ode.kafka.topics.json.tim}")
- private String timTopic;
-
- private OdeTimJsonTopology odeTimJsonTopology;
-
- @BeforeEach
- void setUp() throws SecurityException, IllegalArgumentException {
- odeTimJsonTopology = new OdeTimJsonTopology(odeKafkaProperties, timTopic);
- Awaitility.setDefaultTimeout(250, java.util.concurrent.TimeUnit.MILLISECONDS);
- }
-
- @Test
- void testStop() {
- odeTimJsonTopology.stop();
- Awaitility.await().untilAsserted(() -> assertFalse(odeTimJsonTopology.isRunning()));
- }
-
- @Test
- void testIsRunning() {
- Awaitility.await().untilAsserted(() -> assertTrue(odeTimJsonTopology.isRunning()));
- }
-}
\ No newline at end of file
diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/kafka/TestKafkaStreamsConfig.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/kafka/TestKafkaStreamsConfig.java
new file mode 100644
index 000000000..6c6e5d8a1
--- /dev/null
+++ b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/kafka/TestKafkaStreamsConfig.java
@@ -0,0 +1,36 @@
+package us.dot.its.jpo.ode.kafka;
+
+import org.awaitility.Awaitility;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import us.dot.its.jpo.ode.OdeTimJsonTopology;
+import us.dot.its.jpo.ode.test.utilities.EmbeddedKafkaHolder;
+
+/**
+ * TestKafkaStreamsConfig is a test configuration class that provides a Kafka Streams topology
+ * for testing purposes. It utilizes an embedded Kafka broker to facilitate the testing of TIM
+ * (Traveler Information Message) JSON data processing.
+ */
+@TestConfiguration
+public class TestKafkaStreamsConfig {
+
+ /**
+ * Creates and initializes an instance of OdeTimJsonTopology for processing TIM (Traveler Information Message) JSON data.
+ * This method adds the specified Kafka topic to the embedded Kafka broker, creates the topology,
+ * and ensures it is in a running state before returning.
+ *
+ * @param odeKafkaProperties the configuration properties for Kafka
+ * @param timTopic the name of the Kafka topic used for consuming TIM JSON data.
+ *
+ * @return the initialized instance of OdeTimJsonTopology.
+ */
+ @Bean
+ public OdeTimJsonTopology odeTimJsonTopology(OdeKafkaProperties odeKafkaProperties,
+ @Value("${ode.kafka.topics.json.tim}") String timTopic) {
+ EmbeddedKafkaHolder.addTopics(timTopic);
+ var topology = new OdeTimJsonTopology(odeKafkaProperties, timTopic);
+ Awaitility.await().until(topology::isRunning);
+ return topology;
+ }
+}
diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/kafka/listeners/asn1/RawEncodedPSMJsonRouterTest.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/kafka/listeners/asn1/RawEncodedPSMJsonRouterTest.java
index f10f4604e..bb622a306 100644
--- a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/kafka/listeners/asn1/RawEncodedPSMJsonRouterTest.java
+++ b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/kafka/listeners/asn1/RawEncodedPSMJsonRouterTest.java
@@ -28,6 +28,7 @@
@SpringBootTest(
classes = {
+ OdeKafkaProperties.class,
KafkaProducerConfig.class,
KafkaConsumerConfig.class,
KafkaProperties.class,
diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/kafka/topics/SDXDepositorTopicsTest.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/kafka/topics/SDXDepositorTopicsTest.java
deleted file mode 100644
index bc700a7dd..000000000
--- a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/kafka/topics/SDXDepositorTopicsTest.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package us.dot.its.jpo.ode.kafka.topics;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
-import org.springframework.test.context.ContextConfiguration;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-@ExtendWith(SpringExtension.class)
-@ContextConfiguration(initializers = ConfigDataApplicationContextInitializer.class)
-@EnableConfigurationProperties(value = SDXDepositorTopics.class)
-class SDXDepositorTopicsTest {
-
- @Autowired
- SDXDepositorTopics sdxDepositorTopics;
-
- @Test
- void getInput() {
- assertEquals("topic.SDWDepositorInput", sdxDepositorTopics.getInput());
- }
-}
\ No newline at end of file
diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/rsu/RsuDepositorTest.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/rsu/RsuDepositorTest.java
index a5451376f..41c9686b4 100644
--- a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/rsu/RsuDepositorTest.java
+++ b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/rsu/RsuDepositorTest.java
@@ -1,4 +1,4 @@
-/*******************************************************************************
+/*=============================================================================
* Copyright 2020 572682
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
@@ -16,6 +16,8 @@
package us.dot.its.jpo.ode.rsu;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
@@ -26,36 +28,23 @@
import us.dot.its.jpo.ode.model.OdeTravelerInputData;
import us.dot.its.jpo.ode.security.SecurityServicesProperties;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-
@ExtendWith(SpringExtension.class)
@ContextConfiguration(initializers = ConfigDataApplicationContextInitializer.class)
@EnableConfigurationProperties(value = {RsuProperties.class, SecurityServicesProperties.class})
class RsuDepositorTest {
- @Autowired
- RsuProperties rsuProperties;
-
- @Autowired
- SecurityServicesProperties securityServicesProperties;
-
-
- @Test
- void testShutdown() {
- RsuDepositor testRsuDepositor = new RsuDepositor(rsuProperties, securityServicesProperties.getIsRsuSigningEnabled());
- testRsuDepositor.shutdown();
- assertFalse(testRsuDepositor.isRunning());
- assertFalse(testRsuDepositor.isAlive());
- }
+ @Autowired
+ RsuProperties rsuProperties;
+ @Autowired
+ SecurityServicesProperties securityServicesProperties;
- @Test
- void testDeposit() {
- RsuDepositor testRsuDepositor = new RsuDepositor(rsuProperties, securityServicesProperties.getIsRsuSigningEnabled());
- OdeTravelerInputData mockOdeTravelerInputData = new OdeTravelerInputData();
+ @Test
+ void testDeposit() {
+ RsuDepositor testRsuDepositor = new RsuDepositor(rsuProperties, securityServicesProperties.getIsRsuSigningEnabled());
+ OdeTravelerInputData mockOdeTravelerInputData = new OdeTravelerInputData();
- testRsuDepositor.deposit(mockOdeTravelerInputData.getRequest(), "message");
- assertEquals(1, testRsuDepositor.getDepositorEntries().size());
- }
+ testRsuDepositor.deposit(mockOdeTravelerInputData.getRequest(), "message");
+ assertEquals(1, testRsuDepositor.getDepositorEntries().size());
+ }
}
diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/security/SecurityServicesClientTest.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/security/SecurityServicesClientTest.java
new file mode 100644
index 000000000..ea01b0ea3
--- /dev/null
+++ b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/security/SecurityServicesClientTest.java
@@ -0,0 +1,107 @@
+package us.dot.its.jpo.ode.security;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo;
+import static org.springframework.test.web.client.response.MockRestResponseCreators.withServerError;
+import static org.springframework.test.web.client.response.MockRestResponseCreators.withSuccess;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.MediaType;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import org.springframework.test.web.client.ExpectedCount;
+import org.springframework.test.web.client.MockRestServiceServer;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+import us.dot.its.jpo.ode.config.SerializationConfig;
+import us.dot.its.jpo.ode.http.WebClientConfig;
+import us.dot.its.jpo.ode.security.models.SignatureRequestModel;
+import us.dot.its.jpo.ode.security.models.SignatureResultModel;
+
+@ExtendWith(SpringExtension.class)
+@SpringBootTest(
+ classes = {
+ SerializationConfig.class,
+ SecurityServicesClient.class,
+ SecurityServicesProperties.class,
+ WebClientConfig.class,
+ }
+)
+@EnableConfigurationProperties
+class SecurityServicesClientTest {
+
+ @Autowired
+ private RestTemplate restTemplate;
+ @Autowired
+ private SecurityServicesClient securityServicesClient;
+ @Autowired
+ private SecurityServicesProperties securityServicesProperties;
+
+ private MockRestServiceServer mockServer;
+ private final Clock clock = Clock.fixed(Instant.parse("2024-12-26T23:53:21.120Z"), ZoneId.of("UTC"));
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @BeforeEach
+ void beforeEach() {
+ mockServer = MockRestServiceServer.createServer(restTemplate);
+ }
+
+ @Test
+ void testSignMessage_WithMockServerSuccessfulResponse() throws JsonProcessingException {
+ // Arrange
+ String message = "TestMessage";
+ SignatureResultModel expectedResult = new SignatureResultModel();
+ expectedResult.getResult().setMessageSigned("signed message<%s>".formatted(message));
+ expectedResult.getResult().setMessageExpiry(clock.instant().getEpochSecond());
+
+ SignatureRequestModel signatureRequestModel = new SignatureRequestModel();
+ signatureRequestModel.setMessage(message);
+ var expiryTimeInSeconds = (int) clock.instant().plusSeconds(3600).getEpochSecond();
+ signatureRequestModel.setSigValidityOverride(expiryTimeInSeconds);
+
+ mockServer.expect(ExpectedCount.once(), requestTo(securityServicesProperties.getSignatureEndpoint()))
+ .andRespond(withSuccess(objectMapper.writeValueAsString(expectedResult), MediaType.APPLICATION_JSON));
+
+ SignatureResultModel result = securityServicesClient.signMessage(message, expiryTimeInSeconds);
+ assertEquals(expectedResult, result);
+ }
+
+ @Test
+ void testSignMessage_WithNullResponse() {
+ // Arrange
+ String message = "NullResponseTest";
+ var expiryTimeInSeconds = (int) clock.instant().plusSeconds(3600).getEpochSecond();
+
+ mockServer.expect(ExpectedCount.once(), requestTo(securityServicesProperties.getSignatureEndpoint()))
+ .andRespond(withSuccess("", MediaType.APPLICATION_JSON));
+
+ // Act
+ SignatureResultModel result = securityServicesClient.signMessage(message, expiryTimeInSeconds);
+
+ // Assert
+ assertNull(result);
+ }
+
+ @Test
+ void testSignMessage_WithErrorResponse() {
+ String message = "ErrorResponseTest";
+ var expiryTimeInSeconds = (int) clock.instant().plusSeconds(3600).getEpochSecond();
+
+ mockServer.expect(ExpectedCount.once(), requestTo(securityServicesProperties.getSignatureEndpoint()))
+ .andRespond(withServerError());
+
+ assertThrows(RestClientException.class, () -> securityServicesClient.signMessage(message, expiryTimeInSeconds));
+ }
+}
\ No newline at end of file
diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/services/asn1/Asn1CommandManagerTest.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/services/asn1/Asn1CommandManagerTest.java
deleted file mode 100644
index 46474407d..000000000
--- a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/services/asn1/Asn1CommandManagerTest.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*******************************************************************************
- * Copyright 2018 572682
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy
- * of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- ******************************************************************************/
-package us.dot.its.jpo.ode.services.asn1;
-
-import java.io.IOException;
-import java.text.ParseException;
-
-import org.json.JSONObject;
-import org.junit.jupiter.api.Test;
-
-import mockit.Capturing;
-import mockit.Injectable;
-import mockit.Mocked;
-import mockit.Tested;
-import us.dot.its.jpo.ode.kafka.OdeKafkaProperties;
-import us.dot.its.jpo.ode.kafka.topics.SDXDepositorTopics;
-import us.dot.its.jpo.ode.model.OdeTravelerInputData;
-import us.dot.its.jpo.ode.rsu.RsuProperties;
-import us.dot.its.jpo.ode.security.SecurityServicesProperties;
-import us.dot.its.jpo.ode.services.asn1.Asn1CommandManager.Asn1CommandManagerException;
-import us.dot.its.jpo.ode.snmp.SnmpSession;
-import us.dot.its.jpo.ode.wrapper.MessageProducer;
-
-public class Asn1CommandManagerTest {
-
- @Tested
- Asn1CommandManager testAsn1CommandManager;
-
- @Injectable
- OdeKafkaProperties injectableOdeKafkaProperties;
-
- @Injectable
- SDXDepositorTopics injectableSDXDepositorTopics;
-
- @Injectable
- RsuProperties injectableRsuProperties;
-
- @Injectable
- SecurityServicesProperties injectableSecurityServicesProperties;
-
- @Capturing
- MessageProducer capturingMessageProducer;
- @Capturing
- SnmpSession capturingSnmpSession;
-
- @Injectable
- OdeTravelerInputData injectableOdeTravelerInputData;
-
- @Mocked
- MessageProducer mockMessageProducer;
-
- @Test
- public void testPackageSignedTimIntoAsd() {
- testAsn1CommandManager.packageSignedTimIntoAsd(injectableOdeTravelerInputData.getRequest(), "message");
- }
-
- @Test
- public void depositToSDWJsonShouldCallMessageProducer() throws Asn1CommandManagerException {
- JSONObject deposit = new JSONObject();
- deposit.put("estimatedRemovalDate", "2023-11-04T17:47:11-05:00");
- deposit.put("encodedMsg", "message");
-
- testAsn1CommandManager.depositToSdw(deposit.toString());
- }
-
- @Test
- public void depositToSDWShouldCallMessageProducer() throws Asn1CommandManagerException {
- testAsn1CommandManager.depositToSdw("message");
- }
-
- @Test
- public void testSendToRsus(@Mocked OdeTravelerInputData mockOdeTravelerInputData)
- throws IOException, ParseException {
-
- testAsn1CommandManager.sendToRsus(mockOdeTravelerInputData.getRequest(), "message");
- }
-
- @Test
- public void testSendToRsusSnmpException(@Mocked OdeTravelerInputData mockOdeTravelerInputData)
- throws IOException, ParseException {
-
- testAsn1CommandManager.sendToRsus(mockOdeTravelerInputData.getRequest(), "message");
- }
-
-}
diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/services/asn1/Asn1EncodedDataRouterTest.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/services/asn1/Asn1EncodedDataRouterTest.java
index d18fcaba0..f1fa2a6ec 100644
--- a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/services/asn1/Asn1EncodedDataRouterTest.java
+++ b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/services/asn1/Asn1EncodedDataRouterTest.java
@@ -1,163 +1,396 @@
-/*******************************************************************************
+/*============================================================================
* Copyright 2018 572682
- *
+ *
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
******************************************************************************/
+
package us.dot.its.jpo.ode.services.asn1;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.junit.jupiter.api.Disabled;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.xml.XmlMapper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.UUID;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Profile;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.KafkaMessageListenerContainer;
+import org.springframework.kafka.listener.MessageListener;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.utils.ContainerTestUtils;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
+import org.springframework.stereotype.Service;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.ActiveProfiles;
+import org.springframework.web.client.RestClientException;
+import org.springframework.web.client.RestTemplate;
+import us.dot.its.jpo.ode.OdeTimJsonTopology;
+import us.dot.its.jpo.ode.config.SerializationConfig;
+import us.dot.its.jpo.ode.http.WebClientConfig;
+import us.dot.its.jpo.ode.kafka.OdeKafkaProperties;
+import us.dot.its.jpo.ode.kafka.TestKafkaStreamsConfig;
+import us.dot.its.jpo.ode.kafka.producer.KafkaProducerConfig;
+import us.dot.its.jpo.ode.kafka.topics.Asn1CoderTopics;
+import us.dot.its.jpo.ode.kafka.topics.JsonTopics;
+import us.dot.its.jpo.ode.rsu.RsuDepositor;
+import us.dot.its.jpo.ode.rsu.RsuProperties;
+import us.dot.its.jpo.ode.security.SecurityServicesClient;
+import us.dot.its.jpo.ode.security.SecurityServicesProperties;
+import us.dot.its.jpo.ode.security.models.SignatureResultModel;
+import us.dot.its.jpo.ode.test.utilities.EmbeddedKafkaHolder;
+
+@Slf4j
+@SpringBootTest(
+ properties = {
+ "ode.security-services.is-rsu-signing-enabled=false",
+ "ode.security-services.is-sdw-signing-enabled=false",
+ "ode.kafka.topics.json.tim-cert-expiration=topic.Asn1EncodedDataRouterTestTimCertExpiration",
+ "ode.kafka.topics.json.tim-tmc-filtered=topic.Asn1EncodedDataRouterTestTimTmcFiltered",
+ "ode.kafka.topics.asn1.encoder-input=topic.Asn1EncodedDataRouterTestEncoderInput",
+ "ode.kafka.topics.asn1.encoder-output=topic.Asn1EncodedDataRouterTestEncoderOutput",
+ "ode.kafka.topics.sdx-depositor.input=topic.Asn1EncodedDataRouterTestSDXDepositor"
+ },
+ classes = {
+ OdeKafkaProperties.class,
+ KafkaProducerConfig.class,
+ SerializationConfig.class,
+ KafkaProperties.class,
+ TestKafkaStreamsConfig.class,
+ Asn1CoderTopics.class,
+ JsonTopics.class,
+ SecurityServicesProperties.class,
+ RsuProperties.class,
+ Asn1EncodedDataRouterTest.MockSecurityServicesClient.class,
+ WebClientConfig.class
+ }
+)
+@EnableConfigurationProperties
+@DirtiesContext
+@ActiveProfiles("test")
+class Asn1EncodedDataRouterTest {
+
+ private final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaHolder.getEmbeddedKafka();
+ @Autowired
+ Asn1CoderTopics asn1CoderTopics;
+ @Autowired
+ JsonTopics jsonTopics;
+ @Autowired
+ SecurityServicesProperties securityServicesProperties;
+ @Autowired
+ KafkaTemplate kafkaTemplate;
+ @Autowired
+ OdeTimJsonTopology odeTimJsonTopology;
+ @Autowired
+ ObjectMapper objectMapper;
+ @Autowired
+ MockSecurityServicesClient secServicesClient;
+
+ @Value("${ode.kafka.topics.sdx-depositor.input}")
+ String sdxDepositorTopic;
+
+ @Mock
+ RsuDepositor mockRsuDepositor;
+ @Autowired
+ private XmlMapper xmlMapper;
+
+ private static String stripGeneratedFields(String expectedEncoderInput) {
+ return expectedEncoderInput
+ .replaceAll(".*?", "")
+ .replaceAll(".*?", "")
+ .replaceAll(".*?", "")
+ .replaceAll(".*?", "");
+ }
+
+ private static String replaceStreamId(String input, String streamId) {
+ return input.replaceAll(".*?", "" + streamId + "");
+ }
+
+ private static String loadResourceString(String name)
+ throws IOException {
+ String resourcePackagePath = "us/dot/its/jpo/ode/services/asn1/";
+ InputStream inputStream;
+ inputStream = Asn1EncodedDataRouterTest.class.getClassLoader()
+ .getResourceAsStream(resourcePackagePath + name);
+ assert inputStream != null;
+ return new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
+ }
+
+ @Test
+ void processSignedMessage() throws IOException {
+ String[] topicsForConsumption = {
+ asn1CoderTopics.getEncoderInput(),
+ jsonTopics.getTimTmcFiltered(),
+ sdxDepositorTopic
+ };
+ EmbeddedKafkaHolder.addTopics(topicsForConsumption);
+
+ securityServicesProperties.setIsSdwSigningEnabled(true);
+ Asn1EncodedDataRouter encoderRouter = new Asn1EncodedDataRouter(
+ asn1CoderTopics,
+ jsonTopics,
+ securityServicesProperties,
+ odeTimJsonTopology,
+ mockRsuDepositor,
+ secServicesClient,
+ kafkaTemplate, sdxDepositorTopic,
+ objectMapper,
+ xmlMapper);
+
+ final var container = setupListenerContainer(encoderRouter,
+ "processSignedMessage"
+ );
+
+ var odeJsonTim = loadResourceString("expected-asn1-encoded-router-tim-json.json");
+
+ // send to tim topic so that the OdeTimJsonTopology k-table has the correct record to return
+ var streamId = "266e6742-40fb-4c9e-a6b0-72ed2dddddfe";
+ kafkaTemplate.send(jsonTopics.getTim(), streamId, odeJsonTim);
+
+ var input = loadResourceString("asn1-encoder-output-unsigned-tim.xml");
+ var completableFuture = kafkaTemplate.send(asn1CoderTopics.getEncoderOutput(), input);
+ Awaitility.await().until(completableFuture::isDone);
+
+ var testConsumer =
+ createTestConsumer("processSignedMessage");
+ embeddedKafka.consumeFromEmbeddedTopics(testConsumer, topicsForConsumption);
+
+ var expected = loadResourceString("expected-asn1-encoded-router-sdx-deposit.json");
+
+ var records = KafkaTestUtils.getRecords(testConsumer);
+ var sdxDepositorRecord = records.records(sdxDepositorTopic);
+ var foundValidRecord = false;
+ for (var consumerRecord : sdxDepositorRecord) {
+ if (consumerRecord.value().equals(expected)) {
+ foundValidRecord = true;
+ }
+ }
+ assertTrue(foundValidRecord);
+ container.stop();
+ log.debug("processSignedMessage container stopped");
+ }
+
+ @Test
+ void processUnsignedMessage() throws IOException {
+ String[] topicsForConsumption = {
+ asn1CoderTopics.getEncoderInput(),
+ jsonTopics.getTimCertExpiration(),
+ jsonTopics.getTimTmcFiltered()
+ };
+ EmbeddedKafkaHolder.addTopics(topicsForConsumption);
+
+ securityServicesProperties.setIsSdwSigningEnabled(true);
+ securityServicesProperties.setIsRsuSigningEnabled(true);
+ Asn1EncodedDataRouter encoderRouter = new Asn1EncodedDataRouter(
+ asn1CoderTopics,
+ jsonTopics,
+ securityServicesProperties,
+ odeTimJsonTopology,
+ mockRsuDepositor,
+ secServicesClient,
+ kafkaTemplate, sdxDepositorTopic,
+ objectMapper,
+ xmlMapper);
+
+ final var container = setupListenerContainer(encoderRouter, "processUnsignedMessage");
+
+ var odeJsonTim = loadResourceString("expected-asn1-encoded-router-tim-json.json");
+ // send to tim topic so that the OdeTimJsonTopology k-table has the correct record to return
+ var streamId = UUID.randomUUID().toString();
+ odeJsonTim = odeJsonTim.replaceAll("266e6742-40fb-4c9e-a6b0-72ed2dddddfe", streamId);
+ var topologySendFuture = kafkaTemplate.send(jsonTopics.getTim(), streamId, odeJsonTim);
+ Awaitility.await().until(topologySendFuture::isDone);
+
+ var input = loadResourceString("asn1-encoder-output-unsigned-tim-no-advisory-data.xml");
+ input = replaceStreamId(input, streamId);
+
+ var completableFuture = kafkaTemplate.send(asn1CoderTopics.getEncoderOutput(), input);
+ Awaitility.await().until(completableFuture::isDone);
+
+ var consumerProps = KafkaTestUtils.consumerProps(
+ "processUnsignedMessage", "false", embeddedKafka);
+ var consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps,
+ new StringDeserializer(), new StringDeserializer());
+
+ var timCertConsumer =
+ consumerFactory.createConsumer("timCertExpiration", "processUnsignedMessage");
+ embeddedKafka.consumeFromAnEmbeddedTopic(timCertConsumer, jsonTopics.getTimCertExpiration());
+ var expectedTimCertExpiry = loadResourceString("expected-tim-cert-expired.json");
+ var timCertExpirationRecord =
+ KafkaTestUtils.getSingleRecord(timCertConsumer, jsonTopics.getTimCertExpiration());
+ assertEquals(expectedTimCertExpiry, timCertExpirationRecord.value());
+
+ var timTmcFilteredConsumer =
+ consumerFactory.createConsumer("timTmcFiltered", "processUnsignedMessage");
+ embeddedKafka.consumeFromAnEmbeddedTopic(timTmcFilteredConsumer,
+ jsonTopics.getTimTmcFiltered());
+ var expectedTimTmcFiltered = loadResourceString("expected-tim-tmc-filtered.json");
+ var records = KafkaTestUtils.getRecords(timTmcFilteredConsumer);
+ expectedTimTmcFiltered =
+ expectedTimTmcFiltered.replaceAll("266e6742-40fb-4c9e-a6b0-72ed2dddddfe", streamId);
+ var foundValidRecord = false;
+ for (var consumerRecord : records.records(jsonTopics.getTimTmcFiltered())) {
+ if (consumerRecord.value().contains(streamId)) {
+ assertEquals(expectedTimTmcFiltered, consumerRecord.value());
+ foundValidRecord = true;
+ }
+ }
+ assertTrue(foundValidRecord);
+
+ var encoderInputConsumer =
+ consumerFactory.createConsumer("encoderInput", "processUnsignedMessage");
+ embeddedKafka.consumeFromAnEmbeddedTopic(encoderInputConsumer,
+ asn1CoderTopics.getEncoderInput());
+ var expectedEncoderInput = loadResourceString("expected-asn1-encoded-router-snmp-deposit.xml");
+ var expectedEncoderInputWithStableFieldsOnly = stripGeneratedFields(expectedEncoderInput);
+ var encoderInputRecords = KafkaTestUtils.getRecords(encoderInputConsumer);
+ for (var consumerRecord : encoderInputRecords.records(asn1CoderTopics.getEncoderInput())) {
+ var encoderInputWithStableFieldsOnly = stripGeneratedFields(consumerRecord.value());
+ assertEquals(expectedEncoderInputWithStableFieldsOnly, encoderInputWithStableFieldsOnly);
+ }
+ container.stop();
+ log.debug("processUnsignedMessage container stopped");
+ }
+
+ @Test
+ void processEncodedTimUnsigned() throws IOException {
+ String[] topicsForConsumption = {
+ asn1CoderTopics.getEncoderInput(),
+ jsonTopics.getTimTmcFiltered()
+ };
+ EmbeddedKafkaHolder.addTopics(topicsForConsumption);
+
+ securityServicesProperties.setIsSdwSigningEnabled(false);
+ securityServicesProperties.setIsRsuSigningEnabled(false);
+ Asn1EncodedDataRouter encoderRouter = new Asn1EncodedDataRouter(
+ asn1CoderTopics,
+ jsonTopics,
+ securityServicesProperties,
+ odeTimJsonTopology,
+ mockRsuDepositor,
+ secServicesClient,
+ kafkaTemplate,
+ sdxDepositorTopic,
+ objectMapper,
+ xmlMapper);
+
+ final var container = setupListenerContainer(encoderRouter, "processEncodedTimUnsigned");
+ var odeJsonTim = loadResourceString("expected-asn1-encoded-router-tim-json.json");
+
+ // send to tim topic so that the OdeTimJsonTopology k-table has the correct record to return
+ var streamId = UUID.randomUUID().toString();
+ odeJsonTim = odeJsonTim.replaceAll("266e6742-40fb-4c9e-a6b0-72ed2dddddfe", streamId);
+ kafkaTemplate.send(jsonTopics.getTim(), streamId, odeJsonTim);
+
+ var input = loadResourceString("asn1-encoder-output-unsigned-tim.xml");
+ input = replaceStreamId(input, streamId);
+ kafkaTemplate.send(asn1CoderTopics.getEncoderOutput(), input);
+
+ var testConsumer = createTestConsumer("processEncodedTimUnsigned");
+ embeddedKafka.consumeFromEmbeddedTopics(testConsumer, topicsForConsumption);
+
+ var expected = loadResourceString("expected-asn1-encoded-router-sdx-deposit.json");
+
+ var records = KafkaTestUtils.getRecords(testConsumer);
+ var sdxDepositorRecord = records
+ .records(sdxDepositorTopic);
+ for (var consumerRecord : sdxDepositorRecord) {
+ if (consumerRecord.value().contains(streamId)) {
+ assertEquals(expected, consumerRecord.value());
+ }
+ }
+
+ var expectedTimTmcFiltered = loadResourceString("expected-tim-tmc-filtered.json");
+ expectedTimTmcFiltered =
+ expectedTimTmcFiltered.replaceAll("266e6742-40fb-4c9e-a6b0-72ed2dddddfe", streamId);
+
+ var foundValidRecord = false;
+ for (var consumerRecord : records.records(jsonTopics.getTimTmcFiltered())) {
+ if (consumerRecord.value().contains(streamId)) {
+ assertEquals(expectedTimTmcFiltered, consumerRecord.value());
+ foundValidRecord = true;
+ }
+ }
+ assertTrue(foundValidRecord);
+ container.stop();
+ log.debug("processEncodedTimUnsigned container stopped");
+ }
+
+ private KafkaMessageListenerContainer setupListenerContainer(
+ Asn1EncodedDataRouter encoderRouter,
+ String containerName) {
+ var consumerProps = KafkaTestUtils.consumerProps(containerName, "false", embeddedKafka);
+ DefaultKafkaConsumerFactory consumerFactory =
+ new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer());
+ ContainerProperties containerProperties = new ContainerProperties(asn1CoderTopics.getEncoderOutput());
+ KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
+ container.setupMessageListener(
+ (MessageListener) consumerRecord -> {
+ try {
+ encoderRouter.listen(consumerRecord);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ );
+ container.setBeanName(containerName);
+ container.start();
+ ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
+ log.debug("{} started", containerName);
+ return container;
+ }
+
+ private Consumer createTestConsumer(String group) {
+ var consumerProps = KafkaTestUtils.consumerProps(
+ group, "false", embeddedKafka);
+ var consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps,
+ new StringDeserializer(), new StringDeserializer());
+ return consumerFactory.createConsumer();
+ }
+
+ @Service
+ @Profile("test")
+ static class MockSecurityServicesClient extends SecurityServicesClient {
+ private static final Clock clock = Clock.fixed(Instant.parse("2024-03-08T16:37:05.414Z"), ZoneId.of("UTC"));
-import mockit.Expectations;
-import mockit.Mocked;
-import mockit.Tested;
-import us.dot.its.jpo.ode.context.AppContext;
-import us.dot.its.jpo.ode.traveler.TimTransmogrifier;
-import us.dot.its.jpo.ode.util.XmlUtils;
-import us.dot.its.jpo.ode.util.XmlUtils.XmlUtilsException;
-
-public class Asn1EncodedDataRouterTest {
-
- @Tested
- Asn1EncodedDataRouter testAsn1EncodedDataRouter;
-
-
-// @Capturing
-// MessageProducer,?> capturingMessageProducer;
-//
-// @Capturing
-// Asn1CommandManager capturingAsn1CommandManager;
-//
-// @Capturing
-// XmlUtils capturingXmlUtils;
-
- @Test @Disabled
- public void testNoRequest(@Mocked JSONObject mockJSONObject) throws XmlUtilsException {
- new Expectations() {{
- XmlUtils.toJSONObject(anyString);
- result = mockJSONObject;
-
- mockJSONObject.has("request");
- result = false;
- }};
- testAsn1EncodedDataRouter.process("stringthing");
- }
-
- @Test @Disabled
- public void testNoRsus(@Mocked JSONObject mockJSONObject) throws XmlUtilsException {
- new Expectations() {{
- XmlUtils.toJSONObject(anyString);
- result = mockJSONObject;
-
- mockJSONObject.has("request");
- result = true;
-
- mockJSONObject.has("rsus");
- result = false;
- }};
- testAsn1EncodedDataRouter.process("stringthing");
- }
-
- @Test @Disabled
- public void testSingleRsu(@Mocked JSONObject mockJSONObject) throws XmlUtilsException {
- try {
- new Expectations() {{
- XmlUtils.toJSONObject(anyString);
- result = mockJSONObject;
-
- mockJSONObject.has("request");
- result = true;
-
- mockJSONObject.has("rsus");
- result = true;
-
- mockJSONObject.get("rsus");
- //result = new JSONObject();
- }};
- } catch (XmlUtilsException e) {
-
- e.printStackTrace();
- } catch (JSONException e) {
-
- e.printStackTrace();
- }
- testAsn1EncodedDataRouter.process("stringthing");
- }
-
- @Test @Disabled
- public void testRsuArray(@Mocked JSONObject mockJSONObject) throws XmlUtilsException {
- try {
- new Expectations() {{
- XmlUtils.toJSONObject(anyString);
- result = mockJSONObject;
-
- mockJSONObject.has("request");
- result = true;
-
- mockJSONObject.has("rsus");
- result = true;
-
- mockJSONObject.get("rsus");
- result = new JSONArray();
- }};
- } catch (XmlUtilsException e) {
-
- e.printStackTrace();
- } catch (JSONException e) {
-
- e.printStackTrace();
- }
- testAsn1EncodedDataRouter.process("stringthing");
- }
-
- @Test @Disabled
- public void testWithASD(@Mocked JSONObject mockJSONObject) throws XmlUtilsException {
- try {
- new Expectations() {{
- XmlUtils.toJSONObject(anyString);
- result = mockJSONObject;
-
- mockJSONObject.getJSONObject(AppContext.METADATA_STRING);
- result = mockJSONObject;
-
- mockJSONObject.has(TimTransmogrifier.REQUEST_STRING);
- result = true;
-
- mockJSONObject.getJSONObject(TimTransmogrifier.REQUEST_STRING);
- result = mockJSONObject;
-
- mockJSONObject.has(TimTransmogrifier.RSUS_STRING);
- result = true;
- times = 2;
-
- mockJSONObject.get(TimTransmogrifier.RSUS_STRING);
- result = mockJSONObject;
- times = 2;
-
- mockJSONObject.has(Asn1CommandManager.ADVISORY_SITUATION_DATA_STRING);
- result = true;
- }};
- } catch (XmlUtilsException e) {
-
- e.printStackTrace();
- } catch (JSONException e) {
-
- e.printStackTrace();
- }
- testAsn1EncodedDataRouter.process("stringthing");
- }
+ public MockSecurityServicesClient(RestTemplate restTemplate, SecurityServicesProperties securityServicesProperties) {
+ super(restTemplate, securityServicesProperties);
+ }
+ @Override
+ public SignatureResultModel signMessage(String message, int sigValidityOverride) throws RestClientException {
+ var signatureResponse = new SignatureResultModel();
+ signatureResponse.getResult().setMessageSigned("<%s>".formatted(message));
+ signatureResponse.getResult().setMessageExpiry(clock.instant().getEpochSecond() + 1000);
+ return signatureResponse;
+ }
+ }
}
diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/test/utilities/EmbeddedKafkaHolder.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/test/utilities/EmbeddedKafkaHolder.java
index 95fcc5366..a41184ada 100644
--- a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/test/utilities/EmbeddedKafkaHolder.java
+++ b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/test/utilities/EmbeddedKafkaHolder.java
@@ -58,7 +58,12 @@ public static EmbeddedKafkaBroker getEmbeddedKafka() {
* @param topics one or more topic names to be added to the embedded Kafka broker
*/
public static void addTopics(String... topics) {
+ var existingTopics = embeddedKafka.getTopics();
for (String topic : topics) {
+ if (existingTopics.contains(topic)) {
+ log.debug("topic {} already exists in embedded kafka broker. Skipping creation", topic);
+ continue;
+ }
NewTopic newTopic = new NewTopic(topic, 1, (short) 1);
try {
embeddedKafka.addTopics(newTopic);
diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/udp/generic/GenericReceiverTest.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/udp/generic/GenericReceiverTest.java
index 854071a6b..2eba1a9db 100644
--- a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/udp/generic/GenericReceiverTest.java
+++ b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/udp/generic/GenericReceiverTest.java
@@ -10,6 +10,7 @@
import java.time.ZoneOffset;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -93,7 +94,7 @@ void testRun() throws Exception {
udpReceiverProperties.getGeneric().getReceiverPort());
var consumerProps = KafkaTestUtils.consumerProps("GenericReceiverTest", "true", embeddedKafka);
- var cf = new DefaultKafkaConsumerFactory(consumerProps);
+ var cf = new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer());
var consumer = cf.createConsumer();
embeddedKafka.consumeFromEmbeddedTopics(consumer, topics);
diff --git a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/udp/tim/TimReceiverTest.java b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/udp/tim/TimReceiverTest.java
index 602cba274..2b34dba04 100644
--- a/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/udp/tim/TimReceiverTest.java
+++ b/jpo-ode-svcs/src/test/java/us/dot/its/jpo/ode/udp/tim/TimReceiverTest.java
@@ -11,6 +11,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.json.JSONObject;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -86,9 +87,9 @@ void testRun() throws Exception {
var consumerProps = KafkaTestUtils.consumerProps(
"TimReceiverTest", "true", embeddedKafka);
- DefaultKafkaConsumerFactory cf =
- new DefaultKafkaConsumerFactory<>(consumerProps);
- Consumer consumer = cf.createConsumer();
+ DefaultKafkaConsumerFactory cf =
+ new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer());
+ Consumer consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, rawEncodedJsonTopics.getTim());
var singleRecord = KafkaTestUtils.getSingleRecord(consumer, rawEncodedJsonTopics.getTim());
diff --git a/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/asn1-encoder-output-unsigned-tim-no-advisory-data.xml b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/asn1-encoder-output-unsigned-tim-no-advisory-data.xml
new file mode 100644
index 000000000..33d95c2a7
--- /dev/null
+++ b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/asn1-encoder-output-unsigned-tim-no-advisory-data.xml
@@ -0,0 +1,71 @@
+
+
+
+ us.dot.its.jpo.ode.model.OdeAsdPayload
+
+ 266e6742-40fb-4c9e-a6b0-72ed2dddddfe
+ 1
+ 0
+ 0
+ 0
+
+ 2024-12-18T16:14:09.690Z
+ 7
+ 297
+ 2024-02-29T14:15:47.000Z
+ TMC
+ false
+ 03B027CF2071328E12
+ 2024-03-08T16:37:05.414Z
+
+
+ 3
+ POST
+
+
+
+
+ 38.98721843900006
+ -104.76767069499999
+
+
+ 38.96666515900006
+ -104.74048299899994
+
+
+ oneday
+ 6B573067
+
+
+
+
+ MessageFrame
+ MessageFrame
+ UPER
+
+
+ Ieee1609Dot2Data
+ Ieee1609Dot2Data
+ COER
+
+
+ UPER
+
+
+
+
+ us.dot.its.jpo.ode.model.OdeHexByteArray
+
+
+
+ 001F808470114F3703B027CF2071328E120F775D9B0301C2670F374166BC3027FFF93F40BE628129A007F93937E1CF5AC98DFA706A5F6D09AB766C1B3780000000099C3CDD059AF0C084E2302000844033611407401733CDC2D6203FDA1756398358ED27557C6856FBAAC1C3A0F8D9C221DD858A60E0ADA596124567F2B182001002009EEEBB3600
+
+
+
+
+ 03808188001F808470114F3703B027CF2071328E120F775D9B0301C2670F374166BC3027FFF93F40BE628129A007F93937E1CF5AC98DFA706A5F6D09AB766C1B3780000000099C3CDD059AF0C084E2302000844033611407401733CDC2D6203FDA1756398358ED27557C6856FBAAC1C3A0F8D9C221DD858A60E0ADA596124567F2B182001002009EEEBB3600
+
+
+
+
+
\ No newline at end of file
diff --git a/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/asn1-encoder-output-unsigned-tim.xml b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/asn1-encoder-output-unsigned-tim.xml
new file mode 100644
index 000000000..11834db3a
--- /dev/null
+++ b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/asn1-encoder-output-unsigned-tim.xml
@@ -0,0 +1,78 @@
+
+
+
+ us.dot.its.jpo.ode.model.OdeAsdPayload
+
+ 266e6742-40fb-4c9e-a6b0-72ed2dddddfe
+ 1
+ 0
+ 0
+ 0
+
+ 2024-12-18T16:14:09.690Z
+ 7
+ 297
+ 2024-02-29T14:15:47.000Z
+ TMC
+ false
+ 03B027CF2071328E12
+ 2024-03-08T16:37:05.414Z
+
+
+ 3
+ POST
+
+
+
+
+ 38.98721843900006
+ -104.76767069499999
+
+
+ 38.96666515900006
+ -104.74048299899994
+
+
+ oneday
+ 6B573067
+
+
+
+
+ MessageFrame
+ MessageFrame
+ UPER
+
+
+ Ieee1609Dot2Data
+ Ieee1609Dot2Data
+ COER
+
+
+ AdvisorySituationData
+ AdvisorySituationData
+ UPER
+
+
+
+
+ us.dot.its.jpo.ode.model.OdeHexByteArray
+
+
+
+ 001F808470114F3703B027CF2071328E120F775D9B0301C2670F374166BC3027FFF93F40BE628129A007F93937E1CF5AC98DFA706A5F6D09AB766C1B3780000000099C3CDD059AF0C084E2302000844033611407401733CDC2D6203FDA1756398358ED27557C6856FBAAC1C3A0F8D9C221DD858A60E0ADA596124567F2B182001002009EEEBB3600
+
+
+
+
+ 03808188001F808470114F3703B027CF2071328E120F775D9B0301C2670F374166BC3027FFF93F40BE628129A007F93937E1CF5AC98DFA706A5F6D09AB766C1B3780000000099C3CDD059AF0C084E2302000844033611407401733CDC2D6203FDA1756398358ED27557C6856FBAAC1C3A0F8D9C221DD858A60E0ADA596124567F2B182001002009EEEBB3600
+
+
+
+
+ C4400000000B72268E26B57306742670F19C166BC56E099BD80B859B761C3DB913471202000007FC000007FC11807010310003F0108E0229E6E07604F9E40E2651C241EEEBB36060384CE1E6E82CD78604FFFF27E817CC50253400FF2726FC39EB5931BF4E0D4BEDA1356ECD8366F00000000133879BA0B35E18109C4604001088066C2280E802E679B85AC407FB42EAC7306B1DA4EAAF8D0ADF755838741F1B38443BB0B14C1C15B4B2C248ACFE56304002004013DDD766C000
+
+
+
+
+
\ No newline at end of file
diff --git a/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-asn1-encoded-router-sdx-deposit.json b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-asn1-encoded-router-sdx-deposit.json
new file mode 100644
index 000000000..5bcea7230
--- /dev/null
+++ b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-asn1-encoded-router-sdx-deposit.json
@@ -0,0 +1 @@
+{"encodedMsg":"C4400000000B72268E26B57306742670F19C166BC56E099BD80B859B761C3DB913471202000007FC000007FC11807010310003F0108E0229E6E07604F9E40E2651C241EEEBB36060384CE1E6E82CD78604FFFF27E817CC50253400FF2726FC39EB5931BF4E0D4BEDA1356ECD8366F00000000133879BA0B35E18109C4604001088066C2280E802E679B85AC407FB42EAC7306B1DA4EAAF8D0ADF755838741F1B38443BB0B14C1C15B4B2C248ACFE56304002004013DDD766C000"}
\ No newline at end of file
diff --git a/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-asn1-encoded-router-snmp-deposit.xml b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-asn1-encoded-router-snmp-deposit.xml
new file mode 100644
index 000000000..ce6323ff7
--- /dev/null
+++ b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-asn1-encoded-router-snmp-deposit.xml
@@ -0,0 +1 @@
+us.dot.its.jpo.ode.model.OdeAsdPayloaddb2e2c93-c87a-488c-b54b-e0dfcbb08fca10002024-12-18T23:59:55.081Z80false3POST38.98721843900006-104.7676706949999938.96666515900006-104.74048299899994oneday6B573067AdvisorySituationDataAdvisorySituationDataUPERus.dot.its.jpo.ode.plugin.j2735.DdsAdvisorySituationData156500000000B673BA356B5730672389872184-1047676707389666652-1047404830B673BA3520200031600003160001F808470114F3703B027CF2071328E120F775D9B0301C2670F374166BC3027FFF93F40BE628129A007F93937E1CF5AC98DFA706A5F6D09AB766C1B3780000000099C3CDD059AF0C084E2302000844033611407401733CDC2D6203FDA1756398358ED27557C6856FBAAC1C3A0F8D9C221DD858A60E0ADA596124567F2B182001002009EEEBB3600
\ No newline at end of file
diff --git a/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-asn1-encoded-router-tim-json.json b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-asn1-encoded-router-tim-json.json
new file mode 100644
index 000000000..fa0a17775
--- /dev/null
+++ b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-asn1-encoded-router-tim-json.json
@@ -0,0 +1,224 @@
+{
+ "metadata": {
+ "payloadType": "us.dot.its.jpo.ode.model.OdeTimPayload",
+ "serialId": {
+ "streamId": "266e6742-40fb-4c9e-a6b0-72ed2dddddfe",
+ "bundleSize": 1,
+ "bundleId": 1,
+ "recordId": 0,
+ "serialNumber": 1
+ },
+ "odeReceivedAt": "2024-12-18T20:58:49.419Z",
+ "schemaVersion": 7,
+ "maxDurationTime": 297,
+ "recordGeneratedAt": "2024-02-29T14:15:47.000Z",
+ "recordGeneratedBy": "TMC",
+ "sanitized": false,
+ "odePacketID": "03B027CF2071328E12",
+ "odeTimStartDateTime": "2024-03-08T16:37:05.414Z",
+ "request": {
+ "ode": {
+ "version": 3,
+ "verb": "POST"
+ },
+ "sdw": {
+ "serviceRegion": {
+ "nwCorner": {
+ "latitude": 38.98721843900006,
+ "longitude": -104.76767069499999
+ },
+ "seCorner": {
+ "latitude": 38.96666515900006,
+ "longitude": -104.74048299899994
+ }
+ },
+ "ttl": "oneday",
+ "recordId": "6B573067"
+ }
+ }
+ },
+ "payload": {
+ "data": {
+ "msgCnt": 1,
+ "timeStamp": 85815,
+ "packetID": "03B027CF2071328E12",
+ "urlB": "null",
+ "dataFrames": [
+ {
+ "notUsed": 0,
+ "frameType": "advisory",
+ "msgId": {
+ "roadSignID": {
+ "position": {
+ "lat": 389873128,
+ "long": -1047677947
+ },
+ "viewAngle": {
+ "from000-0to022-5degrees": true,
+ "from022-5to045-0degrees": true,
+ "from045-0to067-5degrees": true,
+ "from067-5to090-0degrees": true,
+ "from090-0to112-5degrees": true,
+ "from112-5to135-0degrees": true,
+ "from135-0to157-5degrees": true,
+ "from157-5to180-0degrees": true,
+ "from180-0to202-5degrees": true,
+ "from202-5to225-0degrees": true,
+ "from225-0to247-5degrees": true,
+ "from247-5to270-0degrees": true,
+ "from270-0to292-5degrees": true,
+ "from292-5to315-0degrees": true,
+ "from315-0to337-5degrees": true,
+ "from337-5to360-0degrees": true
+ },
+ "mutcdCode": "warning"
+ }
+ },
+ "startYear": 2024,
+ "startTime": 97477,
+ "durationTime": 297,
+ "priority": 5,
+ "notUsed1": 0,
+ "regions": [
+ {
+ "name": "I_CO-21_SAT_6B573067",
+ "id": {
+ "region": 0,
+ "id": 0
+ },
+ "anchor": {
+ "lat": 389873128,
+ "long": -1047677947
+ },
+ "laneWidth": 5000,
+ "directionality": "both",
+ "closedPath": false,
+ "direction": {
+ "from000-0to022-5degrees": false,
+ "from022-5to045-0degrees": false,
+ "from045-0to067-5degrees": false,
+ "from067-5to090-0degrees": false,
+ "from090-0to112-5degrees": false,
+ "from112-5to135-0degrees": true,
+ "from135-0to157-5degrees": false,
+ "from157-5to180-0degrees": false,
+ "from180-0to202-5degrees": false,
+ "from202-5to225-0degrees": false,
+ "from225-0to247-5degrees": false,
+ "from247-5to270-0degrees": false,
+ "from270-0to292-5degrees": false,
+ "from292-5to315-0degrees": false,
+ "from315-0to337-5degrees": false,
+ "from337-5to360-0degrees": false
+ },
+ "description": {
+ "path": {
+ "scale": 0,
+ "offset": {
+ "ll": {
+ "nodes": [
+ {
+ "delta": {
+ "node-LL1": {
+ "lon": 1240,
+ "lat": -944
+ }
+ }
+ },
+ {
+ "delta": {
+ "node-LL4": {
+ "lon": 32814,
+ "lat": -24978
+ }
+ }
+ },
+ {
+ "delta": {
+ "node-LL3": {
+ "lon": 22048,
+ "lat": -16422
+ }
+ }
+ },
+ {
+ "delta": {
+ "node-LL3": {
+ "lon": 27335,
+ "lat": -20373
+ }
+ }
+ },
+ {
+ "delta": {
+ "node-LL4": {
+ "lon": 53877,
+ "lat": -41190
+ }
+ }
+ },
+ {
+ "delta": {
+ "node-LL3": {
+ "lon": 14301,
+ "lat": -10738
+ }
+ }
+ },
+ {
+ "delta": {
+ "node-LL4": {
+ "lon": 33763,
+ "lat": -25566
+ }
+ }
+ },
+ {
+ "delta": {
+ "node-LL4": {
+ "lon": 60460,
+ "lat": -46052
+ }
+ }
+ },
+ {
+ "delta": {
+ "node-LL3": {
+ "lon": 13974,
+ "lat": -10167
+ }
+ }
+ },
+ {
+ "delta": {
+ "node-LL3": {
+ "lon": 13305,
+ "lat": -10047
+ }
+ }
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ ],
+ "notUsed2": 0,
+ "notUsed3": 0,
+ "content": {
+ "workZone": [
+ {
+ "item": {
+ "itis": 1025
+ }
+ }
+ ]
+ },
+ "url": "null"
+ }
+ ]
+ },
+ "dataType": "us.dot.its.jpo.ode.plugin.j2735.travelerinformation.TravelerInformation"
+ }
+}
\ No newline at end of file
diff --git a/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-tim-cert-expired.json b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-tim-cert-expired.json
new file mode 100644
index 000000000..752d77467
--- /dev/null
+++ b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-tim-cert-expired.json
@@ -0,0 +1 @@
+{"packetID":"03B027CF2071328E12","startDateTime":"2024-03-08T16:37:05.414Z","requiredExpirationDate":"2024-03-08T21:34:05.414Z","expirationDate":"2024-03-08T16:53:45.000Z"}
\ No newline at end of file
diff --git a/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-tim-tmc-filtered.json b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-tim-tmc-filtered.json
new file mode 100644
index 000000000..b9345b554
--- /dev/null
+++ b/jpo-ode-svcs/src/test/resources/us/dot/its/jpo/ode/services/asn1/expected-tim-tmc-filtered.json
@@ -0,0 +1 @@
+{"metadata":{"request":{"ode":{"verb":"POST","version":3},"sdw":{"recordId":"6B573067","serviceRegion":{"nwCorner":{"latitude":38.98721843900006,"longitude":-104.76767069499999},"seCorner":{"latitude":38.96666515900006,"longitude":-104.74048299899994}},"ttl":"oneday"}},"recordGeneratedBy":"TMC","schemaVersion":7,"payloadType":"us.dot.its.jpo.ode.model.OdeTimPayload","odePacketID":"03B027CF2071328E12","serialId":{"recordId":0,"serialNumber":1,"streamId":"266e6742-40fb-4c9e-a6b0-72ed2dddddfe","bundleSize":1,"bundleId":1},"sanitized":false,"recordGeneratedAt":"2024-02-29T14:15:47.000Z","asn1":"001F808470114F3703B027CF2071328E120F775D9B0301C2670F374166BC3027FFF93F40BE628129A007F93937E1CF5AC98DFA706A5F6D09AB766C1B3780000000099C3CDD059AF0C084E2302000844033611407401733CDC2D6203FDA1756398358ED27557C6856FBAAC1C3A0F8D9C221DD858A60E0ADA596124567F2B182001002009EEEBB3600","maxDurationTime":297,"odeTimStartDateTime":"2024-03-08T16:37:05.414Z","odeReceivedAt":"2024-12-18T20:58:49.419Z"},"payload":{"data":{"timeStamp":85815,"packetID":"03B027CF2071328E12","urlB":"null","dataFrames":[{"durationTime":297,"regions":[{"closedPath":false,"anchor":{"lat":389873128,"long":-1047677947},"name":"I_CO-21_SAT_6B573067","laneWidth":5000,"directionality":"both","description":{"path":{"offset":{"ll":{"nodes":[{"delta":{"node-LL1":{"lon":1240,"lat":-944}}},{"delta":{"node-LL4":{"lon":32814,"lat":-24978}}},{"delta":{"node-LL3":{"lon":22048,"lat":-16422}}},{"delta":{"node-LL3":{"lon":27335,"lat":-20373}}},{"delta":{"node-LL4":{"lon":53877,"lat":-41190}}},{"delta":{"node-LL3":{"lon":14301,"lat":-10738}}},{"delta":{"node-LL4":{"lon":33763,"lat":-25566}}},{"delta":{"node-LL4":{"lon":60460,"lat":-46052}}},{"delta":{"node-LL3":{"lon":13974,"lat":-10167}}},{"delta":{"node-LL3":{"lon":13305,"lat":-10047}}}]}},"scale":0}},"id":{"id":0,"region":0},"direction":{"from315-0to337-5degrees":false,"from202-5to225-0degrees":false,"from067-5to090-0degrees":false,"from270-0to292-5degrees":false,"from247-5to270-0degrees":false,"from112-5to135-0degrees":true,"from292-5to315-0degrees":false,"from180-0to202-5degrees":false,"from022-5to045-0degrees":false,"from045-0to067-5degrees":false,"from157-5to180-0degrees":false,"from000-0to022-5degrees":false,"from135-0to157-5degrees":false,"from225-0to247-5degrees":false,"from337-5to360-0degrees":false,"from090-0to112-5degrees":false}}],"startYear":2024,"notUsed2":0,"msgId":{"roadSignID":{"viewAngle":{"from315-0to337-5degrees":true,"from202-5to225-0degrees":true,"from067-5to090-0degrees":true,"from270-0to292-5degrees":true,"from247-5to270-0degrees":true,"from112-5to135-0degrees":true,"from292-5to315-0degrees":true,"from180-0to202-5degrees":true,"from022-5to045-0degrees":true,"from045-0to067-5degrees":true,"from157-5to180-0degrees":true,"from000-0to022-5degrees":true,"from135-0to157-5degrees":true,"from225-0to247-5degrees":true,"from337-5to360-0degrees":true,"from090-0to112-5degrees":true},"mutcdCode":"warning","position":{"lat":389873128,"long":-1047677947}}},"notUsed3":0,"notUsed1":0,"priority":5,"content":{"workZone":[{"item":{"itis":1025}}]},"url":"null","notUsed":0,"frameType":"advisory","startTime":97477}],"msgCnt":1},"dataType":"us.dot.its.jpo.ode.plugin.j2735.travelerinformation.TravelerInformation"}}
\ No newline at end of file