diff --git a/crates/arroyo-connectors/src/rabbitmq/mod.rs b/crates/arroyo-connectors/src/rabbitmq/mod.rs index 2331b0547..ca1fbfe12 100644 --- a/crates/arroyo-connectors/src/rabbitmq/mod.rs +++ b/crates/arroyo-connectors/src/rabbitmq/mod.rs @@ -57,17 +57,33 @@ impl Connector for RabbitmqConnector { fn test( &self, _: &str, - _: Self::ProfileT, - _: Self::TableT, - _: Option<&arroyo_rpc::api_types::connections::ConnectionSchema>, + config: Self::ProfileT, + table: Self::TableT, + _schema: Option<&arroyo_rpc::api_types::connections::ConnectionSchema>, tx: tokio::sync::mpsc::Sender, ) { - // TODO tokio::task::spawn(async move { - let message = TestSourceMessage { - error: false, - done: true, - message: "Successfully validated connection".to_string(), + let message = match config.get_environment().await { + Ok(environment) => { + if let Err(e) = environment.consumer().build(&table.stream).await { + TestSourceMessage { + error: true, + done: true, + message: e.to_string(), + } + } else { + TestSourceMessage { + error: false, + done: true, + message: "Successfully validated connection".to_string(), + } + } + } + Err(e) => TestSourceMessage { + error: true, + done: true, + message: e.to_string(), + }, }; tx.send(message).await.unwrap(); }); @@ -227,7 +243,7 @@ impl Connector for RabbitmqConnector { } impl RabbitmqStreamConfig { - async fn get_environment(&mut self) -> anyhow::Result { + async fn get_environment(&self) -> anyhow::Result { let builder = Environment::builder() .host(&self.host.clone().unwrap_or("localhost".to_owned())) .username(&self.username.clone().unwrap_or("guest".to_owned()))