Skip to content

Commit

Permalink
fix: schema update where querying
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed Apr 22, 2024
1 parent 09b9e40 commit 55e5f07
Showing 1 changed file with 20 additions and 20 deletions.
40 changes: 20 additions & 20 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;

use crate::event::commit_schema;
use crate::metadata::STREAM_INFO;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
Expand Down Expand Up @@ -63,27 +64,26 @@ pub struct Query {
pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Responder, QueryError> {
let session_state = QUERY_SESSION.state();

// get the logical plan and extract the table name
let raw_logical_plan = session_state
.create_logical_plan(&query_request.query)
.await?;
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
let table_name = visitor
.into_inner()
.pop()
.ok_or(QueryError::MalformedQuery(
"No table found from sql".to_string(),
))?;

if CONFIG.parseable.mode == Mode::Query {
if let Ok(new_schema) = fetch_schema(&table_name).await {
// commit schema merges the schema internally and updates the schema in storage.
commit_schema_to_storage(&table_name, new_schema.clone())
.await
.map_err(QueryError::ObjectStorage)?;
commit_schema(&table_name, Arc::new(new_schema)).map_err(QueryError::EventError)?;
for stream in STREAM_INFO.list_streams() {
// figure out how to update the schema of only required streams
match fetch_schema(&stream).await {
Ok(new_schema) => {
// commit schema merges the schema internally and updates the schema in storage.
commit_schema_to_storage(&stream, new_schema.clone())
.await
.map_err(QueryError::ObjectStorage)?;
commit_schema(&stream, Arc::new(new_schema)).map_err(QueryError::EventError)?;
}
Err(err) => {
log::error!(
"Failed to fetch schema for stream: {}, Error: {}",
stream,
err
);
continue;
}
}
}
}

Expand Down

0 comments on commit 55e5f07

Please sign in to comment.