US Department of Transportation (USDOT) Intelligent Transportation Systems (ITS) Joint Program Office (JPO) Utilities
The JPO ITS utilities repository serves as a central location for deploying open-source utilities used by other JPO-ITS repositories.
- Minimum RAM: 16 GB
- Supported operating systems:
  - Ubuntu 22.04 Linux (Recommended)
  - Windows 10/11 Professional (Professional version required for Docker virtualization)
  - OSX 10 Mojave
  - NOTE: Not all images have ARM64 builds (they can still be run through a compatibility layer)
- Docker Compose V2 - version 3.4 or newer

The jpo-utils repository is intended to be run with Docker Compose V2, as it uses functionality added in the V2 release.
Read the following guides to familiarize yourself with the jpo-utils Docker configuration.
Important! You must rename `sample.env` to `.env` for Docker to automatically read the file. Do not push this file to source control.
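From the repository root this is a single copy command (shown as a sketch; the file names come straight from the note above):

```bash
# Docker Compose automatically reads a file named .env in the project directory.
cp sample.env .env
```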
The docker-compose-mongo file configures a MongoDB instance that is initialized as a standalone replica set with its users already created. To use a different `setup_mongo.sh` or `create_indexes.js` script, pass in the relative path of the new script by overriding the `MONGO_SETUP_SCRIPT_RELATIVE_PATH` or `MONGO_CREATE_INDEXES_SCRIPT_RELATIVE_PATH` environment variable. These scripts initialize the MongoDB database and create its indexes.
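For example, in your `.env` file (a sketch; the paths are hypothetical placeholders for your own scripts, and the variable names should be checked against `sample.env`):

```bash
# Hypothetical relative paths to customized MongoDB initialization scripts.
MONGO_SETUP_SCRIPT_RELATIVE_PATH=./custom/setup_mongo.sh
MONGO_CREATE_INDEXES_SCRIPT_RELATIVE_PATH=./custom/create_indexes.js
```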
The `COMPOSE_PROFILES` variable in your `.env` file controls which services are deployed:

- `mongo_full` - deploys all resources in the docker-compose-mongo.yml file
- `mongo` - only deploys the `mongo` and `mongo-setup` services
- `mongo_express` - only deploys the `mongo-express` service
1. Create a copy of `sample.env` and rename it to `.env`.
2. Update the variable `DOCKER_HOST_IP` to the local IP address of the system running Docker, which can be found by running the `ifconfig` command.
   - Hint: look for "inet addr:" within "eth0" or "en0" for OSX
3. Set the `MONGO_ADMIN_DB_PASS` and `MONGO_READ_WRITE_PASS` environment variables to a secure password.
4. Set the `COMPOSE_PROFILES` variable to: `mongo_full`
5. Run the following command: `docker compose up -d`
6. Go to `localhost:8082` in your browser and verify that `mongo-express` can see the created database.
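If you also want a command-line check, something like the following should confirm the replica set is up. This is a sketch: it assumes the service is named `mongo`, that the image ships `mongosh`, and that you substitute the admin username configured in your `.env` alongside `MONGO_ADMIN_DB_PASS`.

```bash
# Should print 1 if the standalone replica set initialized correctly.
docker compose exec mongo mongosh -u <admin-user> -p <MONGO_ADMIN_DB_PASS> \
  --quiet --eval "rs.status().ok"
```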
The docker-compose-kafka file runs Bitnami Kafka as a combined controller and broker. To use a different `kafka_init.sh` script, pass in the relative path of the new script by overriding the `KAFKA_INIT_SCRIPT_RELATIVE_PATH` environment variable. This can be useful for initializing new topics at startup.
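For example, in `.env` (a sketch; the path shown is hypothetical):

```bash
# Hypothetical relative path to a customized Kafka topic initialization script.
KAFKA_INIT_SCRIPT_RELATIVE_PATH=./custom/kafka_init.sh
```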
An optional `kafka-init`, `schema-registry`, and `kafka-ui` instance can be deployed by configuring `COMPOSE_PROFILES` as follows:

- `kafka_full` - deploys all resources in the docker-compose-kafka.yml file
- `kafka` - only deploys the `kafka` service
- `kafka_setup` - deploys a `kafka-setup` service that creates topics in the `kafka` service
- `kafka_schema_registry` - deploys a `kafka-schema-registry` service that can be used to manage schemas for Kafka topics
- `kafka_ui` - deploys a web interface to interact with the Kafka cluster
The Kafka topics created by the `kafka-setup` service are configured in the kafka-topics-values.yaml file. The topics in that file are organized by application and sorted into "Stream Topics" (those with `cleanup.policy` = `delete`) and "Table Topics" (those with `cleanup.policy` = `compact`).
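Once the stack is up, the effective `cleanup.policy` of any created topic can be checked from inside the broker container. This is a sketch that assumes the Bitnami image keeps its CLI scripts on the PATH and that the broker listens on `localhost:9092`:

```bash
# Show every effective config for one topic, including cleanup.policy (delete vs compact).
docker compose exec kafka kafka-configs.sh \
  --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name topic.OdeBsmJson \
  --describe --all
```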
The following environment variables can be used to configure Kafka topic creation.

| Environment Variable | Description |
|---|---|
| `KAFKA_TOPIC_CREATE_ODE` | Whether to create topics for the ODE |
| `KAFKA_TOPIC_CREATE_GEOJSONCONVERTER` | Whether to create topics for the GeoJSON Converter |
| `KAFKA_TOPIC_CREATE_CONFLICTMONITOR` | Whether to create topics for the Conflict Monitor |
| `KAFKA_TOPIC_CREATE_DEDUPLICATOR` | Whether to create topics for the Deduplicator |
| `KAFKA_TOPIC_PARTITIONS` | Number of partitions |
| `KAFKA_TOPIC_REPLICAS` | Number of replicas |
| `KAFKA_TOPIC_MIN_INSYNC_REPLICAS` | Minimum number of in-sync replicas (for use with `acks=all`) |
| `KAFKA_TOPIC_RETENTION_MS` | Retention time for stream topics, in milliseconds |
| `KAFKA_TOPIC_DELETE_RETENTION_MS` | Tombstone retention time for compacted topics, in milliseconds |
| `KAFKA_TOPIC_CONFIG_RELATIVE_PATH` | Relative path to the Kafka topic YAML configuration file; upper-level directories are supported |
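As an illustration, a `.env` fragment overriding a few of these might look like the following (all values are examples rather than the project defaults; check `sample.env` for those):

```bash
# Create topics for the ODE and GeoJSON Converter only, sized for a single broker.
KAFKA_TOPIC_CREATE_ODE=true
KAFKA_TOPIC_CREATE_GEOJSONCONVERTER=true
KAFKA_TOPIC_CREATE_CONFLICTMONITOR=false
KAFKA_TOPIC_CREATE_DEDUPLICATOR=false
KAFKA_TOPIC_PARTITIONS=1
KAFKA_TOPIC_REPLICAS=1
KAFKA_TOPIC_MIN_INSYNC_REPLICAS=1
KAFKA_TOPIC_RETENTION_MS=300000          # stream (delete) topics keep 5 minutes of data
KAFKA_TOPIC_DELETE_RETENTION_MS=3600000  # compacted topics keep tombstones for 1 hour
```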
1. Create a copy of `sample.env` and rename it to `.env`.
2. Update the variable `DOCKER_HOST_IP` to the local IP address of the system running Docker, which can be found by running the `ifconfig` command.
   - Hint: look for "inet addr:" within "eth0" or "en0" for OSX
3. Set the `COMPOSE_PROFILES` variable to: `kafka_full`
4. Run the following command: `docker compose up -d`
5. Go to `localhost:8001` in your browser and verify that `kafka-ui` can see the created Kafka cluster and initialized topics.
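If you prefer the command line to `kafka-ui`, the same verification can be done from inside the broker container (again assuming the Bitnami CLI scripts are on the PATH):

```bash
# List every topic created by the kafka-setup service.
docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --list
```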
The mongo-connector service connects to specified Kafka topics and deposits messages from those topics into separate collections in the MongoDB database. The codebase that provides this functionality comes from Confluent, using their community-licensed `cp-kafka-connect` image. Documentation for this image can be found here.
Kafka connectors are managed by the `kafka-connect-setup` service.
Set the `COMPOSE_PROFILES` environment variable as follows:

- `kafka_connect` will only spin up the `kafka-connect` and `kafka-init` services in docker-compose-connect
  - NOTE: This implies that you will be using a separate Kafka and MongoDB cluster
- `kafka_connect_standalone` will run the following:
  - `kafka-connect` service from docker-compose-connect
  - `kafka-init` service from docker-compose-connect
  - `kafka` service from docker-compose-kafka
  - `mongo` and `mongo-setup` services from docker-compose-mongo
The Kafka connectors created by the `kafka-connect-setup` service are configured in the kafka-connectors-values.yaml file. The connectors in that file are organized by application and are given parameters that define each Kafka -> MongoDB sync connector:
| Connector Variable | Required | Condition | Description |
|---|---|---|---|
| `topicName` | Yes | Always | The name of the Kafka topic to sync from |
| `collectionName` | Yes | Always | The name of the MongoDB collection to write to |
| `generateTimestamp` | No | Optional | Enable or disable adding a timestamp to each message (true/false) |
| `connectorName` | No | Optional | Override the name of the connector from the collectionName to this field instead |
| `useTimestamp` | No | Optional | Converts the `timestampField` field at the top level of the value to a BSON date |
| `timestampField` | No | Required if `useTimestamp` is true | The name of the timestamp field at the top level of the message |
| `useKey` | No | Optional | Override the document `_id` field in MongoDB to use a specified `keyField` from the message |
| `keyField` | No | Required if `useKey` is true | The name of the key field |
The following environment variables can be used to configure Kafka Connectors:
| Environment Variable | Description |
|---|---|
| `CONNECT_URL` | Kafka Connect API URL |
| `CONNECT_LOG_LEVEL` | Kafka Connect log level (`OFF`, `ERROR`, `WARN`, `INFO`) |
| `CONNECT_TASKS_MAX` | Number of concurrent tasks to configure on Kafka connectors |
| `CONNECT_CREATE_ODE` | Whether to create Kafka connectors for the ODE |
| `CONNECT_CREATE_GEOJSONCONVERTER` | Whether to create Kafka connectors for the GeoJSON Converter |
| `CONNECT_CREATE_CONFLICTMONITOR` | Whether to create Kafka connectors for the Conflict Monitor |
| `CONNECT_CREATE_DEDUPLICATOR` | Whether to create Kafka connectors for the Deduplicator |
| `CONNECT_CONFIG_RELATIVE_PATH` | Relative path to the Kafka connector YAML configuration file; upper-level directories are supported |
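As a rough illustration, overriding a few of these in `.env` might look like the following (every value shown is an example, not a project default; `sample.env` has the real defaults):

```bash
# Point the setup at the Connect REST API and create only the ODE and
# GeoJSON Converter sink connectors.
CONNECT_URL=http://kafka-connect:8083
CONNECT_LOG_LEVEL=ERROR
CONNECT_TASKS_MAX=1
CONNECT_CREATE_ODE=true
CONNECT_CREATE_GEOJSONCONVERTER=true
CONNECT_CREATE_CONFLICTMONITOR=false
CONNECT_CREATE_DEDUPLICATOR=false
```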
1. Create a copy of `sample.env` and rename it to `.env`.
2. Update the variable `DOCKER_HOST_IP` to the local IP address of the system running Docker.
3. Set the `MONGO_ADMIN_DB_PASS` and `MONGO_READ_WRITE_PASS` environment variables to a secure password.
4. Set the `COMPOSE_PROFILES` variable to: `kafka_connect_standalone,mongo_express,kafka_ui,kafka_setup`
5. Navigate back to the root directory and run the following command: `docker compose up -d`
6. Produce a sample message to one of the sink topics by using `kafka-ui`:
   1. Go to `localhost:8001`
   2. Click local -> Topics
   3. Select `topic.OdeBsmJson`
   4. Select `Produce Message`
   5. Leave the defaults except set the `Value` field to `{"foo":"bar"}`
   6. Click `Produce Message`
7. View the synced message in `mongo-express`:
   1. Go to `localhost:8082`
   2. Click `ode` -- or click whatever value you set `MONGO_DB_NAME` to
   3. Click `OdeBsmJson`, and now you should see your message!
8. Feel free to test this with other topics or by producing to these topics using the ODE.
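You can also confirm the connectors were registered by querying the Kafka Connect REST API directly. This sketch assumes Connect's default REST port 8083 is reachable from the host; check `CONNECT_URL` and your compose port mappings if it is not:

```bash
# Lists the names of all registered sink connectors.
curl -s http://localhost:8083/connectors
```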
The JPO-Deduplicator is a Kafka Java Spring Boot application designed to reduce the number of messages stored and processed in the ODE system. This is done by reading messages from an input topic (such as topic.ProcessedMap) and outputting a subset of those messages on a related output topic (topic.DeduplicatedProcessedMap). Functionally, this is done by dropping duplicate messages from the input topic and only passing on unique messages. In addition, each topic will pass on at least 1 message per hour even if the message is a duplicate. This behavior helps ensure messages are still flowing through the system. The following topics currently support deduplication.
- topic.ProcessedMap -> topic.DeduplicatedProcessedMap
- topic.ProcessedMapWKT -> topic.DeduplicatedProcessedMapWKT
- topic.OdeMapJson -> topic.DeduplicatedOdeMapJson
- topic.OdeTimJson -> topic.DeduplicatedOdeTimJson
- topic.OdeRawEncodedTIMJson -> topic.DeduplicatedOdeRawEncodedTIMJson
- topic.OdeBsmJson -> topic.DeduplicatedOdeBsmJson
- topic.ProcessedSpat -> topic.DeduplicatedProcessedSpat
When running the jpo-deduplication as a submodule in jpo-utils, the deduplicator will automatically turn on deduplication for a topic when that topic is created. For example if the KAFKA_TOPIC_CREATE_GEOJSONCONVERTER environment variable is set to true, the deduplicator will start performing deduplication for ProcessedMap, ProcessedMapWKT, and ProcessedSpat data.
To manually configure deduplication for a topic, the following environment variables can also be used.
| Environment Variable | Description |
|---|---|
| `ENABLE_PROCESSED_MAP_DEDUPLICATION` | true / false - Enable ProcessedMap message deduplication |
| `ENABLE_PROCESSED_MAP_WKT_DEDUPLICATION` | true / false - Enable ProcessedMap WKT message deduplication |
| `ENABLE_ODE_MAP_DEDUPLICATION` | true / false - Enable ODE MAP message deduplication |
| `ENABLE_ODE_TIM_DEDUPLICATION` | true / false - Enable ODE TIM message deduplication |
| `ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION` | true / false - Enable ODE Raw Encoded TIM deduplication |
| `ENABLE_PROCESSED_SPAT_DEDUPLICATION` | true / false - Enable ProcessedSpat deduplication |
| `ENABLE_ODE_BSM_DEDUPLICATION` | true / false - Enable ODE BSM deduplication |
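For example, a `.env` fragment that keeps deduplication on only for MAP and SPaT data might look like this (an illustrative sketch using the variables above; pick values to match your deployment):

```bash
# Deduplicate the MAP and SPaT streams; pass TIM and BSM data through untouched.
ENABLE_PROCESSED_MAP_DEDUPLICATION=true
ENABLE_PROCESSED_MAP_WKT_DEDUPLICATION=true
ENABLE_ODE_MAP_DEDUPLICATION=true
ENABLE_PROCESSED_SPAT_DEDUPLICATION=true
ENABLE_ODE_TIM_DEDUPLICATION=false
ENABLE_ODE_RAW_ENCODED_TIM_DEDUPLICATION=false
ENABLE_ODE_BSM_DEDUPLICATION=false
```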
A GitHub token is required to pull artifacts from GitHub repositories. This is required to obtain the jpo-deduplicator jars and must be done before attempting to build this repository.
1. Log into GitHub.
2. Navigate to Settings -> Developer settings -> Personal access tokens.
3. Click "New personal access token (classic)".
   - As of now, GitHub does not support Fine-grained tokens for obtaining packages.
4. Provide a name and expiration for the token.
5. Select the `read:packages` scope.
6. Click "Generate token" and copy the token.
7. Copy the token name and token value into your `.env` file.
For local development, the following steps are also required:

8. Create a copy of settings.xml and save it to `~/.m2/settings.xml`.
9. Update the variables in your `~/.m2/settings.xml` with the token value and target jpo-ode organization.
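A minimal sketch of those two steps from the command line, assuming settings.xml sits at the repository root (adjust the source path to wherever the file actually lives in this repository):

```bash
# Install the provided Maven settings template locally, then edit the copy to fill in
# your GitHub username/token so Maven can pull packages from the jpo-ode organization.
mkdir -p ~/.m2
cp settings.xml ~/.m2/settings.xml
```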
1. Create a copy of `sample.env` and rename it to `.env`.
2. Update the variable `MAVEN_GITHUB_TOKEN` to a GitHub token used for downloading jar file dependencies. For full instructions on how to generate a token, see the GitHub token instructions above.
3. Set the `MONGO_ADMIN_DB_PASS` and `MONGO_READ_WRITE_PASS` environment variables to a secure password.
4. Set the `COMPOSE_PROFILES` variable to: `kafka,kafka_ui,kafka_setup,jpo-deduplicator`
5. Navigate back to the root directory and run the following command: `docker compose up -d`
6. Produce a sample message to one of the input topics by using `kafka-ui`:
   1. Go to `localhost:8001`
   2. Click local -> Topics
   3. Select `topic.OdeMapJson`
   4. Select `Produce Message`
   5. Copy in sample JSON for a MAP message
   6. Click `Produce Message` multiple times
7. View the deduplicated messages in `kafka-ui`:
   1. Go to `localhost:8001`
   2. Click local -> Topics
   3. Select `topic.DeduplicatedOdeMapJson`
   4. You should now see only one copy of the MAP message you sent.
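If you would rather verify from the command line, a consumer inside the broker container can read the deduplicated topic directly (assuming the Bitnami CLI scripts are on the PATH, as in the Kafka section above):

```bash
# Read everything on the deduplicated topic; the repeated MAP messages you produced
# should have been collapsed to a single record (plus at most one per hour thereafter).
docker compose exec kafka kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic topic.DeduplicatedOdeMapJson \
  --from-beginning
```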