Skip to content

Commit

Permalink
update error handling when parsing recordbatch to rows (#783)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight authored Apr 25, 2024
1 parent 0190d85 commit b251e05
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 56 deletions.
43 changes: 0 additions & 43 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,49 +631,6 @@ mod tests {
);
}

#[test]
fn arr_obj_ignore_all_null_field() {
let json = json!([
{
"a": 1,
"b": "hello",
"c": null
},
{
"a": 1,
"b": "hello",
"c": null
},
{
"a": 1,
"b": "hello",
"c": null
},
]);

let req = TestRequest::default().to_http_request();

let (_, rb, _) = into_event_batch(
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
None,
)
.unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
assert_eq!(
rb.column_by_name("a").unwrap().as_int64_arr(),
&Int64Array::from(vec![Some(1), Some(1), Some(1)])
);
assert_eq!(
rb.column_by_name("b").unwrap().as_utf8_arr(),
&StringArray::from(vec![Some("hello"), Some("hello"), Some("hello"),])
);
}

#[test]
fn arr_schema_mismatch() {
let json = json!([
Expand Down
9 changes: 7 additions & 2 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http();
.to_http()?;

let time = time.elapsed().as_secs_f64();

Expand Down Expand Up @@ -293,12 +293,17 @@ pub enum QueryError {
EventError(#[from] EventError),
#[error("Error: {0}")]
MalformedQuery(String),
#[error(
r#"Error: Failed to Parse Record Batch into Json
Description: {0}"#
)]
JsonParse(String),
}

impl actix_web::ResponseError for QueryError {
fn status_code(&self) -> http::StatusCode {
match self {
QueryError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
QueryError::Execute(_) | QueryError::JsonParse(_) => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::BAD_REQUEST,
}
}
Expand Down
10 changes: 5 additions & 5 deletions server/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
*
*/

use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
use actix_web::{web, Responder};
use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};

use crate::utils::arrow::record_batches_to_json;

pub struct QueryResponse {
pub records: Vec<RecordBatch>,
pub fields: Vec<String>,
Expand All @@ -31,10 +30,11 @@ pub struct QueryResponse {
}

impl QueryResponse {
pub fn to_http(&self) -> impl Responder {
pub fn to_http(&self) -> Result<impl Responder, QueryError> {
log::info!("{}", "Returning query results");
let records: Vec<&RecordBatch> = self.records.iter().collect();
let mut json_records = record_batches_to_json(&records);
let mut json_records = record_batches_to_json(&records)
.map_err(|err| QueryError::JsonParse(err.to_string()))?;
if self.fill_null {
for map in &mut json_records {
for field in &self.fields {
Expand All @@ -55,6 +55,6 @@ impl QueryResponse {
Value::Array(values)
};

web::Json(response)
Ok(web::Json(response))
}
}
25 changes: 19 additions & 6 deletions server/src/utils/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod batch_adapter;
pub mod merged_reader;
pub mod reverse_reader;

use anyhow::Result;
pub use batch_adapter::adapt_batch;
pub use merged_reader::MergedRecordReader;
use serde_json::{Map, Value};
Expand Down Expand Up @@ -63,19 +64,23 @@ pub fn replace_columns(
/// * `records` - The record batches to convert.
///
/// # Returns
/// * Result<Vec<Map<String, Value>>>
///
/// A vector of JSON objects representing the record batches.
pub fn record_batches_to_json(records: &[&RecordBatch]) -> Vec<Map<String, Value>> {
pub fn record_batches_to_json(records: &[&RecordBatch]) -> Result<Vec<Map<String, Value>>> {
let buf = vec![];
let mut writer = arrow_json::ArrayWriter::new(buf);
writer.write_batches(records).unwrap();
writer.finish().unwrap();
writer.write_batches(records)?;
writer.finish()?;

let buf = writer.into_inner();

let json_rows: Vec<Map<String, Value>> = serde_json::from_reader(buf.as_slice()).unwrap();
let json_rows: Vec<Map<String, Value>> = match serde_json::from_reader(buf.as_slice()) {
Ok(json) => json,
Err(_) => vec![],
};

json_rows
Ok(json_rows)
}

/// Retrieves a field from a slice of fields by name.
Expand Down Expand Up @@ -105,7 +110,7 @@ mod tests {
use arrow_array::{Array, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

use super::replace_columns;
use super::{record_batches_to_json, replace_columns};

#[test]
fn check_replace() {
Expand Down Expand Up @@ -135,4 +140,12 @@ mod tests {
assert_eq!(new_rb.num_columns(), 3);
assert_eq!(new_rb.num_rows(), 3)
}

#[test]
fn check_empty_json_to_record_batches() {
let r = RecordBatch::new_empty(Arc::new(Schema::empty()));
let rb = vec![&r];
let batches = record_batches_to_json(&rb).unwrap();
assert_eq!(batches, vec![]);
}
}

0 comments on commit b251e05

Please sign in to comment.