diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 959eb2ed0..ce4acaad0 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -90,6 +90,8 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result anyhow::Result> { // send the query request to the ingester let mut res = vec![]; diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 62b7b6492..6df8620db 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -45,8 +45,6 @@ use crate::storage::object_storage::commit_schema_to_storage; use crate::storage::ObjectStorageError; use crate::utils::actix::extract_session_key_from_req; -use super::send_query_request_to_ingester; - /// Query Request through http endpoint. #[derive(Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] @@ -85,21 +83,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Option { if query.query.is_empty() { return None; @@ -288,8 +273,6 @@ pub enum QueryError { Datafusion(#[from] DataFusionError), #[error("Execution Error: {0}")] Execute(#[from] ExecuteError), - #[error("Error: {0}")] - Custom(String), #[error("ObjectStorage Error: {0}")] ObjectStorage(#[from] ObjectStorageError), #[error("Evern Error: {0}")] diff --git a/server/src/query.rs b/server/src/query.rs index c16c18f26..c3abe3d61 100644 --- a/server/src/query.rs +++ b/server/src/query.rs @@ -336,6 +336,8 @@ fn time_from_path(path: &Path) -> DateTime { .unwrap() } +/// unused for now might need it later +#[allow(unused)] pub fn flatten_objects_for_count(objects: Vec) -> Vec { if objects.is_empty() { return objects; diff --git a/server/src/response.rs b/server/src/response.rs index 6275864b5..c6731e3a1 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -22,8 +22,6 @@ use datafusion::arrow::record_batch::RecordBatch; use itertools::Itertools; use serde_json::{json, Value}; -use crate::query::flatten_objects_for_count; - pub struct QueryResponse { pub records: Vec, pub fields: Vec, @@ -32,7 +30,7 @@ pub struct QueryResponse { } impl QueryResponse { - pub fn to_http(&self, imem: Option>) -> impl Responder { + pub fn to_http(&self) -> impl Responder { log::info!("{}", "Returning query results"); let records: Vec<&RecordBatch> = self.records.iter().collect(); let mut json_records = record_batches_to_json_rows(&records).unwrap(); @@ -45,13 +43,7 @@ impl QueryResponse { } } } - let mut values = json_records.into_iter().map(Value::Object).collect_vec(); - - if let Some(mut imem) = imem { - values.append(&mut imem); - } - - let values = flatten_objects_for_count(values); + let values = json_records.into_iter().map(Value::Object).collect_vec(); let response = if self.with_fields { json!({ diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 1c87d0106..54c12c858 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -204,10 +204,29 @@ pub trait ObjectStorage: Sync + 'static { &self, stream_name: &str, ) -> Result { - let schema_path = - RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME]); - let schema_map = self.get_object(&schema_path).await?; - Ok(serde_json::from_slice(&schema_map)?) + // try get my schema + // if fails get the base schema + // put the schema to storage?? + let schema_path = schema_path(stream_name); + let byte_data = match self.get_object(&schema_path).await { + Ok(bytes) => bytes, + Err(err) => { + log::info!("{:?}", err); + // base schema path + let schema_path = RelativePathBuf::from_iter([ + stream_name, + STREAM_ROOT_DIRECTORY, + SCHEMA_FILE_NAME, + ]); + let data = self.get_object(&schema_path).await?; + // schema was not found in store, so it needs to be placed + self.put_schema(stream_name, &serde_json::from_slice(&data).unwrap()) + .await?; + + data + } + }; + Ok(serde_json::from_slice(&byte_data)?) } async fn get_schema(&self, stream_name: &str) -> Result {