Skip to content

Commit

Permalink
Feature: Enable Kafka integration (#1021)
Browse files Browse the repository at this point in the history
Fixes #936

---------

Signed-off-by: parmesant <[email protected]>
Co-authored-by: Nitish Tiwari <[email protected]>
Co-authored-by: Devdutt Shenoi <[email protected]>
  • Loading branch information
3 people authored Dec 20, 2024
1 parent b277775 commit 4d3437c
Show file tree
Hide file tree
Showing 6 changed files with 400 additions and 1 deletion.
51 changes: 51 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ num_cpus = "1.15"
once_cell = "1.17.1"
prometheus = { version = "0.13", features = ["process"] }
rand = "0.8.5"
rdkafka = {version = "0.36.2", default-features = false, features = ["tokio"]}
regex = "1.7.3"
relative-path = { version = "1.7", features = ["serde"] }
reqwest = { version = "0.11.27", default-features = false, features = [
Expand Down
66 changes: 66 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::path::PathBuf;
use url::Url;

use crate::{
kafka::SslProtocol,
oidc::{self, OpenidConfig},
option::{validation, Compression, Mode},
};
Expand Down Expand Up @@ -107,6 +108,14 @@ pub struct Cli {
pub trino_auth: Option<String>,
pub trino_schema: Option<String>,
pub trino_catalog: Option<String>,

// Kafka specific env vars
pub kafka_topics: Option<String>,
pub kafka_host: Option<String>,
pub kafka_group: Option<String>,
pub kafka_client_id: Option<String>,
pub kafka_security_protocol: Option<SslProtocol>,
pub kafka_partitions: Option<String>,
}

impl Cli {
Expand Down Expand Up @@ -148,6 +157,14 @@ impl Cli {
pub const TRINO_AUTHORIZATION: &'static str = "p-trino-authorization";
pub const TRINO_SCHEMA: &'static str = "p-trino-schema";

// Kafka specific env vars
pub const KAFKA_TOPICS: &'static str = "kafka-topics";
pub const KAFKA_HOST: &'static str = "kafka-host";
pub const KAFKA_GROUP: &'static str = "kafka-group";
pub const KAFKA_CLIENT_ID: &'static str = "kafka-client-id";
pub const KAFKA_SECURITY_PROTOCOL: &'static str = "kafka-security-protocol";
pub const KAFKA_PARTITIONS: &'static str = "kafka-partitions";

pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
self.local_staging_path.join(stream_name)
}
Expand All @@ -161,6 +178,48 @@ impl Cli {

pub fn create_cli_command_with_clap(name: &'static str) -> Command {
Command::new(name).next_line_help(false)
.arg(
Arg::new(Self::KAFKA_TOPICS)
.long(Self::KAFKA_TOPICS)
.env("P_KAFKA_TOPICS")
.value_name("STRING")
.help("Kafka topics to subscribe to"),
)
.arg(
Arg::new(Self::KAFKA_HOST)
.long(Self::KAFKA_HOST)
.env("P_KAFKA_HOST")
.value_name("STRING")
.help("Address and port for Kafka server"),
)
.arg(
Arg::new(Self::KAFKA_GROUP)
.long(Self::KAFKA_GROUP)
.env("P_KAFKA_GROUP")
.value_name("STRING")
.help("Kafka group"),
)
.arg(
Arg::new(Self::KAFKA_CLIENT_ID)
.long(Self::KAFKA_CLIENT_ID)
.env("P_KAFKA_CLIENT_ID")
.value_name("STRING")
.help("Kafka client id"),
)
.arg(
Arg::new(Self::KAFKA_SECURITY_PROTOCOL)
.long(Self::KAFKA_SECURITY_PROTOCOL)
.env("P_KAFKA_SECURITY_PROTOCOL")
.value_name("STRING")
.help("Kafka security protocol"),
)
.arg(
Arg::new(Self::KAFKA_PARTITIONS)
.long(Self::KAFKA_PARTITIONS)
.env("P_KAFKA_PARTITIONS")
.value_name("STRING")
.help("Kafka partitions"),
)
.arg(
Arg::new(Self::TRINO_ENDPOINT)
.long(Self::TRINO_ENDPOINT)
Expand Down Expand Up @@ -466,6 +525,13 @@ impl FromArgMatches for Cli {
self.trino_schema = m.get_one::<String>(Self::TRINO_SCHEMA).cloned();
self.trino_username = m.get_one::<String>(Self::TRINO_USER_NAME).cloned();

self.kafka_topics = m.get_one::<String>(Self::KAFKA_TOPICS).cloned();
self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
self.kafka_security_protocol = m.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL).cloned();
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();

self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();
self.tls_key_path = m.get_one::<PathBuf>(Self::TLS_KEY).cloned();
self.trusted_ca_certs_path = m.get_one::<PathBuf>(Self::TRUSTED_CA_CERTS_PATH).cloned();
Expand Down
Loading

0 comments on commit 4d3437c

Please sign in to comment.