Skip to content

Commit

Permalink
Add Kafka cluster setup to docker-compose files and refactor Dockerfile for rdkafka dependencies. Implement retrying for consumer.rcv() fn to handle temporary Kafka unavailability.
Browse files Browse the repository at this point in the history
  • Loading branch information
hippalus committed Dec 22, 2024
1 parent 58cc468 commit aff48a2
Show file tree
Hide file tree
Showing 15 changed files with 767 additions and 182 deletions.
162 changes: 103 additions & 59 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ mime = "0.3.17"
rdkafka = { version = "0.37", features = ["cmake-build", "tracing", "sasl", "ssl", "libz-static"] }
testcontainers = "0.23"
testcontainers-modules = { version = "0.11", features = ["kafka"] }
backon = "1.3"

### other dependencies
anyhow = { version = "1.0", features = ["backtrace"] }
Expand Down Expand Up @@ -71,7 +72,7 @@ num_cpus = "1.15"
once_cell = "1.17.1"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8.5"
regex = "1.7.3"
regex = "1.11.1"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
"rustls-tls",
Expand All @@ -83,8 +84,8 @@ semver = "1.0"
serde = { version = "1.0", features = ["rc", "derive"] }
serde_json = "1.0"
static-files = "0.2"
sysinfo = "0.31.4"
thiserror = "1.0.64"
sysinfo = "0.33.0"
thiserror = "2.0.9"
thread-priority = "1.0.0"
tokio = { version = "1.42", default-features = false, features = [
"sync",
Expand Down
20 changes: 19 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ LABEL maintainer="Parseable Team <[email protected]>"
LABEL org.opencontainers.image.vendor="Parseable Inc"
LABEL org.opencontainers.image.licenses="AGPL-3.0"

# Install the packages used to build rdkafka (cmake build, SASL and SSL
# support) plus the CA bundle, then drop the apt lists to keep the layer small.
RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        ca-certificates \
        cmake \
        librdkafka-dev \
        libsasl2-dev \
        libssl-dev \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /parseable
COPY . .
RUN cargo build --release
Expand All @@ -30,7 +39,16 @@ FROM gcr.io/distroless/cc-debian12:latest

WORKDIR /parseable

# Copy the static shell into base image.
# Copy the Parseable binary from the builder stage into the runtime image.
COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable

# Copy only the shared libraries the binary needs at runtime (SASL, OpenSSL);
# kafka itself is statically linked, so no librdkafka shared object is copied.
# NOTE(review): these paths are specific to x86_64 — confirm that other
# architectures are out of scope for this image.
COPY --from=builder /usr/lib/x86_64-linux-gnu/libsasl2.so.2 /usr/lib/x86_64-linux-gnu/
COPY --from=builder /usr/lib/x86_64-linux-gnu/libssl.so.3 /usr/lib/x86_64-linux-gnu/
COPY --from=builder /usr/lib/x86_64-linux-gnu/libcrypto.so.3 /usr/lib/x86_64-linux-gnu/

# Copy the CA certificate bundle from the builder stage.
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/


CMD ["/usr/bin/parseable"]
161 changes: 154 additions & 7 deletions docker-compose-distributed-test.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
version: "3.7"
networks:
parseable-internal:

services:
# minio
minio:
Expand All @@ -18,7 +18,7 @@ services:
ports:
- 9000:9000
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ]
interval: 15s
timeout: 20s
retries: 5
Expand All @@ -29,9 +29,10 @@ services:
build:
context: .
dockerfile: Dockerfile
command: ["parseable", "s3-store"]
platform: linux/amd64
command: [ "parseable", "s3-store" ]
ports:
- 8000:8000
- "8000:8000"
environment:
- P_S3_URL=http://minio:9000
- P_S3_ACCESS_KEY=parseable
Expand All @@ -47,7 +48,7 @@ services:
networks:
- parseable-internal
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness"]
test: [ "CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness" ]
interval: 15s
timeout: 20s
retries: 5
Expand All @@ -63,7 +64,8 @@ services:
build:
context: .
dockerfile: Dockerfile
command: ["parseable", "s3-store"]
platform: linux/amd64
command: [ "parseable", "s3-store" ]
ports:
- 8000
environment:
Expand All @@ -79,16 +81,23 @@ services:
- P_PARQUET_COMPRESSION_ALGO=snappy
- P_MODE=ingest
- P_INGESTOR_ENDPOINT=parseable-ingest-one:8000
- P_KAFKA_TOPICS=dist-test-logs-stream
- P_KAFKA_BOOTSTRAP_SERVERS=kafka-0:9092,kafka-1:9092,kafka-2:9092
- P_KAFKA_GROUP_ID=parseable-kafka-sink-connector
# additional settings like security, tuning, etc.
networks:
- parseable-internal
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness"]
test: [ "CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness" ]
interval: 15s
timeout: 20s
retries: 5
depends_on:
- parseable-query
- minio
- kafka-0
- kafka-1
- kafka-2
deploy:
restart_policy:
condition: on-failure
Expand Down Expand Up @@ -126,3 +135,141 @@ services:
condition: on-failure
delay: 20s
max_attempts: 3

# Node 0 of the three-node KRaft cluster (combined controller + broker).
kafka-0:
  image: docker.io/bitnami/kafka:3.9
  networks:
    - parseable-internal
  ports:
    # Container port only; no fixed host port is requested.
    - "9092"
  volumes:
    - kafka_0_data:/bitnami/kafka
  environment:
    # KRaft identity and quorum membership
    KAFKA_CFG_NODE_ID: "0"
    KAFKA_CFG_PROCESS_ROLES: "controller,broker"
    KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093"
    KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
    # Listeners: 9092 carries PLAINTEXT (also used between brokers), 9093 the controller
    KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
    KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://:9092"
    KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
    KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
    KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
    # Replication of the internal offsets / transaction-state topics
    KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "3"
    KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "3"
    KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: "2"
  healthcheck:
    # Healthy once the local broker answers a topic listing.
    test:
      - "CMD-SHELL"
      - "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"
    interval: 10s
    timeout: 5s
    retries: 5

# Node 1 of the three-node KRaft cluster (combined controller + broker).
kafka-1:
  image: docker.io/bitnami/kafka:3.9
  networks:
    - parseable-internal
  ports:
    # Container port only; no fixed host port is requested.
    - "9092"
  volumes:
    - kafka_1_data:/bitnami/kafka
  environment:
    # KRaft identity and quorum membership
    KAFKA_CFG_NODE_ID: "1"
    KAFKA_CFG_PROCESS_ROLES: "controller,broker"
    KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093"
    KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
    # Listeners: 9092 carries PLAINTEXT (also used between brokers), 9093 the controller
    KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
    KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://:9092"
    KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
    KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
    KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
    # Replication of the internal offsets / transaction-state topics
    KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "3"
    KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "3"
    KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: "2"
  healthcheck:
    # Healthy once the local broker answers a topic listing.
    test:
      - "CMD-SHELL"
      - "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"
    interval: 10s
    timeout: 5s
    retries: 5

# Node 2 of the three-node KRaft cluster (combined controller + broker).
kafka-2:
  image: docker.io/bitnami/kafka:3.9
  networks:
    - parseable-internal
  ports:
    # Container port only; no fixed host port is requested.
    - "9092"
  volumes:
    - kafka_2_data:/bitnami/kafka
  environment:
    # KRaft identity and quorum membership
    KAFKA_CFG_NODE_ID: "2"
    KAFKA_CFG_PROCESS_ROLES: "controller,broker"
    KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093"
    KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
    # Listeners: 9092 carries PLAINTEXT (also used between brokers), 9093 the controller
    KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
    KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://:9092"
    KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"
    KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
    KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
    # Replication of the internal offsets / transaction-state topics
    KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "3"
    KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "3"
    KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: "2"
  healthcheck:
    # Healthy once the local broker answers a topic listing.
    test:
      - "CMD-SHELL"
      - "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"
    interval: 10s
    timeout: 5s
    retries: 5

# Kafka UI container, published on host port 8080 and pointed at all three brokers.
kafka-ui:
  image: provectuslabs/kafka-ui:latest
  platform: linux/amd64
  networks:
    - parseable-internal
  ports:
    - "8080:8080"
  environment:
    - KAFKA_CLUSTERS_0_NAME=dist-test
    - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-0:9092,kafka-1:9092,kafka-2:9092
    - KAFKA_CLUSTERS_0_METRICS_PORT=9101
    - DYNAMIC_CONFIG_ENABLED=true
  # Start after the broker containers have been started.
  depends_on:
    - kafka-0
    - kafka-1
    - kafka-2
  deploy:
    # Restart on failure: at most 3 attempts, 20s apart.
    restart_policy:
      condition: on-failure
      delay: 20s
      max_attempts: 3

# Log producer built from ./scripts; writes to the topic named in KAFKA_TOPIC.
# NOTE(review): LOG_RATE / TOTAL_LOGS semantics are defined by the script in
# ./scripts, which is not visible here — confirm units there.
kafka-log-generator:
  build:
    context: ./scripts
    dockerfile: Dockerfile
  networks:
    - parseable-internal
  environment:
    KAFKA_BROKERS: "kafka-0:9092,kafka-1:9092,kafka-2:9092"
    KAFKA_TOPIC: "dist-test-logs-stream"
    LOG_RATE: "500"
    TOTAL_LOGS: "100000"
  # Start after the broker containers have been started.
  depends_on:
    - kafka-0
    - kafka-1
    - kafka-2
  # Never restart this container automatically once it exits.
  restart: "no"

# One named local volume per broker, mounted at /bitnami/kafka in each node.
volumes:
  kafka_0_data: {driver: local}
  kafka_1_data: {driver: local}
  kafka_2_data: {driver: local}


41 changes: 41 additions & 0 deletions docker-compose-local.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Local development stack: a single-node KRaft Kafka broker plus Kafka UI.
services:
  kafka:
    image: docker.io/bitnami/kafka:3.9
    ports:
      - "9092:9092"
      - "29092:29092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      # KRaft: one node acting as both controller and broker
      KAFKA_CFG_NODE_ID: "0"
      KAFKA_CFG_PROCESS_ROLES: "controller,broker"
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka:9093"
      # Two client listeners: 9092 is advertised as localhost (host access),
      # 29092 is advertised as kafka (container-to-container access).
      KAFKA_CFG_LISTENERS: "PLAINTEXT://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:29092,CONTROLLER://:9093"
      KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092"
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT_INTERNAL"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    platform: linux/amd64
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: "local"
      # Reaches the broker through its internal listener.
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "kafka:29092"
      KAFKA_CLUSTERS_0_METRICS_PORT: "9101"
      DYNAMIC_CONFIG_ENABLED: "true"
    depends_on:
      - kafka
    deploy:
      # Restart on failure: at most 3 attempts, 20s apart.
      restart_policy:
        condition: on-failure
        delay: 20s
        max_attempts: 3

volumes:
  kafka_data:
    driver: local
Loading

0 comments on commit aff48a2

Please sign in to comment.