diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 26f29b592..f859b1a24 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -34,6 +34,7 @@ use crate::event::error::EventError; use crate::handlers::http::fetch_schema; use crate::event::commit_schema; +use crate::metadata::STREAM_INFO; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::{Mode, CONFIG}; use crate::query::error::ExecuteError; @@ -63,27 +64,26 @@ pub struct Query { pub async fn query(req: HttpRequest, query_request: Query) -> Result { let session_state = QUERY_SESSION.state(); - // get the logical plan and extract the table name - let raw_logical_plan = session_state - .create_logical_plan(&query_request.query) - .await?; - // create a visitor to extract the table name - let mut visitor = TableScanVisitor::default(); - let _ = raw_logical_plan.visit(&mut visitor); - let table_name = visitor - .into_inner() - .pop() - .ok_or(QueryError::MalformedQuery( - "No table found from sql".to_string(), - ))?; - if CONFIG.parseable.mode == Mode::Query { - if let Ok(new_schema) = fetch_schema(&table_name).await { - // commit schema merges the schema internally and updates the schema in storage. - commit_schema_to_storage(&table_name, new_schema.clone()) - .await - .map_err(QueryError::ObjectStorage)?; - commit_schema(&table_name, Arc::new(new_schema)).map_err(QueryError::EventError)?; + for stream in STREAM_INFO.list_streams() { + // figure out how to update the schema of only required streams + match fetch_schema(&stream).await { + Ok(new_schema) => { + // commit schema merges the schema internally and updates the schema in storage. + commit_schema_to_storage(&stream, new_schema.clone()) + .await + .map_err(QueryError::ObjectStorage)?; + commit_schema(&stream, Arc::new(new_schema)).map_err(QueryError::EventError)?; + } + Err(err) => { + log::error!( + "Failed to fetch schema for stream: {}, Error: {}", + stream, + err + ); + continue; + } + } } }