From 497a86c62fd66032b149aac3fb7a1c734859b51d Mon Sep 17 00:00:00 2001 From: anant Date: Mon, 16 Dec 2024 11:45:47 +0530 Subject: [PATCH] refactor: readability --- src/kafka.rs | 38 ++++++++++++++++++-------------------- src/main.rs | 4 ++-- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/src/kafka.rs b/src/kafka.rs index 07e7c9bf8..f4e6bf4da 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -205,29 +205,27 @@ async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Resu } pub async fn setup_integration() { - tokio::task::spawn(async move { - let (consumer, stream_name) = match setup_consumer() { - Ok(c) => c, - Err(err) => { - match err { - KafkaError::DoNotPrintError => { - debug!("P_KAFKA_TOPIC not set, skipping kafka integration"); - } - _ => { - error!("{err}"); - } + let (consumer, stream_name) = match setup_consumer() { + Ok(c) => c, + Err(err) => { + match err { + KafkaError::DoNotPrintError => { + debug!("P_KAFKA_TOPIC not set, skipping kafka integration"); + } + _ => { + error!("{err}"); } - return; } - }; + return; + } + }; - info!("Setup kafka integration for {stream_name}"); - let mut stream = consumer.stream(); + info!("Setup kafka integration for {stream_name}"); + let mut stream = consumer.stream(); - while let Ok(curr) = stream.next().await.unwrap() { - if let Err(err) = ingest_message(&stream_name, curr).await { - error!("Unable to ingest incoming kafka message- {err}"), - } + while let Ok(curr) = stream.next().await.unwrap() { + if let Err(err) = ingest_message(&stream_name, curr).await { + error!("Unable to ingest incoming kafka message- {err}") } - }); + } } diff --git a/src/main.rs b/src/main.rs index 641a2d7c8..d1663d539 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,8 +47,8 @@ async fn main() -> anyhow::Result<()> { metadata.set_global(); // load kafka server - if CONFIG.parseable.mode.ne(&Mode::Query) { - kafka::setup_integration().await; + if CONFIG.parseable.mode != Mode::Query { + tokio::task::spawn(kafka::setup_integration()); } server.init().await?;