From aff48a202dd2c161bfc1abfcddebb0802baf2ba7 Mon Sep 17 00:00:00 2001
From: hippalus
Date: Sun, 22 Dec 2024 10:21:15 +0100
Subject: [PATCH] Add Kafka cluster setup to docker-compose files and refactor
 Dockerfile for rdkafka dependencies. Implement retries for consumer.recv()
 to handle temporary Kafka unavailability.

---
 Cargo.lock                            | 162 ++++++++++------
 Cargo.toml                            |   7 +-
 Dockerfile                            |  20 +-
 docker-compose-distributed-test.yaml  | 161 +++++++++++++++-
 docker-compose-local.yaml             |  41 ++++
 docker-compose-test.yaml              | 178 ++++++++++++++++--
 scripts/Dockerfile                    |  28 +++
 scripts/kafka_log_stream_generator.py | 161 +++++++++++-----
 src/connectors/kafka/config.rs        |  10 +-
 src/connectors/kafka/consumer.rs      | 164 ++++++++++++----
 src/connectors/kafka/mod.rs           |   2 +-
 ...on_stream_queue.rs => partition_stream.rs} |   0
 src/connectors/kafka/processor.rs     |   5 +-
 src/connectors/kafka/sink.rs          |   8 +-
 src/connectors/kafka/state.rs         |   2 +-
 15 files changed, 767 insertions(+), 182 deletions(-)
 create mode 100644 docker-compose-local.yaml
 create mode 100644 scripts/Dockerfile
 rename src/connectors/kafka/{partition_stream_queue.rs => partition_stream.rs} (100%)

diff --git a/Cargo.lock b/Cargo.lock
index 15ca371a8..feaacee8f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -81,7 +81,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb"
 dependencies = [
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -218,7 +218,7 @@ dependencies = [
  "actix-router",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -248,7 +248,7 @@ dependencies = [
  "pin-project",
  "prometheus",
  "quanta",
- "thiserror",
+ "thiserror 1.0.64",
 ]

@@ -712,7 +712,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -723,7 +723,7 @@ checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -794,6 +794,17 @@ dependencies = [
  "tower-service",
 ]

+[[package]]
+name = "backon"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ba5289ec98f68f28dd809fd601059e6aa908bb8f6108620930828283d4ee23d7"
+dependencies = [
+ "fastrand 2.0.2",
+ "gloo-timers",
+ "tokio",
+]
+
 [[package]]
 name = "backtrace"
 version = "0.3.71"

@@ -923,7 +934,7 @@ dependencies = [
  "serde_json",
  "serde_repr",
  "serde_urlencoded",
- "thiserror",
+ "thiserror 1.0.64",
  "tokio",
  "tokio-util",
  "tower-service",

@@ -1040,7 +1051,7 @@ dependencies = [
  "semver",
  "serde",
  "serde_json",
- "thiserror",
+ "thiserror 1.0.64",
 ]

@@ -1156,7 +1167,7 @@ dependencies = [
  "heck 0.5.0",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -1265,9 +1276,9 @@ dependencies = [

 [[package]]
 name = "core-foundation-sys"
-version = "0.8.6"
+version = "0.8.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "06ea2b9bc92be3c2baa9334a323ebca2d6f074ff852cd1d7b11064035cd3868f"
+checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"

 [[package]]
 name = "cpufeatures"

@@ -1395,7 +1406,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "strsim",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -1406,7 +1417,7 @@ checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f"
 dependencies = [
  "darling_core",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -1836,7 +1847,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -1849,7 +1860,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "rustc_version",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -1871,7 +1882,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -2084,7 +2095,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -2158,6 +2169,18 @@ version = "0.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"

+[[package]]
+name = "gloo-timers"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994"
+dependencies = [
+ "futures-channel",
+ "futures-core",
+ "js-sys",
+ "wasm-bindgen",
+]
+
 [[package]]
 name = "h2"
 version = "0.3.26"

@@ -2748,9 +2771,9 @@ dependencies = [

 [[package]]
 name = "libc"
-version = "0.2.153"
+version = "0.2.169"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd"
+checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a"

 [[package]]
 name = "libm"

@@ -3108,7 +3131,7 @@ dependencies = [
  "proc-macro-crate",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -3179,7 +3202,7 @@ dependencies = [
  "reqwest 0.12.8",
  "serde",
  "serde_json",
- "thiserror",
+ "thiserror 1.0.64",
  "url",
  "validator",
 ]

@@ -3314,7 +3337,7 @@ dependencies = [
  "regex",
  "regex-syntax 0.8.5",
  "structmeta",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -3345,6 +3368,7 @@ dependencies = [
  "arrow-schema",
  "arrow-select",
  "async-trait",
+ "backon",
  "base64 0.22.0",
  "byteorder",
  "bytes",

@@ -3402,7 +3426,7 @@ dependencies = [
  "sysinfo",
  "testcontainers",
  "testcontainers-modules",
- "thiserror",
+ "thiserror 2.0.9",
  "thread-priority",
  "tokio",
  "tokio-stream",

@@ -3531,7 +3555,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -3571,7 +3595,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "5ac2cf0f2e4f42b49f5ffd07dae8d746508ef7526c13940e5f524012ae6c6550"
 dependencies = [
  "proc-macro2",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -3609,9 +3633,9 @@ dependencies = [

 [[package]]
 name = "proc-macro2"
-version = "1.0.86"
+version = "1.0.92"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77"
+checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0"
 dependencies = [
  "unicode-ident",
 ]

@@ -3643,7 +3667,7 @@ dependencies = [
  "parking_lot",
  "procfs",
  "protobuf",
- "thiserror",
+ "thiserror 1.0.64",
 ]

@@ -3685,7 +3709,7 @@ dependencies = [
  "prost",
  "prost-types",
  "regex",
- "syn 2.0.79",
+ "syn 2.0.91",
  "tempfile",
 ]

@@ -3699,7 +3723,7 @@ dependencies = [
  "itertools 0.13.0",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -3756,7 +3780,7 @@ dependencies = [
  "rustc-hash",
  "rustls 0.23.13",
  "socket2",
- "thiserror",
+ "thiserror 1.0.64",
  "tokio",
  "tracing",
 ]

@@ -3773,7 +3797,7 @@ dependencies = [
  "rustc-hash",
  "rustls 0.23.13",
  "slab",
- "thiserror",
+ "thiserror 1.0.64",
  "tinyvec",
  "tracing",
 ]

@@ -3922,9 +3946,9 @@ dependencies = [

 [[package]]
 name = "regex"
-version = "1.11.0"
+version = "1.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8"
+checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
 dependencies = [
  "aho-corasick",
  "memchr",

@@ -4107,7 +4131,7 @@ dependencies = [
  "regex",
  "relative-path",
  "rustc_version",
- "syn 2.0.79",
+ "syn 2.0.91",
  "unicode-ident",
 ]

@@ -4385,7 +4409,7 @@ checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -4408,7 +4432,7 @@ checksum = "6c64451ba24fc7a6a2d60fc75dd9c83c90903b19028d4eff35e88fc1e86564e9"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -4459,7 +4483,7 @@ dependencies = [
  "darling",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -4590,7 +4614,7 @@ dependencies = [
  "heck 0.5.0",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -4633,7 +4657,7 @@ checksum = "01b2e185515564f15375f593fb966b5718bc624ba77fe49fa4616ad619690554"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -4668,7 +4692,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "structmeta-derive",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -4679,7 +4703,7 @@ checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -4701,7 +4725,7 @@ dependencies = [
  "proc-macro2",
  "quote",
  "rustversion",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -4722,9 +4746,9 @@ dependencies = [

 [[package]]
 name = "syn"
-version = "2.0.79"
+version = "2.0.91"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590"
+checksum = "d53cbcb5a243bd33b7858b1d7f4aca2153490815872d86d955d6ea29f743c035"
 dependencies = [
  "proc-macro2",
  "quote",

@@ -4748,9 +4772,9 @@ dependencies = [

 [[package]]
 name = "sysinfo"
-version = "0.31.4"
+version = "0.33.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "355dbe4f8799b304b05e1b0f05fc59b2a18d36645cf169607da45bde2f69a1be"
+checksum = "948512566b1895f93b1592c7574baeb2de842f224f2aab158799ecadb8ebbb46"
 dependencies = [
  "core-foundation-sys",
  "libc",

@@ -4814,7 +4838,7 @@ dependencies = [
  "serde",
  "serde_json",
  "serde_with",
- "thiserror",
+ "thiserror 1.0.64",
  "tokio",
  "tokio-stream",
  "tokio-tar",

@@ -4837,7 +4861,16 @@ version = "1.0.64"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84"
 dependencies = [
- "thiserror-impl",
+ "thiserror-impl 1.0.64",
+]
+
+[[package]]
+name = "thiserror"
+version = "2.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f072643fd0190df67a8bab670c20ef5d8737177d6ac6b2e9a236cb096206b2cc"
+dependencies = [
+ "thiserror-impl 2.0.9",
 ]

@@ -4848,7 +4881,18 @@ checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
+]
+
+[[package]]
+name = "thiserror-impl"
+version = "2.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.91",
 ]

@@ -4969,7 +5013,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -5214,7 +5258,7 @@ checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -5346,7 +5390,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f4e71ddbefed856d5881821d6ada4e606bbb91fd332296963ed596e2ad2100f3"
 dependencies = [
  "libc",
- "thiserror",
+ "thiserror 1.0.64",
  "windows 0.52.0",
 ]

@@ -5421,7 +5465,7 @@ dependencies = [
  "proc-macro-error",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -5514,7 +5558,7 @@ dependencies = [
  "once_cell",
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
  "wasm-bindgen-shared",
 ]

@@ -5548,7 +5592,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
  "wasm-bindgen-backend",
  "wasm-bindgen-shared",
 ]

@@ -5687,7 +5731,7 @@ checksum = "9107ddc059d5b6fbfbffdfa7a7fe3e22a226def0b2608f72e9d552763d3e1ad7"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -5698,7 +5742,7 @@ checksum = "29bee4b38ea3cde66011baa44dba677c432a78593e202392d1e9070cf2a7fca7"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -6016,7 +6060,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
 dependencies = [
  "proc-macro2",
  "quote",
- "syn 2.0.79",
+ "syn 2.0.91",
 ]

@@ -6038,7 +6082,7 @@ dependencies = [
  "flate2",
  "indexmap 2.5.0",
  "memchr",
- "thiserror",
+ "thiserror 1.0.64",
  "zopfli",
 ]

diff --git a/Cargo.toml b/Cargo.toml
index df676fb74..de979e891 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,6 +34,7 @@ mime = "0.3.17"
 rdkafka = { version = "0.37", features = ["cmake-build", "tracing", "sasl", "ssl", "libz-static"] }
 testcontainers = "0.23"
 testcontainers-modules = { version = "0.11", features = ["kafka"] }
+backon = "1.3"

 ### other dependencies
 anyhow = { version = "1.0", features = ["backtrace"] }
@@ -71,7 +72,7 @@ num_cpus = "1.15"
 once_cell = "1.17.1"
 prometheus = { version = "0.13", features = ["process"] }
 rand = "0.8.5"
-regex = "1.7.3"
+regex = "1.11.1"
 relative-path = { version = "1.7", features = ["serde"] }
 reqwest = { version = "0.11.27", default-features = false, features = [
   "rustls-tls",
@@ -83,8 +84,8 @@ semver = "1.0"
 serde = { version = "1.0", features = ["rc", "derive"] }
 serde_json = "1.0"
 static-files = "0.2"
-sysinfo = "0.31.4"
-thiserror = "1.0.64"
+sysinfo = "0.33.0"
+thiserror = "2.0.9"
 thread-priority = "1.0.0"
 tokio = { version = "1.42", default-features = false, features = [
   "sync",
diff --git a/Dockerfile b/Dockerfile
index cfd88348b..ba0657b81 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -21,6 +21,15 @@ LABEL maintainer="Parseable Team "
 LABEL org.opencontainers.image.vendor="Parseable Inc"
 LABEL org.opencontainers.image.licenses="AGPL-3.0"

+RUN apt-get update && \
+    apt-get install --no-install-recommends -y \
+    cmake \
+    librdkafka-dev \
+    ca-certificates \
+    libsasl2-dev \
+    libssl-dev && \
+    rm -rf /var/lib/apt/lists/*
+
 WORKDIR /parseable
 COPY . .

 RUN cargo build --release
@@ -30,7 +39,16 @@ FROM gcr.io/distroless/cc-debian12:latest

 WORKDIR /parseable

-# Copy the static shell into base image.
+# Copy the Parseable binary from the builder stage
 COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable

+# Copy only the shared libraries the binary needs: librdkafka itself is
+# statically linked, but SASL and SSL are loaded dynamically
+COPY --from=builder /usr/lib/x86_64-linux-gnu/libsasl2.so.2 /usr/lib/x86_64-linux-gnu/
+COPY --from=builder /usr/lib/x86_64-linux-gnu/libssl.so.3 /usr/lib/x86_64-linux-gnu/
+COPY --from=builder /usr/lib/x86_64-linux-gnu/libcrypto.so.3 /usr/lib/x86_64-linux-gnu/
+
+# Copy CA certificates
+COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
+
 CMD ["/usr/bin/parseable"]
diff --git a/docker-compose-distributed-test.yaml b/docker-compose-distributed-test.yaml
index 06cfd585b..147b9f848 100644
--- a/docker-compose-distributed-test.yaml
+++ b/docker-compose-distributed-test.yaml
@@ -1,6 +1,6 @@
-version: "3.7"
 networks:
   parseable-internal:
+
 services:
   # minio
   minio:
@@ -18,7 +18,7 @@ services:
     ports:
       - 9000:9000
     healthcheck:
-      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
+      test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ]
       interval: 15s
       timeout: 20s
       retries: 5
@@ -29,9 +29,10 @@ services:
     build:
       context: .
       dockerfile: Dockerfile
-    command: ["parseable", "s3-store"]
+    platform: linux/amd64
+    command: [ "parseable", "s3-store" ]
     ports:
-      - 8000:8000
+      - "8000:8000"
     environment:
       - P_S3_URL=http://minio:9000
       - P_S3_ACCESS_KEY=parseable
@@ -47,7 +48,7 @@ services:
     networks:
       - parseable-internal
     healthcheck:
-      test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness"]
+      test: [ "CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness" ]
       interval: 15s
       timeout: 20s
       retries: 5
@@ -63,7 +64,8 @@ services:
     build:
       context: .
       dockerfile: Dockerfile
-    command: ["parseable", "s3-store"]
+    platform: linux/amd64
+    command: [ "parseable", "s3-store" ]
     ports:
       - 8000
     environment:
@@ -79,16 +81,23 @@ services:
       - P_PARQUET_COMPRESSION_ALGO=snappy
       - P_MODE=ingest
       - P_INGESTOR_ENDPOINT=parseable-ingest-one:8000
+      - P_KAFKA_TOPICS=dist-test-logs-stream
+      - P_KAFKA_BOOTSTRAP_SERVERS=kafka-0:9092,kafka-1:9092,kafka-2:9092
+      - P_KAFKA_GROUP_ID=parseable-kafka-sink-connector
+      # additional settings like security, tuning, etc.
     networks:
       - parseable-internal
     healthcheck:
-      test: ["CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness"]
+      test: [ "CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness" ]
       interval: 15s
       timeout: 20s
       retries: 5
     depends_on:
       - parseable-query
       - minio
+      - kafka-0
+      - kafka-1
+      - kafka-2
     deploy:
       restart_policy:
         condition: on-failure
         delay: 20s
         max_attempts: 3
@@ -126,3 +135,141 @@ services:
         condition: on-failure
         delay: 20s
         max_attempts: 3
+
+  kafka-0:
+    image: docker.io/bitnami/kafka:3.9
+    ports:
+      - "9092"
+    environment:
+      # KRaft settings
+      - KAFKA_CFG_NODE_ID=0
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
+      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
+      # Listeners
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      # Clustering
+      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
+    volumes:
+      - kafka_0_data:/bitnami/kafka
+    networks:
+      - parseable-internal
+    healthcheck:
+      test: [ "CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1" ]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  kafka-1:
+    image: docker.io/bitnami/kafka:3.9
+    ports:
+      - "9092"
+    environment:
+      # KRaft settings
+      - KAFKA_CFG_NODE_ID=1
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
+      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
+      # Listeners
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      # Clustering
+      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
+    volumes:
+      - kafka_1_data:/bitnami/kafka
+    networks:
+      - parseable-internal
+    healthcheck:
+      test: [ "CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1" ]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  kafka-2:
+    image: docker.io/bitnami/kafka:3.9
+    ports:
+      - "9092"
+    environment:
+      # KRaft settings
+      - KAFKA_CFG_NODE_ID=2
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
+      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
+      # Listeners
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      # Clustering
+      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
+    volumes:
+      - kafka_2_data:/bitnami/kafka
+    networks:
+      - parseable-internal
+    healthcheck:
+      test: [ "CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1" ]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  kafka-ui:
+    platform: linux/amd64
+    image: provectuslabs/kafka-ui:latest
+    ports:
+      - "8080:8080"
+    depends_on:
+      - kafka-0
+      - kafka-1
+      - kafka-2
+    environment:
+      KAFKA_CLUSTERS_0_NAME: dist-test
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9092,kafka-1:9092,kafka-2:9092
+      KAFKA_CLUSTERS_0_METRICS_PORT: 9101
+      DYNAMIC_CONFIG_ENABLED: "true"
+    networks:
+      - parseable-internal
+    deploy:
+      restart_policy:
+        condition: on-failure
+        delay: 20s
+        max_attempts: 3
+
+  kafka-log-generator:
+    build:
+      context: ./scripts
+      dockerfile: Dockerfile
+    environment:
+      - KAFKA_BROKERS=kafka-0:9092,kafka-1:9092,kafka-2:9092
+      - KAFKA_TOPIC=dist-test-logs-stream
+      - LOG_RATE=500
+      - TOTAL_LOGS=100000
+    depends_on:
+      - kafka-0
+      - kafka-1
+      - kafka-2
+    networks:
+      - parseable-internal
+    restart: "no"
+
+volumes:
+  kafka_0_data:
+    driver: local
+  kafka_1_data:
+    driver: local
+  kafka_2_data:
+    driver: local
diff --git a/docker-compose-local.yaml b/docker-compose-local.yaml
new file mode 100644
index 000000000..c44283dd3
--- /dev/null
+++ b/docker-compose-local.yaml
@@ -0,0 +1,41 @@
+services:
+  kafka:
+    image: docker.io/bitnami/kafka:3.9
+    ports:
+      - "9092:9092"
+      - "29092:29092"
+    volumes:
+      - "kafka_data:/bitnami"
+    environment:
+      # KRaft settings
+      - KAFKA_CFG_NODE_ID=0
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
+      # Listeners for internal and external communication
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,PLAINTEXT_INTERNAL://0.0.0.0:29092,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT_INTERNAL
+
+  kafka-ui:
+    platform: linux/amd64
+    image: provectuslabs/kafka-ui:latest
+    ports:
+      - "8080:8080"
+    depends_on:
+      - kafka
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
+      KAFKA_CLUSTERS_0_METRICS_PORT: 9101
+      DYNAMIC_CONFIG_ENABLED: "true"
+    deploy:
+      restart_policy:
+        condition: on-failure
+        delay: 20s
+        max_attempts: 3
+
+volumes:
+  kafka_data:
+    driver: local
diff --git a/docker-compose-test.yaml b/docker-compose-test.yaml
index 59b323c78..e34c867ab 100644
--- a/docker-compose-test.yaml
+++ b/docker-compose-test.yaml
@@ -1,5 +1,3 @@
-version: "3.7"
-
 networks:
   parseable-internal:

@@ -17,7 +15,7 @@ services:
       - MINIO_ROOT_PASSWORD=supersecret
       - MINIO_UPDATE=off
     ports:
-      - 9000
+      - "9000:9000"
     healthcheck:
       test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ]
       interval: 15s
@@ -30,9 +28,10 @@ services:
     build:
       context: .
       dockerfile: Dockerfile
-    command: ["parseable", "s3-store"]
+    platform: linux/amd64
+    command: [ "parseable", "s3-store" ]
     ports:
-      - 8000
+      - "8000:8000"
    environment:
       - P_S3_URL=http://minio:9000
       - P_S3_ACCESS_KEY=parseable
@@ -44,15 +43,22 @@ services:
       - P_PASSWORD=parseableadmin
       - P_CHECK_UPDATE=false
       - P_PARQUET_COMPRESSION_ALGO=snappy
-    networks:
-      - parseable-internal
+      - P_KAFKA_TOPICS=test-logs-stream
+      - P_KAFKA_BOOTSTRAP_SERVERS=kafka-0:9092,kafka-1:9092,kafka-2:9092
+      - P_KAFKA_GROUP_ID=parseable-kafka-sink-connector
+      # additional settings like security, tuning, etc.
+    depends_on:
+      - minio
+      - kafka-0
+      - kafka-1
+      - kafka-2
     healthcheck:
       test: [ "CMD", "curl", "-f", "http://localhost:8000/api/v1/liveness" ]
       interval: 15s
       timeout: 20s
       retries: 5
-    depends_on:
-      - minio
+    networks:
+      - parseable-internal
     deploy:
       restart_policy:
         condition: on-failure
@@ -61,13 +67,163 @@ services:

   quest:
     image: ghcr.io/parseablehq/quest:main
-    command: ["load", "http://parseable:8000", "parseableadmin", "parseableadmin", "20", "10", "5m", "minio:9000", "parseable", "supersecret", "parseable"]
+    platform: linux/amd64
+    command: [
+        "load",
+        "http://parseable:8000",
+        "parseableadmin",
+        "parseableadmin",
+        "20",
+        "10",
+        "5m",
+        "minio:9000",
+        "parseable",
+        "supersecret",
+        "parseable"
+      ]
+    depends_on:
+      - parseable
     networks:
       - parseable-internal
-    depends_on:
-      - parseable
     deploy:
       restart_policy:
         condition: on-failure
         delay: 20s
         max_attempts: 3
+
+  kafka-0:
+    image: docker.io/bitnami/kafka:3.9
+    ports:
+      - "9092"
+    environment:
+      # KRaft settings
+      - KAFKA_CFG_NODE_ID=0
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
+      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
+      # Listeners
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      # Clustering
+      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
+    volumes:
+      - kafka_0_data:/bitnami/kafka
+    networks:
+      - parseable-internal
+    healthcheck:
+      test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  kafka-1:
+    image: docker.io/bitnami/kafka:3.9
+    ports:
+      - "9092"
+    environment:
+      # KRaft settings
+      - KAFKA_CFG_NODE_ID=1
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
+      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
+      # Listeners
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      # Clustering
+      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
+    volumes:
+      - kafka_1_data:/bitnami/kafka
+    networks:
+      - parseable-internal
+    healthcheck:
+      test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  kafka-2:
+    image: docker.io/bitnami/kafka:3.9
+    ports:
+      - "9092"
+    environment:
+      # KRaft settings
+      - KAFKA_CFG_NODE_ID=2
+      - KAFKA_CFG_PROCESS_ROLES=controller,broker
+      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
+      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
+      # Listeners
+      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
+      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
+      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
+      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
+      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
+      # Clustering
+      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
+      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
+    volumes:
+      - kafka_2_data:/bitnami/kafka
+    networks:
+      - parseable-internal
+    healthcheck:
+      test: ["CMD-SHELL", "kafka-topics.sh --bootstrap-server localhost:9092 --list || exit 1"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  kafka-ui:
+    platform: linux/amd64
+    image: provectuslabs/kafka-ui:latest
+    ports:
+      - "8080:8080"
+    depends_on:
+      - kafka-0
+      - kafka-1
+      - kafka-2
+    environment:
+      KAFKA_CLUSTERS_0_NAME: test
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka-0:9092,kafka-1:9092,kafka-2:9092
+      KAFKA_CLUSTERS_0_METRICS_PORT: 9101
+      DYNAMIC_CONFIG_ENABLED: "true"
+    networks:
+      - parseable-internal
+    deploy:
+      restart_policy:
+        condition: on-failure
+        delay: 20s
+        max_attempts: 3
+
+  kafka-log-generator:
+    build:
+      context: ./scripts
+      dockerfile: Dockerfile
+    environment:
+      - KAFKA_BROKERS=kafka-0:9092,kafka-1:9092,kafka-2:9092
+      - KAFKA_TOPIC=test-logs-stream
+      - LOG_RATE=500
+      - TOTAL_LOGS=100000
+    depends_on:
+      - kafka-0
+      - kafka-1
+      - kafka-2
+    networks:
+      - parseable-internal
+    restart: "no"
+
+volumes:
+  kafka_0_data:
+    driver: local
+  kafka_1_data:
+    driver: local
+  kafka_2_data:
+    driver: local
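Both compose files hand the sink connector its Kafka settings through the three P_KAFKA_* variables above. Roughly, that wiring amounts to the following on the Rust side; a simplified sketch with a hypothetical function name, since the real mapping lives in src/connectors/kafka/config.rs and supports many more options:

use rdkafka::config::ClientConfig;
use rdkafka::consumer::StreamConsumer;
use rdkafka::error::KafkaResult;

// Sketch: turn the P_KAFKA_* environment variables from the compose
// files into an rdkafka consumer, falling back to local defaults.
fn consumer_from_env() -> KafkaResult<StreamConsumer> {
    let servers = std::env::var("P_KAFKA_BOOTSTRAP_SERVERS")
        .unwrap_or_else(|_| "localhost:9092".to_string());
    let group_id = std::env::var("P_KAFKA_GROUP_ID")
        .unwrap_or_else(|_| "parseable-kafka-sink-connector".to_string());

    ClientConfig::new()
        .set("bootstrap.servers", servers)
        .set("group.id", group_id)
        .create()
}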
diff --git a/scripts/Dockerfile b/scripts/Dockerfile
new file mode 100644
index 000000000..0ad2d62a1
--- /dev/null
+++ b/scripts/Dockerfile
@@ -0,0 +1,28 @@
+# Parseable Server (C) 2022 - 2024 Parseable, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+FROM python:3.13-slim-bookworm
+
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    gcc \
+    librdkafka-dev \
+    && rm -rf /var/lib/apt/lists/*
+
+RUN pip install confluent-kafka
+
+WORKDIR /app
+COPY kafka_log_stream_generator.py /app/
+
+CMD ["python", "/app/kafka_log_stream_generator.py"]
diff --git a/scripts/kafka_log_stream_generator.py b/scripts/kafka_log_stream_generator.py
index 93eed25b3..4f1c487bb 100644
--- a/scripts/kafka_log_stream_generator.py
+++ b/scripts/kafka_log_stream_generator.py
@@ -1,64 +1,104 @@
-import json
+import os
+import sys
 import time
+import json
+import logging
 from datetime import datetime, timezone
 from random import choice, randint
 from uuid import uuid4
+
 from confluent_kafka import Producer
+from confluent_kafka.admin import AdminClient, NewTopic
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(message)s",
+    handlers=[
+        logging.StreamHandler(sys.stdout)  # Log to stdout
+    ]
+)
+
+logger = logging.getLogger(__name__)
+
+KAFKA_BROKERS = os.getenv("KAFKA_BROKERS", "localhost:9092")
+KAFKA_TOPIC = os.getenv("KAFKA_TOPIC", "local-logs-stream")
+NUM_PARTITIONS = int(os.getenv("NUM_PARTITIONS", "6"))  # Default partitions
+REPLICATION_FACTOR = int(os.getenv("REPLICATION_FACTOR", "3"))  # Default RF
+TOTAL_LOGS = int(os.getenv("TOTAL_LOGS", "100000"))  # Total logs to produce
+LOG_RATE = int(os.getenv("LOG_RATE", "500"))  # Logs per second
+REPORT_EVERY = 10_000  # Progress report frequency
+
+producer_conf = {
+    "bootstrap.servers": KAFKA_BROKERS,
+    "queue.buffering.max.messages": 200_000,
+    "queue.buffering.max.ms": 100,  # Up to 100ms linger
+    "batch.num.messages": 10_000,
+    "compression.type": "lz4",  # Compression (lz4, snappy, zstd, gzip)
+    "message.send.max.retries": 3,
+    "reconnect.backoff.ms": 100,
+    "reconnect.backoff.max.ms": 3600000,
+    # "acks": "all",  # Safer but can reduce throughput if replication is slow
+}

-# Configuration
-config = {
-    "kafka_broker": "localhost:9092",  # Replace with your Kafka broker address
-    "kafka_topic": "log-stream",  # Replace with your Kafka topic name
-    "log_rate": 500,  # Logs per second
-    "log_template": {
-        "timestamp": "",  # Timestamp will be added dynamically
-        "correlation_id": "",  # Unique identifier for tracing requests
-        "level": "INFO",  # Log level (e.g., INFO, ERROR, DEBUG)
-        "message": "",  # Main log message to be dynamically set
-        "pod": {
-            "name": "example-pod",  # Kubernetes pod name
-            "namespace": "default",  # Kubernetes namespace
-            "node": "node-01"  # Kubernetes node name
-        },
-        "request": {
-            "method": "",  # HTTP method
-            "path": "",  # HTTP request path
-            "remote_address": ""  # IP address of the client
-        },
-        "response": {
-            "status_code": 200,  # HTTP response status code
-            "latency_ms": 0  # Latency in milliseconds
-        },
-        "metadata": {
-            "container_id": "",  # Container ID
-            "image": "example/image:1.0",  # Docker image
-            "environment": "prod"  # Environment (e.g., dev, staging, prod)
-        }
-    }
-}
+admin_client = AdminClient({"bootstrap.servers": KAFKA_BROKERS})
+producer = Producer(producer_conf)
+
+LOG_TEMPLATE = {
+    "timestamp": "",
+    "correlation_id": "",
+    "level": "INFO",
+    "message": "",
+    "pod": {"name": "", "namespace": "", "node": ""},
+    "request": {"method": "", "path": "", "remote_address": ""},
+    "response": {"status_code": 200, "latency_ms": 0},
+    "metadata": {"container_id": "", "image": "", "environment": ""},
+}

-producer = Producer({"bootstrap.servers": config["kafka_broker"]})
+
+def create_topic(topic_name, num_partitions, replication_factor):
+    new_topic = NewTopic(
+        topic=topic_name,
+        num_partitions=num_partitions,
+        replication_factor=replication_factor
+    )
+
+    logger.info(f"Creating topic '{topic_name}' with {num_partitions} partitions and RF {replication_factor}...")
+    fs = admin_client.create_topics([new_topic])
+
+    for topic, f in fs.items():
+        try:
+            f.result()
+            logger.info(f"Topic '{topic}' created successfully.")
+        except Exception as e:
+            if "TOPIC_ALREADY_EXISTS" in str(e):
+                logger.warning(f"Topic '{topic}' already exists.")
+            else:
+                logger.error(f"Failed to create topic '{topic}': {e}")


 def delivery_report(err, msg):
-    if err is not None:
-        print(f"Delivery failed for message {msg.key()}: {err}")
+    if err:
+        logger.error(f"Delivery failed for message {msg.key()}: {err}")
     else:
-        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
+        logger.debug(f"Message delivered to {msg.topic()} [{msg.partition()}]")


 def generate_log():
-    log = config["log_template"].copy()
+    log = LOG_TEMPLATE.copy()
+
+    # Timestamp & correlation
     log["timestamp"] = datetime.now(timezone.utc).isoformat()
     log["correlation_id"] = str(uuid4())

+    # Random level/message
     levels = ["INFO", "WARNING", "ERROR", "DEBUG"]
     messages = [
         "Received incoming HTTP request",
         "Processed request successfully",
         "Failed to process request",
         "Request timeout encountered",
-        "Service unavailable"
+        "Service unavailable",
     ]
     log["level"] = choice(levels)
     log["message"] = choice(messages)
@@ -69,55 +109,72 @@ def generate_log():
     log["request"] = {
         "method": choice(methods),
         "path": choice(paths),
-        "remote_address": f"192.168.1.{randint(1, 255)}"
+        "remote_address": f"192.168.1.{randint(1, 255)}",
     }

     # Populate response fields
     log["response"] = {
         "status_code": choice([200, 201, 400, 401, 403, 404, 500]),
-        "latency_ms": randint(10, 1000)
+        "latency_ms": randint(10, 1000),
     }

     # Populate pod and metadata fields
     log["pod"] = {
         "name": f"pod-{randint(1, 100)}",
         "namespace": choice(["default", "kube-system", "production", "staging"]),
-        "node": f"node-{randint(1, 10)}"
+        "node": f"node-{randint(1, 10)}",
     }

     log["metadata"] = {
         "container_id": f"container-{randint(1000, 9999)}",
         "image": f"example/image:{randint(1, 5)}.0",
-        "environment": choice(["dev", "staging", "prod"])
+        "environment": choice(["dev", "staging", "prod"]),
     }

     return log


 def main():
+    logger.info("Starting rate-limited log producer...")
+    create_topic(KAFKA_TOPIC, NUM_PARTITIONS, REPLICATION_FACTOR)
+    logger.info(f"Broker: {KAFKA_BROKERS}, Topic: {KAFKA_TOPIC}, Rate: {LOG_RATE} logs/sec, Total Logs: {TOTAL_LOGS}")
+
+    start_time = time.time()
+
     try:
-        while True:
-            # Generate log message
-            log_message = generate_log()
-            log_json = json.dumps(log_message)
+        for i in range(TOTAL_LOGS):
+            log_data = generate_log()
+            log_str = json.dumps(log_data)

             # Send to Kafka
             producer.produce(
-                config["kafka_topic"],
-                value=log_json,
+                topic=KAFKA_TOPIC,
+                value=log_str,
                 callback=delivery_report
             )

-            # Flush the producer to ensure delivery
-            producer.flush()
+            if (i + 1) % REPORT_EVERY == 0:
+                logger.info(f"{i + 1} messages produced. Flushing producer...")
+                producer.flush()
+
+            # Sleep to maintain the logs/second rate
+            time.sleep(1 / LOG_RATE)

-            # Wait based on the log rate
-            time.sleep(1 / config["log_rate"])
     except KeyboardInterrupt:
-        print("Stopped log generation.")
+        logger.warning("Interrupted by user! Flushing remaining messages...")
+        producer.flush()
+
+    except Exception as e:
+        logger.error(f"An error occurred: {e}")
+
+    finally:
+        logger.info("Flushing producer...")
         producer.flush()

+        elapsed = time.time() - start_time
+        logger.info(f"DONE! Produced {TOTAL_LOGS} log messages in {elapsed:.2f} seconds.")
+        logger.info(f"Effective rate: ~{TOTAL_LOGS / elapsed:,.0f} logs/sec")
+

 if __name__ == "__main__":
     main()
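The generator's produce/report/sleep loop maps naturally onto rdkafka's FutureProducer should a Rust variant ever be wanted. A hedged sketch, with placeholder broker address, topic, payload, and message count:

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Placeholder broker/topic; these mirror KAFKA_BROKERS / KAFKA_TOPIC above.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("compression.type", "lz4")
        .create()?;

    let log_rate = 500.0; // logs per second, mirroring LOG_RATE
    for i in 0..1_000u32 {
        let key = format!("key-{i}");
        let payload = format!(r#"{{"message":"synthetic log {i}"}}"#);
        producer
            .send(
                FutureRecord::to("local-logs-stream").key(&key).payload(&payload),
                Duration::from_secs(5), // how long to keep retrying the enqueue
            )
            .await
            .map_err(|(err, _msg)| err)?;
        // Sleep to maintain the logs/second rate, like the Python loop.
        tokio::time::sleep(Duration::from_secs_f64(1.0 / log_rate)).await;
    }
    Ok(())
}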
diff --git a/src/connectors/kafka/config.rs b/src/connectors/kafka/config.rs
index 5e79f32a2..a92cf71c6 100644
--- a/src/connectors/kafka/config.rs
+++ b/src/connectors/kafka/config.rs
@@ -261,7 +261,10 @@ impl KafkaConfig {
     pub fn consumer_config(&self) -> rdkafka::ClientConfig {
         let mut config = rdkafka::ClientConfig::new();

-        config.set("bootstrap.servers", &self.bootstrap_servers);
+        config
+            .set("bootstrap.servers", &self.bootstrap_servers)
+            .set("reconnect.backoff.ms", "100")
+            .set("reconnect.backoff.max.ms", "3600000");

         if let Some(client_id) = &self.client_id {
             config
@@ -339,7 +342,10 @@ impl KafkaConfig {
     pub fn producer_config(&self) -> rdkafka::config::ClientConfig {
         let mut config = rdkafka::config::ClientConfig::new();

-        config.set("bootstrap.servers", &self.bootstrap_servers);
+        config
+            .set("bootstrap.servers", &self.bootstrap_servers)
+            .set("reconnect.backoff.ms", "100")
+            .set("reconnect.backoff.max.ms", "3600000");

         if let Some(client_id) = &self.client_id {
             config
diff --git a/src/connectors/kafka/consumer.rs b/src/connectors/kafka/consumer.rs
index bb443bb02..5c9ba3b7b 100644
--- a/src/connectors/kafka/consumer.rs
+++ b/src/connectors/kafka/consumer.rs
@@ -17,13 +17,16 @@
  */

 use crate::connectors::common::shutdown::Shutdown;
-use crate::connectors::kafka::partition_stream_queue::PartitionStreamReceiver;
+use crate::connectors::kafka::partition_stream::{PartitionStreamReceiver, PartitionStreamSender};
 use crate::connectors::kafka::state::StreamState;
 use crate::connectors::kafka::{
-    partition_stream_queue, ConsumerRecord, KafkaContext, StreamConsumer, TopicPartition,
+    partition_stream, ConsumerRecord, KafkaContext, StreamConsumer, TopicPartition,
 };
+use backon::{ExponentialBuilder, Retryable};
 use futures_util::FutureExt;
 use rdkafka::consumer::Consumer;
+use rdkafka::error::KafkaError;
+use rdkafka::message::BorrowedMessage;
 use rdkafka::Statistics;
 use std::sync::Arc;
 use tokio::sync::{mpsc, RwLock};
@@ -81,10 +84,9 @@ impl KafkaStreams {
     /// 4. Listens for shutdown signals and gracefully terminates all partition streams, unsubscribing the consumer.
     ///
     /// Limitations and References:
-    /// - Issues with `split_partition_queue` in rust-rdkafka:
-    ///   - https://github.com/fede1024/rust-rdkafka/issues/535
-    ///   - https://github.com/confluentinc/librdkafka/issues/4059
+    /// - Issues with `split_partition_queue` in rust-rdkafka:
     ///   - https://github.com/fede1024/rust-rdkafka/issues/535
     ///   - https://github.com/confluentinc/librdkafka/issues/4059
     ///   - https://github.com/fede1024/rust-rdkafka/issues/654
     ///   - https://github.com/fede1024/rust-rdkafka/issues/651
@@ -106,51 +108,135 @@ impl KafkaStreams {

         std::thread::spawn(move || {
             tokio_handle.block_on(async move {
+                let retry_policy = ExponentialBuilder::default().with_max_times(5000);
+
                 loop {
-                    tokio::select! {
-                        result = consumer.recv() => {
-                            match result {
-                                Ok(msg) => {
-                                    let mut state = stream_state.write().await;
-                                    let tp = TopicPartition::from_kafka_msg(&msg);
-                                    let consumer_record = ConsumerRecord::from_borrowed_msg(msg);
-                                    let ps_tx = match state.get_partition_sender(&tp) {
-                                        Some(ps_tx) => ps_tx.clone(),
-                                        None => {
-                                            info!("Creating new stream for {:?}", tp);
-                                            let (ps_tx, ps_rx) = partition_stream_queue::bounded(10_000, tp.clone());
-                                            state.insert_partition_sender(tp.clone(), ps_tx.clone());
-                                            stream_tx.send(ps_rx).await.unwrap();
-                                            ps_tx
-                                        }
-                                    };
-                                    ps_tx.send(consumer_record).await;
-                                }
-                                Err(err) => {
-                                    error!("Cannot get message from kafka consumer! Cause {:?}", err);
-                                    break
-                                },
-                            };
-                        },
-                        _ = shutdown_handle.recv() => {
-                            info!("Gracefully stopping kafka partition streams!");
-                            let mut stream_state = stream_state.write().await;
-                            stream_state.clear();
-                            consumer.unsubscribe();
-                            break;
-                        },
-                        else => {
-                            error!("KafkaStreams terminated!");
-                            break;
-                        }
-                    }
+                    match KafkaStreams::process_consumer_messages(
+                        &consumer,
+                        &stream_state,
+                        &stream_tx,
+                        &shutdown_handle,
+                        &retry_policy,
+                    )
+                    .await
+                    {
+                        // Keep polling while messages are flowing.
+                        Ok(true) => {}
+                        // A shutdown signal was handled; stop the loop.
+                        Ok(false) => break,
+                        Err(e) => {
+                            error!(
+                                "Partitioned processing encountered a critical error: {:?}",
+                                e
+                            );
+                            break;
+                        }
+                    }
                 }
-            })
+            });
         });

         ReceiverStream::new(stream_rx)
     }

+    /// Runs one iteration of the consumer loop: either receives a message
+    /// (retrying on transient errors) and routes it, or reacts to shutdown.
+    /// Returns Ok(false) once shutdown has been handled and polling should stop.
+    async fn process_consumer_messages(
+        consumer: &Arc<StreamConsumer>,
+        stream_state: &RwLock<StreamState>,
+        stream_tx: &mpsc::Sender<PartitionStreamReceiver>,
+        shutdown_handle: &Shutdown,
+        retry_policy: &ExponentialBuilder,
+    ) -> anyhow::Result<bool> {
+        tokio::select! {
+            result = KafkaStreams::receive_with_retry(consumer, retry_policy) => match result {
+                Ok(msg) => KafkaStreams::handle_message(msg, stream_state, stream_tx)
+                    .await
+                    .map(|_| true),
+                Err(err) => {
+                    anyhow::bail!("Unrecoverable error occurred while receiving Kafka message: {:?}", err);
+                },
+            },
+            _ = shutdown_handle.recv() => {
+                KafkaStreams::handle_shutdown(consumer, stream_state).await;
+                Ok(false)
+            },
+            else => {
+                error!("KafkaStreams terminated unexpectedly!");
+                Ok(false)
+            }
+        }
+    }
+
+    /// Receives a single message, retrying with exponential backoff to ride
+    /// out temporary Kafka unavailability.
+    async fn receive_with_retry<'a>(
+        consumer: &'a Arc<StreamConsumer>,
+        retry_policy: &'a ExponentialBuilder,
+    ) -> Result<BorrowedMessage<'a>, KafkaError> {
+        let recv_fn = || consumer.recv();
+
+        recv_fn
+            .retry(retry_policy.clone())
+            .sleep(tokio::time::sleep)
+            .notify(|err, dur| {
+                tracing::warn!(
+                    "Retrying message reception due to error: {:?}. Waiting for {:?}...",
+                    err,
+                    dur
+                );
+            })
+            .await
+    }
+
+    /// Handles an individual Kafka message and routes it to the proper partition stream.
+    async fn handle_message(
+        msg: BorrowedMessage<'_>,
+        stream_state: &RwLock<StreamState>,
+        stream_tx: &mpsc::Sender<PartitionStreamReceiver>,
+    ) -> anyhow::Result<()> {
+        let mut state = stream_state.write().await;
+        let tp = TopicPartition::from_kafka_msg(&msg);
+        let consumer_record = ConsumerRecord::from_borrowed_msg(msg);
+
+        let partition_stream_tx =
+            KafkaStreams::get_or_create_partition_stream(&mut state, stream_tx, tp).await;
+        partition_stream_tx.send(consumer_record).await;
+
+        Ok(())
+    }
+
+    async fn get_or_create_partition_stream(
+        state: &mut StreamState,
+        stream_tx: &mpsc::Sender<PartitionStreamReceiver>,
+        tp: TopicPartition,
+    ) -> PartitionStreamSender {
+        if let Some(ps_tx) = state.get_partition_sender(&tp) {
+            ps_tx.clone()
+        } else {
+            Self::create_new_partition_stream(state, stream_tx, tp).await
+        }
+    }
+
+    async fn create_new_partition_stream(
+        state: &mut StreamState,
+        stream_tx: &mpsc::Sender<PartitionStreamReceiver>,
+        tp: TopicPartition,
+    ) -> PartitionStreamSender {
+        info!("Creating new stream for {:?}", tp);
+
+        let (ps_tx, ps_rx) = partition_stream::bounded(100_000, tp.clone());
+        state.insert_partition_sender(tp.clone(), ps_tx.clone());
+
+        if let Err(e) = stream_tx.send(ps_rx).await {
+            error!(
+                "Failed to send partition stream receiver for {:?}: {:?}",
+                tp, e
+            );
+        }
+
+        ps_tx
+    }
+
+    async fn handle_shutdown(consumer: &Arc<StreamConsumer>, stream_state: &RwLock<StreamState>) {
+        info!("Gracefully stopping kafka partition streams!");
+        let mut state = stream_state.write().await;
+        state.clear();
+        consumer.unsubscribe();
+    }
+
     fn create_consumer(context: KafkaContext) -> Arc<StreamConsumer> {
         info!("Creating Kafka consumer from configs {:#?}", context.config);
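The retry wiring above is the standard backon pattern rather than anything Kafka-specific. A self-contained sketch of the same shape, where flaky_recv is a stand-in for consumer.recv() and the small with_max_times just keeps the demo short; backon's ExponentialBuilder defaults to roughly a one-second initial delay that doubles per attempt up to a cap, so with_max_times(5000) tolerates a long stretch of broker downtime:

use backon::{ExponentialBuilder, Retryable};
use std::io::{Error, ErrorKind};

// Stand-in for a fallible async operation such as consumer.recv().
async fn flaky_recv() -> Result<String, Error> {
    Err(Error::new(ErrorKind::ConnectionRefused, "broker unavailable"))
}

#[tokio::main]
async fn main() {
    let outcome = flaky_recv
        .retry(ExponentialBuilder::default().with_max_times(3))
        .sleep(tokio::time::sleep)
        .notify(|err, dur| println!("retrying in {dur:?} after error: {err}"))
        .await;

    // Once the attempts are exhausted the last error is returned to the
    // caller, which the connector treats as unrecoverable.
    println!("final outcome: {outcome:?}");
}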
diff --git a/src/connectors/kafka/mod.rs b/src/connectors/kafka/mod.rs
index a7033ef84..5b8f82974 100644
--- a/src/connectors/kafka/mod.rs
+++ b/src/connectors/kafka/mod.rs
@@ -34,7 +34,7 @@ use tracing::{error, info, warn};
 pub mod config;
 pub mod consumer;
 pub mod metrics;
-mod partition_stream_queue;
+mod partition_stream;
 pub mod processor;
 pub mod rebalance_listener;
 pub mod sink;
diff --git a/src/connectors/kafka/partition_stream_queue.rs b/src/connectors/kafka/partition_stream.rs
similarity index 100%
rename from src/connectors/kafka/partition_stream_queue.rs
rename to src/connectors/kafka/partition_stream.rs
diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs
index 9ec0ed9f3..4813e3c21 100644
--- a/src/connectors/kafka/processor.rs
+++ b/src/connectors/kafka/processor.rs
@@ -32,8 +32,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio_stream::wrappers::ReceiverStream;
-use tracing::{debug, warn};
-use tracing::{error, info};
+use tracing::{debug, error, warn};

 #[derive(Default, Debug, Clone)]
 pub struct ParseableSinkProcessor;
@@ -138,7 +137,6 @@ where
         tp: TopicPartition,
         record_stream: ReceiverStream<ConsumerRecord>,
     ) -> anyhow::Result<()> {
-        info!("Started processing stream for {:?}", tp);
         let chunked_stream = tokio_stream::StreamExt::chunks_timeout(
             record_stream,
             self.buffer_size,
@@ -164,7 +162,6 @@ where
             })
             .await;

-        info!("Finished processing stream for {:?}", tp);
         self.processor.post_stream().await?;

         Ok(())
diff --git a/src/connectors/kafka/sink.rs b/src/connectors/kafka/sink.rs
index e257c4abf..e2a117ee3 100644
--- a/src/connectors/kafka/sink.rs
+++ b/src/connectors/kafka/sink.rs
@@ -24,7 +24,7 @@ use anyhow::Result;
 use futures_util::StreamExt;
 use std::sync::Arc;
 use tokio::time::Duration;
-use tracing::error;
+use tracing::{error, info};

 pub struct KafkaSinkConnector<P>
 where
@@ -66,12 +66,16 @@ where
                 tokio::spawn(async move {
                     partition_queue
                         .run_drain(|record_stream| async {
+                            info!("Starting task for partition: {:?}", tp);
+
                             worker
                                 .process_partition(tp.clone(), record_stream)
                                 .await
                                 .unwrap();
                         })
-                        .await
+                        .await;
+
+                    info!("Task completed for partition: {:?}", tp);
                 })
             })
             .for_each_concurrent(None, |task| async {
diff --git a/src/connectors/kafka/state.rs b/src/connectors/kafka/state.rs
index cc91bc9d8..ca0904d4d 100644
--- a/src/connectors/kafka/state.rs
+++ b/src/connectors/kafka/state.rs
@@ -16,7 +16,7 @@
  *
  */

-use crate::connectors::kafka::partition_stream_queue::PartitionStreamSender;
+use crate::connectors::kafka::partition_stream::PartitionStreamSender;
 use crate::connectors::kafka::{TopicPartition, TopicPartitionList};
 use std::collections::HashMap;
 use tracing::info;
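Taken together, sink.rs turns each partition stream into its own task and then awaits them all. Stripped of the Kafka types, the control flow reduces to this sketch, where the integer stream and the task body are stand-ins for the PartitionStreamReceivers and the run_drain/process_partition work:

use futures_util::StreamExt;

#[tokio::main]
async fn main() {
    // Stand-in for the stream of partition receivers that
    // KafkaStreams::partitioned() yields: one entry per partition.
    let partitions = futures_util::stream::iter(0..4);

    partitions
        .map(|partition| {
            // One task per partition, like the tokio::spawn around run_drain.
            tokio::spawn(async move {
                println!("draining partition {partition}");
            })
        })
        .for_each_concurrent(None, |task| async {
            // Await every task; a panicked task surfaces here as a JoinError.
            if let Err(e) = task.await {
                eprintln!("partition task failed: {e:?}");
            }
        })
        .await;
}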