Skip to content

Commit

Permalink
refactored: used serde
Browse files Browse the repository at this point in the history
  • Loading branch information
parmesant committed Dec 16, 2024
1 parent 05b9965 commit 0761663
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
7 changes: 5 additions & 2 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::path::PathBuf;
use url::Url;

use crate::{
kafka::SslProtocol,
oidc::{self, OpenidConfig},
option::{validation, Compression, Mode},
};
Expand Down Expand Up @@ -125,7 +126,7 @@ pub struct Cli {
pub kafka_host: Option<String>,
pub kafka_group: Option<String>,
pub kafka_client_id: Option<String>,
pub kafka_security_protocol: Option<String>,
pub kafka_security_protocol: Option<SslProtocol>,
pub kafka_partitions: Option<String>,
}

Expand Down Expand Up @@ -582,7 +583,9 @@ impl FromArgMatches for Cli {
self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
self.kafka_security_protocol = m.get_one::<String>(Self::KAFKA_SECURITY_PROTOCOL).cloned();
self.kafka_security_protocol = m
.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL)
.cloned();
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();

self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
Expand Down
9 changes: 5 additions & 4 deletions src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use rdkafka::error::{KafkaError as NativeKafkaError, RDKafkaError};
use rdkafka::message::BorrowedMessage;
use rdkafka::util::Timeout;
use rdkafka::{Message, TopicPartitionList};
use serde::{Deserialize, Serialize};
use std::fmt::Display;
use std::num::ParseIntError;
use std::sync::Arc;
Expand All @@ -26,7 +27,8 @@ use crate::{
storage::StreamType,
};

enum SslProtocol {
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
pub enum SslProtocol {
Plaintext,
Ssl,
SaslPlaintext,
Expand Down Expand Up @@ -150,9 +152,8 @@ fn setup_consumer() -> Result<(StreamConsumer, String), KafkaError> {
// conf.set("api.version.request", val.to_string());
// }

if let Some(val) = CONFIG.parseable.kafka_security_protocol.as_ref() {
let mapped: SslProtocol = val.parse()?;
conf.set("security.protocol", mapped.to_string());
if let Some(ssl_protocol) = CONFIG.parseable.kafka_security_protocol.as_ref() {
conf.set("security.protocol", serde_json::to_string(&ssl_protocol)?);
}

let consumer: StreamConsumer = conf.create()?;
Expand Down

0 comments on commit 0761663

Please sign in to comment.