Skip to content

Commit

Permalink
update error handling when parsing recordbatch to rows
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed Apr 25, 2024
1 parent 0190d85 commit 6ca4656
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 10 deletions.
6 changes: 4 additions & 2 deletions server/src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
fill_null: query_request.send_null,
with_fields: query_request.fields,
}
.to_http();
.to_http()?;

let time = time.elapsed().as_secs_f64();

Expand Down Expand Up @@ -293,12 +293,14 @@ pub enum QueryError {
EventError(#[from] EventError),
#[error("Error: {0}")]
MalformedQuery(String),
#[error("Error: Failed to Parse Record Batch in to Json")]
JsonParse,
}

impl actix_web::ResponseError for QueryError {
/// Maps each `QueryError` variant to the HTTP status returned to the client.
///
/// Server-side failures (query execution, record-batch → JSON conversion)
/// are reported as 500; everything else is treated as a client error (400).
fn status_code(&self) -> http::StatusCode {
    match self {
        // Execution and JSON-serialization failures are internal faults,
        // not problems with the caller's request.
        QueryError::Execute(_) | QueryError::JsonParse => StatusCode::INTERNAL_SERVER_ERROR,
        _ => StatusCode::BAD_REQUEST,
    }
}
Expand Down
10 changes: 5 additions & 5 deletions server/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@
*
*/

use crate::{handlers::http::query::QueryError, utils::arrow::record_batches_to_json};
use actix_web::{web, Responder};
use datafusion::arrow::record_batch::RecordBatch;
use itertools::Itertools;
use serde_json::{json, Value};

use crate::utils::arrow::record_batches_to_json;

pub struct QueryResponse {
pub records: Vec<RecordBatch>,
pub fields: Vec<String>,
Expand All @@ -31,10 +30,11 @@ pub struct QueryResponse {
}

impl QueryResponse {
pub fn to_http(&self) -> impl Responder {
pub fn to_http(&self) -> Result<impl Responder, QueryError> {
log::info!("{}", "Returning query results");
let records: Vec<&RecordBatch> = self.records.iter().collect();
let mut json_records = record_batches_to_json(&records);
let mut json_records =
record_batches_to_json(&records).map_err(|_| QueryError::JsonParse)?;
if self.fill_null {
for map in &mut json_records {
for field in &self.fields {
Expand All @@ -55,6 +55,6 @@ impl QueryResponse {
Value::Array(values)
};

web::Json(response)
Ok(web::Json(response))
}
}
8 changes: 5 additions & 3 deletions server/src/utils/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod batch_adapter;
pub mod merged_reader;
pub mod reverse_reader;

use anyhow::Result;
pub use batch_adapter::adapt_batch;
pub use merged_reader::MergedRecordReader;
use serde_json::{Map, Value};
Expand Down Expand Up @@ -63,19 +64,20 @@ pub fn replace_columns(
/// * `records` - The record batches to convert.
///
/// # Returns
/// * Result<Vec<Map<String, Value>>>
///
/// A vector of JSON objects representing the record batches.
pub fn record_batches_to_json(records: &[&RecordBatch]) -> Vec<Map<String, Value>> {
pub fn record_batches_to_json(records: &[&RecordBatch]) -> Result<Vec<Map<String, Value>>> {
let buf = vec![];
let mut writer = arrow_json::ArrayWriter::new(buf);
writer.write_batches(records).unwrap();
writer.finish().unwrap();

let buf = writer.into_inner();

let json_rows: Vec<Map<String, Value>> = serde_json::from_reader(buf.as_slice()).unwrap();
let json_rows: Vec<Map<String, Value>> = serde_json::from_reader(buf.as_slice())?;

json_rows
Ok(json_rows)
}

/// Retrieves a field from a slice of fields by name.
Expand Down

0 comments on commit 6ca4656

Please sign in to comment.