diff --git a/src/cli.rs b/src/cli.rs index 4a159308a..a5e03f3b0 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -22,6 +22,7 @@ use std::path::PathBuf; use url::Url; use crate::{ + kafka::SslProtocol, oidc::{self, OpenidConfig}, option::{validation, Compression, Mode}, }; @@ -125,7 +126,7 @@ pub struct Cli { pub kafka_host: Option, pub kafka_group: Option, pub kafka_client_id: Option, - pub kafka_security_protocol: Option, + pub kafka_security_protocol: Option, pub kafka_partitions: Option, } @@ -582,7 +583,9 @@ impl FromArgMatches for Cli { self.kafka_host = m.get_one::(Self::KAFKA_HOST).cloned(); self.kafka_group = m.get_one::(Self::KAFKA_GROUP).cloned(); self.kafka_client_id = m.get_one::(Self::KAFKA_CLIENT_ID).cloned(); - self.kafka_security_protocol = m.get_one::(Self::KAFKA_SECURITY_PROTOCOL).cloned(); + self.kafka_security_protocol = m + .get_one::(Self::KAFKA_SECURITY_PROTOCOL) + .cloned(); self.kafka_partitions = m.get_one::(Self::KAFKA_PARTITIONS).cloned(); self.local_cache_path = m.get_one::(Self::CACHE).cloned(); diff --git a/src/kafka.rs b/src/kafka.rs index eae86d851..9eafa754c 100644 --- a/src/kafka.rs +++ b/src/kafka.rs @@ -8,6 +8,7 @@ use rdkafka::error::{KafkaError as NativeKafkaError, RDKafkaError}; use rdkafka::message::BorrowedMessage; use rdkafka::util::Timeout; use rdkafka::{Message, TopicPartitionList}; +use serde::{Deserialize, Serialize}; use std::fmt::Display; use std::num::ParseIntError; use std::sync::Arc; @@ -26,7 +27,8 @@ use crate::{ storage::StreamType, }; -enum SslProtocol { +#[derive(Debug, Deserialize, Serialize, Clone, Copy)] +pub enum SslProtocol { Plaintext, Ssl, SaslPlaintext, @@ -150,9 +152,8 @@ fn setup_consumer() -> Result<(StreamConsumer, String), KafkaError> { // conf.set("api.version.request", val.to_string()); // } - if let Some(val) = CONFIG.parseable.kafka_security_protocol.as_ref() { - let mapped: SslProtocol = val.parse()?; - conf.set("security.protocol", mapped.to_string()); + if let Some(ssl_protocol) = CONFIG.parseable.kafka_security_protocol.as_ref() { + conf.set("security.protocol", serde_json::to_string(&ssl_protocol)?); } let consumer: StreamConsumer = conf.create()?;