Memory Optimizations #131

Merged
12 commits merged on Jan 7, 2025
51 changes: 43 additions & 8 deletions Dockerfile
@@ -27,14 +27,49 @@ COPY --from=builder /home/jpo-conflictmonitor/src/main/resources/application.yam
COPY --from=builder /home/jpo-conflictmonitor/src/main/resources/logback.xml /home
COPY --from=builder /home/jpo-conflictmonitor/target/jpo-conflictmonitor.jar /home



# Use jemalloc for RocksDB per Confluent recommendation:
# https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb
RUN amazon-linux-extras install -y epel && \
yum install -y jemalloc-devel
ENV LD_PRELOAD="/usr/lib64/libjemalloc.so"
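# To confirm jemalloc is actually preloaded in a running container, one
# hypothetical spot-check (not part of this image) is:
#   grep jemalloc /proc/1/maps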

# Entrypoint for prod: JMX enabled for local access only (not exposed outside the container).
# GC settings similar to Kafka recommendations, see: https://kafka.apache.org/documentation.html#java
# Set max Java heap usage as percentage of total available memory.
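# Example (for illustration): with the 1G container limit set in
# docker-compose.yml, -XX:MaxRAMPercentage=50.0 caps the heap at ~512 MiB
# and -XX:InitialRAMPercentage=5.0 starts it at ~51 MiB.
# Note: exec-form ENTRYPOINT does no shell expansion, so $DOCKER_HOST_IP
# below is passed through to the JVM literally.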
ENTRYPOINT ["java", \
"-Djava.rmi.server.hostname=$DOCKER_HOST_IP", \
"-Dcom.sun.management.jmxremote.port=9090", \
"-Dcom.sun.management.jmxremote.rmi.port=9090", \
"-Dcom.sun.management.jmxremote", \
"-Dcom.sun.management.jmxremote.local.only=true", \
"-Dcom.sun.management.jmxremote.authenticate=false", \
"-Dcom.sun.management.jmxremote.ssl=false", \
"-Dlogback.configurationFile=/home/logback.xml", \
"-XX:+UseG1GC", \
"-XX:MaxGCPauseMillis=20", \
"-XX:InitiatingHeapOccupancyPercent=35", \
"-XX:MetaspaceSize=96m", \
"-XX:MinMetaspaceFreeRatio=50", \
"-XX:MaxMetaspaceFreeRatio=80", \
"-XX:+ExplicitGCInvokesConcurrent", \
"-XX:InitialRAMPercentage=5.0", \
"-XX:MaxRAMPercentage=50.0", \
"-jar", \
"/home/jpo-conflictmonitor.jar"]
"/home/jpo-conflictmonitor.jar"]

# Entrypoint for testing: enables nonlocal JMX on port 10090
#ENTRYPOINT ["java", \
# "-Dcom.sun.management.jmxremote=true", \
# "-Dcom.sun.management.jmxremote.local.only=false", \
# "-Dcom.sun.management.jmxremote.authenticate=false", \
# "-Dcom.sun.management.jmxremote.ssl=false", \
# "-Dcom.sun.management.jmxremote.port=10090", \
# "-Dcom.sun.management.jmxremote.rmi.port=10090", \
# "-Djava.rmi.server.hostname=localhost", \
# "-Dlogback.configurationFile=/home/logback.xml", \
# "-XX:+UseG1GC", \
# "-XX:MaxGCPauseMillis=20", \
# "-XX:InitiatingHeapOccupancyPercent=35", \
# "-XX:MetaspaceSize=96m", \
# "-XX:MinMetaspaceFreeRatio=50", \
# "-XX:MaxMetaspaceFreeRatio=80", \
# "-XX:+ExplicitGCInvokesConcurrent", \
# "-XX:InitialRAMPercentage=5.0", \
# "-XX:MaxRAMPercentage=50.0", \
# "-jar", \
# "/home/jpo-conflictmonitor.jar"]
86 changes: 50 additions & 36 deletions docker-compose.yml
@@ -15,14 +15,22 @@ services:
MAVEN_GITHUB_TOKEN: ${MAVEN_GITHUB_TOKEN:?error}
MAVEN_GITHUB_ORG: ${MAVEN_GITHUB_ORG:?error}
image: jpo-conflictmonitor:latest
privileged: true # Set true to allow writing to /proc/sys/vm/drop_caches
Collaborator: Should the RocksDB variables be passed into the conflict monitor in the docker compose?

Collaborator (Author): Yes they should, good catch.

Collaborator (Author): Updated.

Collaborator: The conflict visualizer has another docker-compose file that brings up the conflict monitor. Do we need to add this parameter to that file as well? This change would not be part of this PR, but would involve a separate PR to the conflict visualizer.

Collaborator (Author): The "privileged" setting was mainly for testing. I changed it to false by default. It doesn't need to be included anywhere else (and shouldn't be set in production). But the new environment variables below, starting with "ROCKSDB_", should be included anywhere there is a docker compose for CIMMS.

Collaborator: Awesome, thanks for the clarification. I will open a PR in the Conflict Visualizer to match.

restart: ${RESTART_POLICY}
ports:
- "8082:8082"
- "10090:10090" # JMX
environment:
DOCKER_HOST_IP: ${DOCKER_HOST_IP:?error}
KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:?error}
CONNECT_URL: ${CONNECT_URL:?error}
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:?error}
ROCKSDB_TOTAL_OFF_HEAP_MEMORY: ${ROCKSDB_TOTAL_OFF_HEAP_MEMORY:?error}
ROCKSDB_INDEX_FILTER_BLOCK_RATIO: ${ROCKSDB_INDEX_FILTER_BLOCK_RATIO:?error}
ROCKSDB_TOTAL_MEMTABLE_MEMORY: ${ROCKSDB_TOTAL_MEMTABLE_MEMORY:?error}
ROCKSDB_BLOCK_SIZE: ${ROCKSDB_BLOCK_SIZE:?error}
ROCKSDB_N_MEMTABLES: ${ROCKSDB_N_MEMTABLES:?error}
ROCKSDB_MEMTABLE_SIZE: ${ROCKSDB_MEMTABLE_SIZE:?error}
healthcheck:
test: ["CMD", "java", "-version"]
interval: 10s
@@ -35,7 +35,7 @@
deploy:
resources:
limits:
memory: 3G
memory: 1G
depends_on:
kafka:
condition: service_healthy
@@ -53,6 +61,12 @@
KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:?error}
CONNECT_URL: ${CONNECT_URL:?error}
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:?error}
ROCKSDB_TOTAL_OFF_HEAP_MEMORY: ${ROCKSDB_TOTAL_OFF_HEAP_MEMORY:?error}
ROCKSDB_INDEX_FILTER_BLOCK_RATIO: ${ROCKSDB_INDEX_FILTER_BLOCK_RATIO:?error}
ROCKSDB_TOTAL_MEMTABLE_MEMORY: ${ROCKSDB_TOTAL_MEMTABLE_MEMORY:?error}
ROCKSDB_BLOCK_SIZE: ${ROCKSDB_BLOCK_SIZE:?error}
ROCKSDB_N_MEMTABLES: ${ROCKSDB_N_MEMTABLES:?error}
ROCKSDB_MEMTABLE_SIZE: ${ROCKSDB_MEMTABLE_SIZE:?error}
healthcheck:
test: ["CMD", "java", "-version"]
interval: 10s
@@ -71,38 +85,38 @@
condition: service_healthy
required: false

deduplicator:
profiles:
- all
- cm_full
- cm_base
- deduplicator
build:
context: .
dockerfile: Dedup_Dockerfile
args:
MAVEN_GITHUB_TOKEN: ${MAVEN_GITHUB_TOKEN:?error}
MAVEN_GITHUB_ORG: ${MAVEN_GITHUB_ORG:?error}
image: jpo-deduplicator:latest
restart: ${RESTART_POLICY}
environment:
DOCKER_HOST_IP: ${DOCKER_HOST_IP}
KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:?error}
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:?error}
healthcheck:
test: ["CMD", "java", "-version"]
interval: 10s
timeout: 10s
retries: 20
logging:
options:
max-size: "10m"
max-file: "5"
deploy:
resources:
limits:
memory: 3G
depends_on:
kafka:
condition: service_healthy
required: false
# deduplicator:
Collaborator: Do we want to remove the deduplicator as part of this PR, or as part of a separate PR? In either case, we should probably delete it entirely (including removing the source code) instead of just commenting it out.

Collaborator (Author): I had commented it out because, although it's not related to this PR, it was causing a conflict after pulling in the latest jpo-utils/develop branch. Now deleted entirely.

Collaborator: I suspect the problem is that the new jpo-utils project contains its own version of the jpo-deduplicator. This would cause two versions of the deduplicator to start up and would break things, since both would try to allocate the same ports. So this fix seems reasonable to me.

# profiles:
# - all
# - cm_full
# - cm_base
# - deduplicator
# build:
# context: .
# dockerfile: Dedup_Dockerfile
# args:
# MAVEN_GITHUB_TOKEN: ${MAVEN_GITHUB_TOKEN:?error}
# MAVEN_GITHUB_ORG: ${MAVEN_GITHUB_ORG:?error}
# image: jpo-deduplicator:latest
# restart: ${RESTART_POLICY}
# environment:
# DOCKER_HOST_IP: ${DOCKER_HOST_IP}
# KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:?error}
# spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:?error}
# healthcheck:
# test: ["CMD", "java", "-version"]
# interval: 10s
# timeout: 10s
# retries: 20
# logging:
# options:
# max-size: "10m"
# max-file: "5"
# deploy:
# resources:
# limits:
# memory: 3G
# depends_on:
# kafka:
# condition: service_healthy
# required: false
BoundedMemoryRocksDBConfig.java (new file)
@@ -0,0 +1,154 @@
package us.dot.its.jpo.conflictmonitor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompressionType;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

import java.util.Map;


/**
* Bounded memory configuration for RocksDB for all topologies.
* <p>References:
* <ul>
* <li><a href="https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb">Confluent: RocksDB Memory Management</a></li>
* <li><a href="https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#rocksdb-config-setter">Confluent: RocksDB Config Setter</a></li>
* </ul></p>
* <p>Configured using environment variables:
* <dl>
* <dt>ROCKSDB_TOTAL_OFF_HEAP_MEMORY</dt><dd>Total block cache size</dd>
* <dt>ROCKSDB_INDEX_FILTER_BLOCK_RATIO</dt><dd>Fraction of the block cache to use for high priority blocks (index
* and filter blocks).</dd>
* <dt>ROCKSDB_TOTAL_MEMTABLE_MEMORY</dt><dd>Total write buffer (memtable) memory, charged against the block cache</dd>
* <dt>ROCKSDB_BLOCK_SIZE</dt><dd>{@link org.rocksdb.BlockBasedTableConfig#blockSize()}, Default 4KB</dd>
* <dt>ROCKSDB_N_MEMTABLES</dt><dd>{@link org.rocksdb.Options#maxWriteBufferNumber()}, Default 2</dd>
* <dt>ROCKSDB_MEMTABLE_SIZE</dt><dd>{@link org.rocksdb.Options#writeBufferSize()}, Default 16MB here (RocksDB's own default is 64MB)</dd>
* </dl>
* </p>
*/
@Slf4j
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

private final static long TOTAL_OFF_HEAP_MEMORY;
private final static double INDEX_FILTER_BLOCK_RATIO;
private final static long TOTAL_MEMTABLE_MEMORY;

// Block size: Default 4K
private final static long BLOCK_SIZE;

// MaxWriteBufferNumber: Default 2
private final static int N_MEMTABLES;

// WriteBufferSize: RocksDB default is 64MB; this config defaults to 16MB
private final static long MEMTABLE_SIZE;

private final static long KB = 1024L;
private final static long MB = KB * KB;

static {
RocksDB.loadLibrary();

// Initialize properties from env variables
TOTAL_OFF_HEAP_MEMORY = getEnvLong("ROCKSDB_TOTAL_OFF_HEAP_MEMORY", 128 * MB);
INDEX_FILTER_BLOCK_RATIO = getEnvDouble("ROCKSDB_INDEX_FILTER_BLOCK_RATIO", 0.1);
TOTAL_MEMTABLE_MEMORY = getEnvLong("ROCKSDB_TOTAL_MEMTABLE_MEMORY", 64 * MB);
BLOCK_SIZE = getEnvLong("ROCKSDB_BLOCK_SIZE", 4 * KB);
N_MEMTABLES = getEnvInt("ROCKSDB_N_MEMTABLES", 2);
MEMTABLE_SIZE = getEnvLong("ROCKSDB_MEMTABLE_SIZE", 16 * MB);

log.info("Initialized BoundedMemoryRocksDBConfig. TOTAL_OFF_HEAP_MEMORY = {}, INDEX_FILTER_BLOCK_RATIO = {}," +
" TOTAL_MEMTABLE_MEMORY = {}, BLOCK_SIZE = {}, N_MEMTABLES = {}, MEMTABLE_SIZE = {}",
TOTAL_OFF_HEAP_MEMORY, INDEX_FILTER_BLOCK_RATIO, TOTAL_MEMTABLE_MEMORY, BLOCK_SIZE, N_MEMTABLES,
MEMTABLE_SIZE);
}
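// With the defaults above (for illustration): the shared LRU cache is 128 MB,
// of which INDEX_FILTER_BLOCK_RATIO = 0.1 reserves ~12.8 MB of high-priority
// space for index/filter blocks, and up to 64 MB of memtable memory is
// charged against the same cache.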


// The cache and write buffer manager are static so a single instance is
// shared across every state store in the JVM; the final LRUCache argument
// is the high-priority pool ratio reserved for index and filter blocks.
private static final org.rocksdb.Cache cache
= new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
private static final org.rocksdb.WriteBufferManager writeBufferManager
= new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);

@Override
public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {

log.info("Setting RocksDB config for store {}", storeName);

BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

// These three options in combination will limit the memory used by RocksDB to the size passed to the block
// cache (TOTAL_OFF_HEAP_MEMORY)
tableConfig.setBlockCache(cache);
tableConfig.setCacheIndexAndFilterBlocks(true);
options.setWriteBufferManager(writeBufferManager);

// These options are recommended when bounding the total memory: caching
// index/filter blocks with high priority is required for the
// INDEX_FILTER_BLOCK_RATIO reservation to take effect.
tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
tableConfig.setPinTopLevelIndexAndFilter(true);
// A larger block size shrinks the index at the cost of coarser reads (default 4KB).
tableConfig.setBlockSize(BLOCK_SIZE);
options.setMaxWriteBufferNumber(N_MEMTABLES);
options.setWriteBufferSize(MEMTABLE_SIZE);
// Enable compression (optional). Compression can decrease the required storage
// and increase the CPU usage of the machine. For CompressionType values, see
// https://javadoc.io/static/org.rocksdb/rocksdbjni/6.4.6/org/rocksdb/CompressionType.html.
options.setCompressionType(CompressionType.LZ4_COMPRESSION);

options.setTableFormatConfig(tableConfig);

}

@Override
public void close(final String storeName, final Options options) {
// Cache and WriteBufferManager should not be closed here, as the same objects are shared by every store instance.
}

private static long getEnvLong(String name, long defaultValue) {
String strValue = getEnvString(name);
if (strValue == null) return defaultValue;
try {
return Long.parseLong(strValue);
} catch (NumberFormatException nfe) {
log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
return defaultValue;
}
}

private static int getEnvInt(String name, int defaultValue) {
String strValue = getEnvString(name);
if (strValue == null) return defaultValue;
try {
return Integer.parseInt(strValue);
} catch (NumberFormatException nfe) {
log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
return defaultValue;
}
}

private static double getEnvDouble(String name, double defaultValue) {
String strValue = getEnvString(name);
if (strValue == null) return defaultValue;
try {
return Double.parseDouble(strValue);
} catch (NumberFormatException nfe) {
log.error("Error parsing env variable to long {}, {}", name, strValue, nfe);
return defaultValue;
}
}

private static String getEnvString(String name) {
String strValue = System.getenv(name);
if (strValue == null) {
log.warn("Env variable {} is not set", name);
}
return strValue;
}


}
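For context, a minimal sketch of how this setter gets exercised: Kafka Streams instantiates the class reflectively and calls setConfig(...) once per state store. The harness below is hypothetical (not part of this PR) and assumes rocksdbjni is on the classpath.

package us.dot.its.jpo.conflictmonitor;

import java.util.Map;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Hypothetical harness: drives the setter the way Kafka Streams would.
public class BoundedMemoryRocksDBConfigSketch {
    public static void main(String[] args) {
        BoundedMemoryRocksDBConfig setter = new BoundedMemoryRocksDBConfig();
        try (Options options = new Options()) {
            // Streams pre-populates the Options with a BlockBasedTableConfig
            // before invoking setConfig once per state store.
            options.setTableFormatConfig(new BlockBasedTableConfig());
            setter.setConfig("example-store", options, Map.of());
            System.out.println("writeBufferSize = " + options.writeBufferSize());
            System.out.println("maxWriteBufferNumber = " + options.maxWriteBufferNumber());
        }
    }
}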
@@ -854,7 +854,9 @@ public Properties createStreamProperties(String name) {
streamProps.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");

// Reduce cache buffering per topology to 1MB
streamProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1 * 1024 * 1024L);
streamProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 1 * 1024 * 1024L);
// Optionally, to disable caching:
//streamProps.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);

// Decrease default commit interval. Default for 'at least once' mode of 30000ms
// is too slow.
@@ -899,6 +901,8 @@ public Properties createStreamProperties(String name) {
}
}

// Configure RocksDB memory usage
streamProps.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);
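// Note: Kafka Streams instantiates the config setter reflectively (one
// instance per state store), which is why BoundedMemoryRocksDBConfig reads
// its settings from static fields and env vars rather than constructor args.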

return streamProps;
}
@@ -166,7 +166,6 @@ private BsmMessageCountProgressionEvent createEvent(ProcessedBsm<Point> previous
event.setTimestampB(thisState.getTimeStamp().format(formatter));
event.setVehicleId(thisState.getFeatures()[featureIndex].getId().toString());


return event;
}

2 changes: 1 addition & 1 deletion jpo-utils
Submodule jpo-utils updated 71 files
+26 −0 .github/workflows/docker.yml
+41 −0 .github/workflows/dockerhub.yml
+3 −1 .gitignore
+72 −2 README.md
+2 −2 docker-compose-connect.yml
+44 −0 docker-compose-deduplicator.yml
+2 −2 docker-compose-kafka.yml
+2 −1 docker-compose.yml
+0 −0 jikkou/Dockerfile.jikkou
+0 −0 jikkou/application.conf
+0 −0 jikkou/jikkouconfig
+0 −0 jikkou/kafka-connectors-template.jinja
+100 −31 jikkou/kafka-connectors-values.yaml
+0 −0 jikkou/kafka-topics-template.jinja
+16 −0 jikkou/kafka-topics-values.yaml
+0 −0 jikkou/kafka_connector_init.sh
+0 −0 jikkou/kafka_init.sh
+1 −0 jpo-deduplicator/.dockerignore
+57 −0 jpo-deduplicator/Dockerfile
+228 −0 jpo-deduplicator/jpo-deduplicator/LICENSE-2.0.html
+294 −0 jpo-deduplicator/jpo-deduplicator/pom.xml
+28 −0 jpo-deduplicator/jpo-deduplicator/settings.xml
+57 −0 jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/DeduplicatorApplication.java
+444 −0 jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/DeduplicatorProperties.java
+197 −0 jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/KafkaConfiguration.java
+56 −0 jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/StreamsExceptionHandler.java
+107 −0 .../jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/DeduplicatorServiceController.java
+21 −0 jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/models/JsonPair.java
+20 −0 ...deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/models/OdeBsmPair.java
+20 −0 ...deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/models/OdeMapPair.java
+19 −0 jpo-deduplicator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/models/Pair.java
+21 −0 ...icator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/models/ProcessedMapPair.java
+20 −0 ...tor/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/models/ProcessedMapWktPair.java
+69 −0 ...-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/DeduplicationProcessor.java
+80 −0 ...jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/OdeBsmJsonProcessor.java
+70 −0 ...jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/OdeMapJsonProcessor.java
+43 −0 ...icator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/OdeRawEncodedTimJsonProcessor.java
+42 −0 ...jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/OdeTimJsonProcessor.java
+57 −0 ...o-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/ProcessedMapProcessor.java
+55 −0 ...eduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/ProcessedMapWktProcessor.java
+64 −0 ...-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/ProcessedSpatProcessor.java
+26 −0 ...rc/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/suppliers/OdeBsmJsonProcessorSupplier.java
+22 −0 ...rc/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/suppliers/OdeMapJsonProcessorSupplier.java
+22 −0 ...n/java/us/dot/its/jpo/deduplicator/deduplicator/processors/suppliers/OdeRawEncodedTimProcessorSupplier.java
+22 −0 ...rc/main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/suppliers/OdeTimJsonProcessorSupplier.java
+23 −0 .../main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/suppliers/ProcessedMapProcessorSupplier.java
+22 −0 ...in/java/us/dot/its/jpo/deduplicator/deduplicator/processors/suppliers/ProcessedMapWktProcessorSupplier.java
+22 −0 ...main/java/us/dot/its/jpo/deduplicator/deduplicator/processors/suppliers/ProcessedSpatProcessorSupplier.java
+24 −0 ...cator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/serialization/JsonSerdes.java
+52 −0 ...cator/jpo-deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/serialization/PairSerdes.java
+125 −0 ...deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/BsmDeduplicatorTopology.java
+121 −0 ...deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/MapDeduplicatorTopology.java
+131 −0 ...src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/OdeRawEncodedTimDeduplicatorTopology.java
+92 −0 ...tor/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/ProcessedMapDeduplicatorTopology.java
+93 −0 .../src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/ProcessedMapWktDeduplicatorTopology.java
+97 −0 ...or/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/ProcessedSpatDeduplicatorTopology.java
+126 −0 ...deduplicator/src/main/java/us/dot/its/jpo/deduplicator/deduplicator/topologies/TimDeduplicatorTopology.java
+96 −0 jpo-deduplicator/jpo-deduplicator/src/main/resources/application.yaml
+20 −0 jpo-deduplicator/jpo-deduplicator/src/main/resources/logback.xml
+116 −0 jpo-deduplicator/jpo-deduplicator/src/test/java/deduplicator/BsmDeduplicatorTopologyTest.java
+96 −0 jpo-deduplicator/jpo-deduplicator/src/test/java/deduplicator/MapDeduplicatorTopologyTest.java
+89 −0 jpo-deduplicator/jpo-deduplicator/src/test/java/deduplicator/OdeRawEncodedTimDeduplicatorTopologyTest.java
+99 −0 jpo-deduplicator/jpo-deduplicator/src/test/java/deduplicator/ProcessedMapDeduplicatorTopologyTest.java
+101 −0 jpo-deduplicator/jpo-deduplicator/src/test/java/deduplicator/ProcessedMapWktDeduplicatorTopologyTest.java
+106 −0 jpo-deduplicator/jpo-deduplicator/src/test/java/deduplicator/ProcessedSpatDeduplicatorTopologyTest.java
+89 −0 jpo-deduplicator/jpo-deduplicator/src/test/java/deduplicator/TimDeduplicatorTopologyTest.java
+1 −0 jpo-deduplicator/jpo-deduplicator/src/test/resources/application-testConfig.yaml
+11 −0 jpo-deduplicator/jpo-deduplicator/src/test/resources/logback-test.xml
+26 −2 mongo/create_indexes.js
+0 −0 mongo/setup_mongo.sh
+31 −3 sample.env
13 changes: 12 additions & 1 deletion sample.env
@@ -81,4 +81,15 @@ MONGO_EXPORTER_PASSWORD=export

# Generate a random string for the MongoDB keyfile using the following command:
# $ openssl rand -base64 32
MONGO_DB_KEYFILE_STRING="xgxPRfDpAoLv4rOwA0sVkQC0nEBfRYsnEgdH3knku+4="

# RocksDB Bounded Memory Config Properties
# 128 MB = 134217728
# 64 MB = 67108864
# 16 MB = 16777216
ROCKSDB_TOTAL_OFF_HEAP_MEMORY=134217728
ROCKSDB_INDEX_FILTER_BLOCK_RATIO=0.1
ROCKSDB_TOTAL_MEMTABLE_MEMORY=67108864
ROCKSDB_BLOCK_SIZE=4096
ROCKSDB_N_MEMTABLES=2
ROCKSDB_MEMTABLE_SIZE=16777216
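# Note: memtable memory is charged against the block cache via the shared
# WriteBufferManager, so RocksDB's total off-heap usage stays bounded by
# ROCKSDB_TOTAL_OFF_HEAP_MEMORY (128 MB per JVM with these values).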