Skip to content

Commit

Permalink
clean up 2
Browse files Browse the repository at this point in the history
  • Loading branch information
Eshanatnight committed Apr 29, 2024
1 parent 845e3e6 commit 02ab395
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 8 deletions.
13 changes: 8 additions & 5 deletions server/src/handlers/airplane.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use arrow_array::RecordBatch;
use arrow_flight::flight_service_server::FlightServiceServer;
use arrow_flight::PollInfo;
use arrow_schema::ArrowError;
use arrow_schema::{ArrowError, Schema};
use chrono::Utc;
use datafusion::common::tree_node::TreeNode;
use std::net::SocketAddr;
Expand Down Expand Up @@ -179,10 +179,6 @@ impl FlightService for AirServiceImpl {
Status::permission_denied("User Does not have permission to access this")
})?;

let schema = STREAM_INFO
.schema(&stream_name)
.map_err(|err| Status::failed_precondition(err.to_string()))?;

let (mut results, _) = query
.execute(stream_name)
.await
Expand All @@ -193,6 +189,13 @@ impl FlightService for AirServiceImpl {
results.append(&mut minute_result);
};

let schemas = results
.iter()
.map(|batch| batch.schema())
.map(|s| s.as_ref().clone())
.collect::<Vec<_>>();

let schema = Schema::try_merge(schemas).map_err(|err| Status::internal(err.to_string()))?;
let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default();
let schema_flight_data = SchemaAsIpc::new(&schema, &options);

Expand Down
7 changes: 4 additions & 3 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use clap::error::ErrorKind;
use clap::{command, Args, Command, FromArgMatches};

use core::fmt;
use once_cell::sync::Lazy;
use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel};
use std::env;
Expand Down Expand Up @@ -236,9 +237,9 @@ impl Mode {
}
}

impl ToString for Mode {
fn to_string(&self) -> String {
self.to_str().to_string()
impl fmt::Display for Mode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.to_str())
}
}

Expand Down
24 changes: 24 additions & 0 deletions server/src/utils/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,30 @@ pub use batch_adapter::adapt_batch;
pub use merged_reader::MergedRecordReader;
use serde_json::{Map, Value};

/// example function for concat recordbatch(may not work)
/// use arrow::record_batch::RecordBatch;
/// use arrow::error::Result;
///
/// fn concat_batches(batch1: RecordBatch, batch2: RecordBatch) -> Result<RecordBatch> {
/// let schema = batch1.schema();
/// let columns = schema
/// .fields()
/// .iter()
/// .enumerate()
/// .map(|(i, _)| -> Result<_> {
/// let array1 = batch1.column(i);
/// let array2 = batch2.column(i);
/// let array = arrow::compute::concat(&[array1.as_ref(), array2.as_ref()])?;
/// Ok(array)
/// })
/// .collect::<Result<Vec<_>>>()?;
///
/// RecordBatch::try_new(schema.clone(), columns)
/// }
///


/// Replaces columns in a record batch with new arrays.
///
/// # Arguments
Expand Down

0 comments on commit 02ab395

Please sign in to comment.