diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 9e38238b1..d878b7d5a 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -631,49 +631,6 @@ mod tests { ); } - #[test] - fn arr_obj_ignore_all_null_field() { - let json = json!([ - { - "a": 1, - "b": "hello", - "c": null - }, - { - "a": 1, - "b": "hello", - "c": null - }, - { - "a": 1, - "b": "hello", - "c": null - }, - ]); - - let req = TestRequest::default().to_http_request(); - - let (_, rb, _) = into_event_batch( - req, - Bytes::from(serde_json::to_vec(&json).unwrap()), - HashMap::default(), - None, - None, - ) - .unwrap(); - - assert_eq!(rb.num_rows(), 3); - assert_eq!(rb.num_columns(), 6); - assert_eq!( - rb.column_by_name("a").unwrap().as_int64_arr(), - &Int64Array::from(vec![Some(1), Some(1), Some(1)]) - ); - assert_eq!( - rb.column_by_name("b").unwrap().as_utf8_arr(), - &StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),]) - ); - } - #[test] fn arr_schema_mismatch() { let json = json!([ diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 49dcbc869..96e0766a9 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -105,7 +105,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result http::StatusCode { match self { - QueryError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR, + QueryError::Execute(_) | QueryError::JsonParse(_) => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::BAD_REQUEST, } } diff --git a/server/src/response.rs b/server/src/response.rs index 6d614bcba..0f5deb5ec 100644 --- a/server/src/response.rs +++ b/server/src/response.rs @@ -16,13 +16,12 @@ * */ +use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json}; use actix_web::{web, Responder}; use datafusion::arrow::record_batch::RecordBatch; use itertools::Itertools; use serde_json::{json, Value}; -use crate::utils::arrow::record_batches_to_json; - pub struct QueryResponse { pub records: Vec, pub fields: Vec, @@ -31,10 +30,11 @@ pub struct QueryResponse { } impl QueryResponse { - pub fn to_http(&self) -> impl Responder { + pub fn to_http(&self) -> Result { log::info!("{}", "Returning query results"); let records: Vec<&RecordBatch> = self.records.iter().collect(); - let mut json_records = record_batches_to_json(&records); + let mut json_records = record_batches_to_json(&records) + .map_err(|err| QueryError::JsonParse(err.to_string()))?; if self.fill_null { for map in &mut json_records { for field in &self.fields { @@ -55,6 +55,6 @@ impl QueryResponse { Value::Array(values) }; - web::Json(response) + Ok(web::Json(response)) } } diff --git a/server/src/utils/arrow.rs b/server/src/utils/arrow.rs index 5c982cd74..945b637fe 100644 --- a/server/src/utils/arrow.rs +++ b/server/src/utils/arrow.rs @@ -27,6 +27,7 @@ pub mod batch_adapter; pub mod merged_reader; pub mod reverse_reader; +use anyhow::Result; pub use batch_adapter::adapt_batch; pub use merged_reader::MergedRecordReader; use serde_json::{Map, Value}; @@ -63,19 +64,23 @@ pub fn replace_columns( /// * `records` - The record batches to convert. /// /// # Returns +/// * Result>> /// /// A vector of JSON objects representing the record batches. -pub fn record_batches_to_json(records: &[&RecordBatch]) -> Vec> { +pub fn record_batches_to_json(records: &[&RecordBatch]) -> Result>> { let buf = vec![]; let mut writer = arrow_json::ArrayWriter::new(buf); - writer.write_batches(records).unwrap(); - writer.finish().unwrap(); + writer.write_batches(records)?; + writer.finish()?; let buf = writer.into_inner(); - let json_rows: Vec> = serde_json::from_reader(buf.as_slice()).unwrap(); + let json_rows: Vec> = match serde_json::from_reader(buf.as_slice()) { + Ok(json) => json, + Err(_) => vec![], + }; - json_rows + Ok(json_rows) } /// Retrieves a field from a slice of fields by name. @@ -105,7 +110,7 @@ mod tests { use arrow_array::{Array, Int32Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; - use super::replace_columns; + use super::{record_batches_to_json, replace_columns}; #[test] fn check_replace() { @@ -135,4 +140,12 @@ mod tests { assert_eq!(new_rb.num_columns(), 3); assert_eq!(new_rb.num_rows(), 3) } + + #[test] + fn check_empty_json_to_record_batches() { + let r = RecordBatch::new_empty(Arc::new(Schema::empty())); + let rb = vec![&r]; + let batches = record_batches_to_json(&rb).unwrap(); + assert_eq!(batches, vec![]); + } }