Skip to content

Commit

Permalink
feat(metrics): add KafkaMetricsCollector for Prometheus integration.
Browse files Browse the repository at this point in the history
Implement KafkaMetricsCollector to collect and expose Kafka client and broker metrics. Refactor ParseableServer.init(..) and connectors::init(..).
  • Loading branch information
hippalus committed Dec 23, 2024
1 parent aff48a2 commit 9ce1031
Show file tree
Hide file tree
Showing 11 changed files with 714 additions and 373 deletions.
318 changes: 3 additions & 315 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ mime = "0.3.17"

### connectors dependencies
rdkafka = { version = "0.37", features = ["cmake-build", "tracing", "sasl", "ssl", "libz-static"] }
testcontainers = "0.23"
testcontainers-modules = { version = "0.11", features = ["kafka"] }
backon = "1.3"

### other dependencies
Expand Down
9 changes: 6 additions & 3 deletions src/connectors/kafka/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ impl Default for KafkaConfig {
// Common configuration with standard broker port
bootstrap_servers: "localhost:9092".to_string(),
topics: vec![],
client_id: None, // Let Kafka generate a unique client ID if not specified
client_id: Some("parseable-connect".to_string()),

// Component-specific configurations with production-ready defaults
consumer: Some(ConsumerConfig::default()),
Expand Down Expand Up @@ -613,7 +613,10 @@ mod tests {
);

let rdkafka_config = config.consumer_config();
assert_eq!(rdkafka_config.get("group.id"), Some("test-group"));
assert_eq!(
rdkafka_config.get("group.id"),
Some("parseable-test-group-gi")
);
assert_eq!(
rdkafka_config.get("partition.assignment.strategy"),
Some("cooperative-sticky")
Expand All @@ -631,7 +634,7 @@ mod tests {
if let Some(producer) = config.producer {
assert_eq!(producer.acks, "all");
assert!(producer.enable_idempotence);
assert_eq!(producer.compression_type, "snappy");
assert_eq!(producer.compression_type, "lz4");
}
}

Expand Down
19 changes: 8 additions & 11 deletions src/connectors/kafka/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ impl KafkaStreams {
shutdown_handle: Shutdown,
) -> anyhow::Result<KafkaStreams> {
info!("Initializing KafkaStreams...");
let statistics = Arc::clone(&context.statistics);
let consumer = KafkaStreams::create_consumer(context);
let statistics = Arc::new(std::sync::RwLock::new(Statistics::default()));
info!("KafkaStreams initialized successfully.");

Ok(Self {
Expand Down Expand Up @@ -120,15 +120,12 @@ impl KafkaStreams {
)
.await;

match result {
Err(e) => {
error!(
"Partitioned processing encountered a critical error: {:?}",
e
);
break;
}
Ok(..) => {}
if let Err(e) = result {
error!(
"Partitioned processing encountered a critical error: {:?}",
e
);
break;
}
}
});
Expand Down Expand Up @@ -169,7 +166,7 @@ impl KafkaStreams {
let recv_fn = || consumer.recv();

recv_fn
.retry(retry_policy.clone())
.retry(*retry_policy)
.sleep(tokio::time::sleep)
.notify(|err, dur| {
tracing::warn!(
Expand Down
Loading

0 comments on commit 9ce1031

Please sign in to comment.