Skip to content

Commit

Permalink
Merge pull request #116 from usdot-jpo-ode/compression
Browse files Browse the repository at this point in the history
Compression
  • Loading branch information
John-Wiens authored Oct 30, 2024
2 parents 4a9af7d + 9ebbb39 commit c699c36
Show file tree
Hide file tree
Showing 26 changed files with 25 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,6 @@ public APIServiceController(
try {

logger.info("Starting {}", this.getClass().getSimpleName());

System.out.println("Controller is Null" + stompController != null);

SpatSocketForwardTopology spatSocketForwardTopology = new SpatSocketForwardTopology(
"topic.ProcessedSpat",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public class ConflictMonitorApiProperties {
private String securitySvcsSignatureEndpoint = "sign";



private int lingerMs = 0;


@Autowired
Expand Down Expand Up @@ -185,6 +185,15 @@ public void setKafkaBrokers(String kafkaBrokers) {
this.kafkaBrokers = kafkaBrokers;
}

@Value("${kafka.linger_ms}")
public void setKafkaLingerMs(int lingerMs) {
this.lingerMs = lingerMs;
}

public int getKafkaLingerMs() {
return lingerMs;
}

public String getKafkaProducerType() {
return kafkaProducerType;
}
Expand Down Expand Up @@ -257,6 +266,8 @@ public void setKafkaTopicsDisabledSet(Set<String> kafkaTopicsDisabledSet) {
this.kafkaTopicsDisabledSet = kafkaTopicsDisabledSet;
}




@Bean
public ObjectMapper defaultMapper() {
Expand Down Expand Up @@ -380,7 +391,11 @@ public Properties createStreamProperties(String name) {
streamProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, FIVE_MINUTES_MS);

// Disable batching
streamProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
// streamProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);

streamProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
streamProps.put(ProducerConfig.LINGER_MS_CONFIG, getKafkaLingerMs());


if (confluentCloudEnabled) {
streamProps.put("ssl.endpoint.identification.algorithm", "https");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ public void addSpatBroadcastRateDistribution(List<IDCount> data, Long startTime,
for(IDCount elem : data){
int index = Integer.parseInt(elem.getId()) / 10;
output.get(index).setCount(elem.getCount());
System.out.println(elem);
}

// Convert to Chart Data and generate graph
Expand Down Expand Up @@ -348,7 +347,6 @@ public void addMapBroadcastRateDistribution(List<IDCount> data, Long startTime,
for(IDCount elem : data){
int index = Integer.parseInt(elem.getId());
output.get(index).setCount(elem.getCount());
System.out.println(elem);
}

// Convert to Chart Data and generate graph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public Query getQuery(Integer intersectionID, Integer roadRegulatorID, String no
}

if(key != null){
System.out.println(key);
query.addCriteria(Criteria.where("key").is(key));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,14 @@ public static EncodedMessage identifyAsn1(String hexPacket){
public static String decodeXmlWithAcm(String xmlMessage) throws Exception {


System.out.println("Decoding Message: " + xmlMessage);
log.info("Decoding Message: " + xmlMessage);
log.info("Decoding message: {}", xmlMessage);

// Save XML to temp file
String tempDir = FileUtils.getTempDirectoryPath();
String tempFileName = "asn1-codec-java-" + UUID.randomUUID().toString() + ".xml";
log.info("Temp file name: {}", tempFileName);
System.out.println("Temp File Name: " + tempFileName);
log.info("Temp File Name: " + tempFileName);
Path tempFilePath = Path.of(tempDir, tempFileName);
File tempFile = new File(tempFilePath.toString());
FileUtils.writeStringToFile(tempFile, xmlMessage, StandardCharsets.UTF_8);
Expand All @@ -239,7 +239,7 @@ public static String decodeXmlWithAcm(String xmlMessage) throws Exception {
Process process = pb.start();
String result = IOUtils.toString(process.getInputStream(), StandardCharsets.UTF_8);
log.info("Result: {}", result);
System.out.println("Decode Result: " + result);
log.info("Decode Result: " + result);

// Clean up temp file
tempFile.delete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public ZonedDateTime convert(@NonNull String source) {
// Block of code to handle errors
}
}
System.out.println("Unable to Parse the following source time: "+ source);
return ZonedDateTime.of(0, 0, 0, 0, 0, 0, 0, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public class KeycloakNoSecurityConfig {

@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity httpSecurity) throws Exception {

System.out.println("Running without KeyCloak Authentication");
return httpSecurity
.sessionManagement(sm -> sm.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
.cors(corsConfigurer -> CorsUtil.configureCors(corsConfigurer, properties))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ public class KeycloakSecurityConfig {
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity httpSecurity) throws Exception {

System.out.println("Running with KeyCloak Authentication");

return httpSecurity
.sessionManagement(sm -> sm.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
.cors(corsConfigurer -> CorsUtil.configureCors(corsConfigurer, properties))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class MethodSecurityConfig {
public MethodSecurityConfig(PermissionEvaluator permissionEvaluator, ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
this.permissionEvaluator = permissionEvaluator;
System.out.println("Method-level security annotations are enabled");
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ public static AuthorizationDecision checkAccess(Supplier<Authentication> authent

var auth = authentication.get();

System.out.printf("Check access for username=%s path=%s%n", auth.getName(), requestContext.getRequest().getRequestURI());
System.out.printf("Authorities: %s%n", auth.getAuthorities());
System.out.printf("Is authenticated: %s%n", auth.isAuthenticated());
System.out.printf("Details: %s%n", auth.getDetails());

return auth.isAuthenticated() ? GRANTED : DENIED;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ class DefaultPermissionEvaluator implements PermissionEvaluator {

@Override
public boolean hasPermission(Authentication auth, Object targetDomainObject, Object permission) {
System.out.printf("check permission user=%s target=%s permission=%s%n", auth.getName(), targetDomainObject, permission);

return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public JwtAuthenticationToken convert(Jwt jwt) {
String username = getUsernameFrom(jwt);

var token = new JwtAuthenticationToken(jwt, authorities, username);
System.out.printf("KeycloakJwtAuthenticationConverter: Converted token: %s%n Authorities: %s%n Username: %s%n", token, authorities, username);
return token;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static EmailSettings fromAttributes(Map<String, List<String>> attributes)
}


System.out.println("No Settings found Returning Default");
logger.info("No Settings found Returning Default");
return new EmailSettings();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public String toString() {
try {
testReturn = (mapper.writeValueAsString(this));
} catch (JsonProcessingException e) {
System.out.println(e);
}
return testReturn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ public void sendEmailViaSendGrid(String to, String subject, String text) {
request.setEndpoint("mail/send");
request.setBody(mail.build());
Response response = this.sendGrid.api(request);
System.out.println("Message Sent via Sendgrid to:" + to);
} catch (IOException ex) {
System.out.println(ex);
}
}

Expand All @@ -87,9 +85,7 @@ public void sendEmailViaPostmark(String to, String subject, String text){
htmlText);
try {
postmark.deliverMessage(message);
System.out.println("Message Sent via Postmark to:" + to);
} catch (PostmarkException | IOException e) {
System.out.println("Unable to send message" + e);
}
}

Expand All @@ -99,13 +95,11 @@ public void sendEmailViaSpringMail(String to, String subject, String text){
message.setSubject(subject);
message.setText(text);
mailSender.send(message);
System.out.println("Message Sent Via SMTP to: " + to);
}



public void sendSimpleMessage(String to, String subject, String text) {
System.out.println("Sending Simple Message");
if(props.getEmailBroker().equals("sendgrid")){
sendEmailViaSendGrid(to, subject, text);
}else if (props.getEmailBroker().equals("postmark")){
Expand Down Expand Up @@ -217,8 +211,6 @@ public List<UserRepresentation> getEmailList(List<String> notificationTypes, Lis

}

System.out.println("Returning" + emailList.size() + "Users");

return emailList;

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ public static List<OdeBsmData> getJsonBsms(){
OdeBsmData bsm = objectMapper.readValue(bsmString, OdeBsmData.class);
bsms.add(bsm);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked BSM.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked BSM.");
e.printStackTrace();
}
return bsms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ public static List<ProcessedMap<LineString>> getProcessedMaps(){
ProcessedMap<LineString> map = objectMapper.readValue(processedMapString, typeReference);
maps.add(map);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked map.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked map.");
e.printStackTrace();
}
return maps;
Expand All @@ -51,10 +49,8 @@ public static List<OdeMapData> getJsonMaps(){
OdeMapData map = objectMapper.readValue(mapString, OdeMapData.class);
maps.add(map);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked map.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked map.");
e.printStackTrace();
}
return maps;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ public static List<ProcessedSpat> getProcessedSpats(){
ProcessedSpat spat = objectMapper.readValue(processedSpatString, ProcessedSpat.class);
spats.add(spat);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked spat.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked spat.");
e.printStackTrace();
}
return spats;
Expand All @@ -43,10 +41,8 @@ public static List<OdeSpatData> getJsonSpats(){
OdeSpatData spat = objectMapper.readValue(spatString, OdeSpatData.class);
spats.add(spat);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked spat.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked spat.");
e.printStackTrace();
}
return spats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ public static List<OdeSrmData> getJsonSrms(){
OdeSrmData srm = objectMapper.readValue(srmString, OdeSrmData.class);
srms.add(srm);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked srm.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked srm.");
e.printStackTrace();
}
return srms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ public static List<OdeSsmData> getJsonSsms(){
OdeSsmData ssm = objectMapper.readValue(ssmString, OdeSsmData.class);
ssms.add(ssm);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked ssm.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked ssm.");
e.printStackTrace();
}
return ssms;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@ public static List<ObjectNode> getJsonTims(){
ObjectNode tim = objectMapper.readValue(timString, ObjectNode.class);
tims.add(tim);
} catch (JsonMappingException e) {
System.out.println("A Json Mapping Exception Occurred while trying to get data from mocked tim.");
e.printStackTrace();
} catch (JsonProcessingException e) {
System.out.println("A Json Processing Exception Occurred while trying to get data from a mocked tim.");
e.printStackTrace();
}
return tims;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ schema.bsm: classpath:schemas/bsm.schema.json
schema.map: classpath:schemas/map.schema.json
schema.spat: classpath:schemas/spat.schema.json


# Amount of time to wait to try and increase batching
kafka.linger_ms: 50

kafka.topics:
autoCreateTopics: false
numPartitions: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@

// // List<DefaultConfig> responseMap = responseEntity.getBody();

// // System.out.println(responseMap.size());
// // System.out.println(responseMap.get(0));

// // assertEquals(HttpStatus.OK, responseEntity.getStatusCode());
// // assertEquals(MediaType.APPLICATION_JSON, responseEntity.getHeaders().getContentType());
// // assertEquals(test, responseMap.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public void testSpatGetAsOdeData() {
@Test
public void testSpatGetAsOdeJson() throws XmlUtilsException{
OdeSpatData spat = spatDecoder.getAsOdeJson(odeSpatDecodedXmlReference);
System.out.println("testSpatGetAsOdeJson" + spat);
assertEquals(spat.toJson(), odeSpatDecodedDataReference);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ public void testSsmGetAsOdeData() {
OdeData data = ssmDecoder.getAsOdeData(ssm.getAsn1Text());

OdeMsgMetadata metadata = data.getMetadata();

System.out.println(data);
// Copy over fields that might be different
metadata.setOdeReceivedAt("2024-05-14T23:01:21.516531700Z");
metadata.setSerialId(metadata.getSerialId().setStreamId("fc430f29-b761-4a2c-90fb-dc4c9f5d4e9c"));
Expand All @@ -52,7 +50,6 @@ public void testSsmGetAsOdeData() {
public void testSsmGetAsOdeJson() throws XmlUtilsException{
OdeSsmData ssm = ssmDecoder.getAsOdeJson(odeSsmDecodedXmlReference);
assertEquals(ssm.toJson(), odeSsmDecodedDataReference);
System.out.println(ssm);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public void testTimGetAsOdeData() {

OdeMsgMetadata metadata = data.getMetadata();

System.out.println(data);
// Copy over fields that might be different
metadata.setOdeReceivedAt("2024-05-14T23:01:21.516531700Z");
metadata.setSerialId(metadata.getSerialId().setStreamId("fc430f29-b761-4a2c-90fb-dc4c9f5d4e9c"));
Expand Down

0 comments on commit c699c36

Please sign in to comment.