Memory Optimizations #131
Changes from 10 commits
@@ -15,14 +15,22 @@ services:
         MAVEN_GITHUB_TOKEN: ${MAVEN_GITHUB_TOKEN:?error}
         MAVEN_GITHUB_ORG: ${MAVEN_GITHUB_ORG:?error}
     image: jpo-conflictmonitor:latest
+    privileged: true # Set true to allow writing to /proc/sys/vm/drop_caches
     restart: ${RESTART_POLICY}
     ports:
       - "8082:8082"
+      - "10090:10090" # JMX
     environment:
       DOCKER_HOST_IP: ${DOCKER_HOST_IP:?error}
       KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:?error}
       CONNECT_URL: ${CONNECT_URL:?error}
       spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:?error}
+      ROCKSDB_TOTAL_OFF_HEAP_MEMORY: ${ROCKSDB_TOTAL_OFF_HEAP_MEMORY:?error}
+      ROCKSDB_INDEX_FILTER_BLOCK_RATIO: ${ROCKSDB_INDEX_FILTER_BLOCK_RATIO:?error}
+      ROCKSDB_TOTAL_MEMTABLE_MEMORY: ${ROCKSDB_TOTAL_MEMTABLE_MEMORY:?error}
+      ROCKSDB_BLOCK_SIZE: ${ROCKSDB_BLOCK_SIZE:?error}
+      ROCKSDB_N_MEMTABLES: ${ROCKSDB_N_MEMTABLES:?error}
+      ROCKSDB_MEMTABLE_SIZE: ${ROCKSDB_MEMTABLE_SIZE:?error}
     healthcheck:
       test: ["CMD", "java", "-version"]
       interval: 10s
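Each of these variables uses the `:?error` expansion, so compose startup fails fast if any is unset. A hypothetical `.env` fragment with illustrative values matching the Java-side defaults (the values are not prescribed by this PR):

```properties
# Illustrative values only; they mirror the defaults in BoundedMemoryRocksDBConfig
ROCKSDB_TOTAL_OFF_HEAP_MEMORY=134217728    # 128 MB total block cache
ROCKSDB_INDEX_FILTER_BLOCK_RATIO=0.1       # fraction of cache reserved for index/filter blocks
ROCKSDB_TOTAL_MEMTABLE_MEMORY=67108864     # 64 MB write buffer budget, charged to the cache
ROCKSDB_BLOCK_SIZE=4096                    # 4 KB
ROCKSDB_N_MEMTABLES=2
ROCKSDB_MEMTABLE_SIZE=16777216             # 16 MB per memtable
```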
@@ -35,7 +43,7 @@ services:
     deploy:
       resources:
         limits:
-          memory: 3G
+          memory: 1G
     depends_on:
       kafka:
         condition: service_healthy
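Lowering the container limit from 3G to 1G means the JVM heap, RocksDB's off-heap budget, and native overhead must all fit in 1 GiB. A rough sanity check with illustrative numbers; the 512 MB heap is a hypothetical `-Xmx`, not a value from this PR:

```java
public class ContainerBudgetCheck {
    static final long MB = 1024L * 1024L;

    public static void main(String[] args) {
        long containerLimit = 1024 * MB;  // compose: memory: 1G
        long rocksOffHeap   = 128 * MB;   // assumed ROCKSDB_TOTAL_OFF_HEAP_MEMORY
        long jvmHeap        = 512 * MB;   // hypothetical -Xmx for the service

        // Whatever remains must cover metaspace, thread stacks, and other native allocations
        long headroom = containerLimit - rocksOffHeap - jvmHeap;
        System.out.println("native headroom: " + (headroom / MB) + " MB");  // prints: native headroom: 384 MB
    }
}
```

If the headroom goes negative (or near zero), the container will be OOM-killed regardless of how well RocksDB is bounded.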
@@ -53,6 +61,12 @@ services:
       KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:?error}
       CONNECT_URL: ${CONNECT_URL:?error}
       spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:?error}
+      ROCKSDB_TOTAL_OFF_HEAP_MEMORY: ${ROCKSDB_TOTAL_OFF_HEAP_MEMORY:?error}
+      ROCKSDB_INDEX_FILTER_BLOCK_RATIO: ${ROCKSDB_INDEX_FILTER_BLOCK_RATIO:?error}
+      ROCKSDB_TOTAL_MEMTABLE_MEMORY: ${ROCKSDB_TOTAL_MEMTABLE_MEMORY:?error}
+      ROCKSDB_BLOCK_SIZE: ${ROCKSDB_BLOCK_SIZE:?error}
+      ROCKSDB_N_MEMTABLES: ${ROCKSDB_N_MEMTABLES:?error}
+      ROCKSDB_MEMTABLE_SIZE: ${ROCKSDB_MEMTABLE_SIZE:?error}
     healthcheck:
       test: ["CMD", "java", "-version"]
       interval: 10s
@@ -71,38 +85,38 @@ services:
         condition: service_healthy
         required: false

-  deduplicator:
-    profiles:
-      - all
-      - cm_full
-      - cm_base
-      - deduplicator
-    build:
-      context: .
-      dockerfile: Dedup_Dockerfile
-      args:
-        MAVEN_GITHUB_TOKEN: ${MAVEN_GITHUB_TOKEN:?error}
-        MAVEN_GITHUB_ORG: ${MAVEN_GITHUB_ORG:?error}
-    image: jpo-deduplicator:latest
-    restart: ${RESTART_POLICY}
-    environment:
-      DOCKER_HOST_IP: ${DOCKER_HOST_IP}
-      KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:?error}
-      spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:?error}
-    healthcheck:
-      test: ["CMD", "java", "-version"]
-      interval: 10s
-      timeout: 10s
-      retries: 20
-    logging:
-      options:
-        max-size: "10m"
-        max-file: "5"
-    deploy:
-      resources:
-        limits:
-          memory: 3G
-    depends_on:
-      kafka:
-        condition: service_healthy
-        required: false
+#  deduplicator:
+#    profiles:
+#      - all
+#      - cm_full
+#      - cm_base
+#      - deduplicator
+#    build:
+#      context: .
+#      dockerfile: Dedup_Dockerfile
+#      args:
+#        MAVEN_GITHUB_TOKEN: ${MAVEN_GITHUB_TOKEN:?error}
+#        MAVEN_GITHUB_ORG: ${MAVEN_GITHUB_ORG:?error}
+#    image: jpo-deduplicator:latest
+#    restart: ${RESTART_POLICY}
+#    environment:
+#      DOCKER_HOST_IP: ${DOCKER_HOST_IP}
+#      KAFKA_BOOTSTRAP_SERVERS: ${KAFKA_BOOTSTRAP_SERVERS:?error}
+#      spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:?error}
+#    healthcheck:
+#      test: ["CMD", "java", "-version"]
+#      interval: 10s
+#      timeout: 10s
+#      retries: 20
+#    logging:
+#      options:
+#        max-size: "10m"
+#        max-file: "5"
+#    deploy:
+#      resources:
+#        limits:
+#          memory: 3G
+#    depends_on:
+#      kafka:
+#        condition: service_healthy
+#        required: false

Do we want to remove the deduplicator as part of this PR? Or should we remove it as part of a separate PR? In either case, we should probably delete it entirely (including removing the source code) instead of just commenting it out.

I had commented it out because, although it's not related to this PR, it was causing a conflict after pulling in the latest jpo-utils/develop branch. Now deleted entirely.

I suspect the problem is that the new jpo-utils project contains its own version of the jpo-deduplicator. This would cause two versions of the deduplicator to start up and would break things, since they would both try to allocate the same ports. So this fix seems reasonable to me.
@@ -0,0 +1,154 @@
package us.dot.its.jpo.conflictmonitor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.CompressionType;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

import java.util.Map;

/**
 * Bounded memory configuration for RocksDB, applied to all topologies.
 * <p>References:
 * <ul>
 * <li><a href="https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb">Confluent: RocksDB Memory Management</a></li>
 * <li><a href="https://docs.confluent.io/platform/current/streams/developer-guide/config-streams.html#rocksdb-config-setter">Confluent: RocksDB Config Setter</a></li>
 * </ul></p>
 * <p>Configured via environment variables:
 * <dl>
 * <dt>ROCKSDB_TOTAL_OFF_HEAP_MEMORY</dt><dd>Total block cache size</dd>
 * <dt>ROCKSDB_INDEX_FILTER_BLOCK_RATIO</dt><dd>Fraction of the block cache reserved for high-priority blocks (index
 * and filter blocks)</dd>
 * <dt>ROCKSDB_TOTAL_MEMTABLE_MEMORY</dt><dd>Write buffer size, charged against the block cache</dd>
 * <dt>ROCKSDB_BLOCK_SIZE</dt><dd>{@link org.rocksdb.BlockBasedTableConfig#blockSize()}, RocksDB default 4 KB</dd>
 * <dt>ROCKSDB_N_MEMTABLES</dt><dd>{@link org.rocksdb.Options#maxWriteBufferNumber()}, RocksDB default 2</dd>
 * <dt>ROCKSDB_MEMTABLE_SIZE</dt><dd>{@link org.rocksdb.Options#writeBufferSize()}, RocksDB default 64 MB (16 MB here)</dd>
 * </dl>
 * </p>
 */
@Slf4j
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {

    private final static long TOTAL_OFF_HEAP_MEMORY;
    private final static double INDEX_FILTER_BLOCK_RATIO;
    private final static long TOTAL_MEMTABLE_MEMORY;

    // Block size: RocksDB default is 4 KB
    private final static long BLOCK_SIZE;

    // MaxWriteBufferNumber: RocksDB default is 2
    private final static int N_MEMTABLES;

    // WriteBufferSize: RocksDB default is 64 MB
    private final static long MEMTABLE_SIZE;

    private final static long KB = 1024L;
    private final static long MB = KB * KB;

    static {
        RocksDB.loadLibrary();

        // Initialize properties from env variables
        TOTAL_OFF_HEAP_MEMORY = getEnvLong("ROCKSDB_TOTAL_OFF_HEAP_MEMORY", 128 * MB);
        INDEX_FILTER_BLOCK_RATIO = getEnvDouble("ROCKSDB_INDEX_FILTER_BLOCK_RATIO", 0.1);
        TOTAL_MEMTABLE_MEMORY = getEnvLong("ROCKSDB_TOTAL_MEMTABLE_MEMORY", 64 * MB);
        BLOCK_SIZE = getEnvLong("ROCKSDB_BLOCK_SIZE", 4 * KB);
        N_MEMTABLES = getEnvInt("ROCKSDB_N_MEMTABLES", 2);
        MEMTABLE_SIZE = getEnvLong("ROCKSDB_MEMTABLE_SIZE", 16 * MB);

        log.info("Initialized BoundedMemoryRocksDBConfig. TOTAL_OFF_HEAP_MEMORY = {}, INDEX_FILTER_BLOCK_RATIO = {}," +
                " TOTAL_MEMTABLE_MEMORY = {}, BLOCK_SIZE = {}, N_MEMTABLES = {}, MEMTABLE_SIZE = {}",
                TOTAL_OFF_HEAP_MEMORY, INDEX_FILTER_BLOCK_RATIO, TOTAL_MEMTABLE_MEMORY, BLOCK_SIZE, N_MEMTABLES,
                MEMTABLE_SIZE);
    }

    // A single cache and write buffer manager shared by every store instance, so the
    // memory bounds apply process-wide rather than per store
    private static final org.rocksdb.Cache cache
            = new org.rocksdb.LRUCache(TOTAL_OFF_HEAP_MEMORY, -1, false, INDEX_FILTER_BLOCK_RATIO);
    private static final org.rocksdb.WriteBufferManager writeBufferManager
            = new org.rocksdb.WriteBufferManager(TOTAL_MEMTABLE_MEMORY, cache);
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {

        log.info("Setting RocksDB config for store {}", storeName);

        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();

        // These three options in combination limit the memory used by RocksDB to the size
        // passed to the block cache (TOTAL_OFF_HEAP_MEMORY)
        tableConfig.setBlockCache(cache);
        tableConfig.setCacheIndexAndFilterBlocks(true);
        options.setWriteBufferManager(writeBufferManager);

        // Settings recommended when bounding total memory; see the Confluent
        // RocksDB memory management guide referenced in the class javadoc
        tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
        tableConfig.setPinTopLevelIndexAndFilter(true);
        tableConfig.setBlockSize(BLOCK_SIZE);
        options.setMaxWriteBufferNumber(N_MEMTABLES);
        options.setWriteBufferSize(MEMTABLE_SIZE);

        // Enable compression (optional). Compression can decrease the required storage
        // and increase the CPU usage of the machine. For CompressionType values, see
        // https://javadoc.io/static/org.rocksdb/rocksdbjni/6.4.6/org/rocksdb/CompressionType.html
        options.setCompressionType(CompressionType.LZ4_COMPRESSION);

        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Do not close the Cache or WriteBufferManager here; the same objects are
        // shared by every store instance.
    }
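Because the WriteBufferManager charges memtable memory against the same LRU cache, TOTAL_OFF_HEAP_MEMORY is effectively partitioned between memtables, high-priority index/filter blocks, and ordinary data blocks. A sketch of the worst-case split using the defaults from the static initializer above (illustrative arithmetic, not part of the PR):

```java
public class RocksDbCachePartition {
    static final long MB = 1024L * 1024L;

    public static void main(String[] args) {
        long totalOffHeap = 128 * MB;      // ROCKSDB_TOTAL_OFF_HEAP_MEMORY default
        long memtableBudget = 64 * MB;     // ROCKSDB_TOTAL_MEMTABLE_MEMORY default
        double indexFilterRatio = 0.1;     // ROCKSDB_INDEX_FILTER_BLOCK_RATIO default

        // Portion of the LRU cache reserved as the high-priority pool for index/filter blocks
        long highPriority = (long) (totalOffHeap * indexFilterRatio);

        // Worst-case room left for data blocks when all memtables are full
        long dataBlocks = totalOffHeap - memtableBudget - highPriority;

        System.out.println("index/filter pool bytes: " + highPriority);
        System.out.println("data-block room bytes:   " + dataBlocks);
    }
}
```

If ROCKSDB_TOTAL_MEMTABLE_MEMORY approaches ROCKSDB_TOTAL_OFF_HEAP_MEMORY, data blocks get evicted aggressively and read performance suffers, so the two budgets should be tuned together.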
    private static long getEnvLong(String name, long defaultValue) {
        String strValue = getEnvString(name);
        if (strValue == null) return defaultValue;
        try {
            return Long.parseLong(strValue);
        } catch (NumberFormatException nfe) {
            log.error("Error parsing env variable {} to long, value: {}", name, strValue, nfe);
            return defaultValue;
        }
    }

    private static int getEnvInt(String name, int defaultValue) {
        String strValue = getEnvString(name);
        if (strValue == null) return defaultValue;
        try {
            return Integer.parseInt(strValue);
        } catch (NumberFormatException nfe) {
            log.error("Error parsing env variable {} to int, value: {}", name, strValue, nfe);
            return defaultValue;
        }
    }

    private static double getEnvDouble(String name, double defaultValue) {
        String strValue = getEnvString(name);
        if (strValue == null) return defaultValue;
        try {
            return Double.parseDouble(strValue);
        } catch (NumberFormatException nfe) {
            log.error("Error parsing env variable {} to double, value: {}", name, strValue, nfe);
            return defaultValue;
        }
    }

    private static String getEnvString(String name) {
        String strValue = System.getenv(name);
        if (strValue == null) {
            log.warn("Env variable {} is not set", name);
        }
        return strValue;
    }

}
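For context, this setter only takes effect once it is registered with the Kafka Streams configuration. A configuration sketch, assuming a plain `Properties`-based Streams setup with kafka-streams on the classpath (the application id and broker address are illustrative; a Spring-based app may instead supply the same `rocksdb.config.setter` property through its application configuration):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import us.dot.its.jpo.conflictmonitor.BoundedMemoryRocksDBConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "conflict-monitor");   // illustrative id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative broker
// Register the bounded-memory setter for every RocksDB state store
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedMemoryRocksDBConfig.class);
```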
Should the RocksDB variables be passed into the conflict monitor in the docker compose?

Yes they should, good catch.

Updated.
The conflict visualizer has another docker-compose file that brings up the conflict monitor. Do we need to add this parameter to that file as well? This change would not be part of this PR, but would involve a separate PR to the conflict visualizer.

The "privileged" setting was mainly for testing. I changed it to false by default. It doesn't need to be included anywhere else (and shouldn't be set in production). But the new environment variables below, starting with "ROCKSDB_", should be included anywhere there is a docker compose for CIMMS.

Awesome. Thanks for the clarification. I will open a PR in the Conflict Visualizer to match.