Skip to content

Commit

Permalink
refactor: readability
Browse files Browse the repository at this point in the history
  • Loading branch information
parmesant committed Dec 16, 2024
1 parent 5f9ec0c commit 497a86c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 22 deletions.
38 changes: 18 additions & 20 deletions src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,29 +205,27 @@ async fn ingest_message<'a>(stream_name: &str, msg: BorrowedMessage<'a>) -> Resu
}

pub async fn setup_integration() {
tokio::task::spawn(async move {
let (consumer, stream_name) = match setup_consumer() {
Ok(c) => c,
Err(err) => {
match err {
KafkaError::DoNotPrintError => {
debug!("P_KAFKA_TOPIC not set, skipping kafka integration");
}
_ => {
error!("{err}");
}
let (consumer, stream_name) = match setup_consumer() {
Ok(c) => c,
Err(err) => {
match err {
KafkaError::DoNotPrintError => {
debug!("P_KAFKA_TOPIC not set, skipping kafka integration");
}
_ => {
error!("{err}");
}
return;
}
};
return;
}
};

info!("Setup kafka integration for {stream_name}");
let mut stream = consumer.stream();
info!("Setup kafka integration for {stream_name}");
let mut stream = consumer.stream();

while let Ok(curr) = stream.next().await.unwrap() {
if let Err(err) = ingest_message(&stream_name, curr).await {
error!("Unable to ingest incoming kafka message- {err}"),
}
while let Ok(curr) = stream.next().await.unwrap() {
if let Err(err) = ingest_message(&stream_name, curr).await {
error!("Unable to ingest incoming kafka message- {err}")
}
});
}
}
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ async fn main() -> anyhow::Result<()> {
metadata.set_global();

// load kafka server
if CONFIG.parseable.mode.ne(&Mode::Query) {
kafka::setup_integration().await;
if CONFIG.parseable.mode != Mode::Query {
tokio::task::spawn(kafka::setup_integration());
}

server.init().await?;
Expand Down

0 comments on commit 497a86c

Please sign in to comment.