From 28064313f6a4537ca2b0d3dedb616baee5bb38f9 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Fri, 19 Apr 2024 12:44:54 +0530 Subject: [PATCH] impl arrow flight protocol for querying --- server/src/cli.rs | 28 +- server/src/handlers.rs | 1 + server/src/handlers/airplane.rs | 303 ++++++++++++++++++ .../src/handlers/http/modal/ingest_server.rs | 4 + .../src/handlers/http/modal/query_server.rs | 3 + server/src/handlers/http/modal/server.rs | 1 + server/src/handlers/http/query.rs | 2 +- server/src/main.rs | 6 +- 8 files changed, 338 insertions(+), 10 deletions(-) create mode 100644 server/src/handlers/airplane.rs diff --git a/server/src/cli.rs b/server/src/cli.rs index 2ad9899cd..cd3f8cf7a 100644 --- a/server/src/cli.rs +++ b/server/src/cli.rs @@ -89,6 +89,9 @@ pub struct Cli { /// public address for the parseable server ingestor pub ingestor_endpoint: String, + + /// port use by airplane(flight query service) + pub flight_port: u16, } impl Cli { @@ -118,6 +121,7 @@ impl Cli { pub const INGESTOR_ENDPOINT: &'static str = "ingestor-endpoint"; pub const DEFAULT_USERNAME: &'static str = "admin"; pub const DEFAULT_PASSWORD: &'static str = "admin"; + pub const FLIGHT_PORT: &'static str = "flight-port"; pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) @@ -275,6 +279,16 @@ impl Cli { .value_parser(value_parser!(u16)) .help("Port for gRPC server"), ) + .arg( + Arg::new(Self::FLIGHT_PORT) + .long(Self::FLIGHT_PORT) + .env("P_FLIGHT_PORT") + .value_name("PORT") + .default_value("8002") + .required(false) + .value_parser(value_parser!(u16)) + .help("Port for Arrow Flight Querying Engine"), + ) .arg( Arg::new(Self::LIVETAIL_CAPACITY) .long(Self::LIVETAIL_CAPACITY) @@ -317,11 +331,11 @@ impl Cli { .help("Mode of operation"), ) .arg( - Arg::new(Self::INGESTOR_ENDPOINT) - .long(Self::INGESTOR_ENDPOINT) - .env("P_INGESTOR_ENDPOINT") - .value_name("URL") - .required(false) + Arg::new(Self::INGESTOR_ENDPOINT) + .long(Self::INGESTOR_ENDPOINT) + .env("P_INGESTOR_ENDPOINT") + .value_name("URL") + .required(false) .help("URL to connect to this specific ingestor. Default is the address of the server.") ) .arg( @@ -401,6 +415,10 @@ impl FromArgMatches for Cli { .get_one::(Self::GRPC_PORT) .cloned() .expect("default for livetail port"); + self.flight_port = m + .get_one::(Self::FLIGHT_PORT) + .cloned() + .expect("default for flight port"); self.livetail_channel_capacity = m .get_one::(Self::LIVETAIL_CAPACITY) .cloned() diff --git a/server/src/handlers.rs b/server/src/handlers.rs index 57e4aebcb..821e0651a 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -16,6 +16,7 @@ * */ +pub mod airplane; pub mod http; pub mod livetail; diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs new file mode 100644 index 000000000..a08062fa5 --- /dev/null +++ b/server/src/handlers/airplane.rs @@ -0,0 +1,303 @@ +use arrow_flight::flight_service_server::FlightServiceServer; +use arrow_schema::ArrowError; +use datafusion::common::tree_node::TreeNode; +use std::net::SocketAddr; +use std::sync::Arc; + +use futures_util::{Future, TryFutureExt}; + +use tonic::transport::{Identity, Server, ServerTlsConfig}; +use tonic_web::GrpcWebLayer; + +use crate::event::commit_schema; +use crate::handlers::http::fetch_schema; +use crate::option::{Mode, CONFIG}; + +use crate::handlers::livetail::cross_origin_config; + +use crate::handlers::http::query::{into_query, Query as QueryJson}; +use crate::query::{TableScanVisitor, QUERY_SESSION}; +use crate::rbac::role::Permission; +use crate::storage::object_storage::commit_schema_to_storage; +use arrow_ipc::writer::{DictionaryTracker, IpcDataGenerator, IpcWriteOptions}; +use futures::stream::BoxStream; + +use tonic::{Request, Response, Status, Streaming}; + +use arrow_flight::{ + flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, + FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, + SchemaResult, Ticket, +}; + +use crate::handlers::livetail::extract_session_key; + +use crate::metadata::STREAM_INFO; + +use crate::rbac::role::Action as RoleAction; +use crate::rbac::Users; + +#[derive(Clone)] +pub struct AirServiceImpl {} + +#[tonic::async_trait] +impl FlightService for AirServiceImpl { + type HandshakeStream = BoxStream<'static, Result>; + type ListFlightsStream = BoxStream<'static, Result>; + type DoGetStream = BoxStream<'static, Result>; + type DoPutStream = BoxStream<'static, Result>; + type DoActionStream = BoxStream<'static, Result>; + type ListActionsStream = BoxStream<'static, Result>; + type DoExchangeStream = BoxStream<'static, Result>; + + async fn handshake( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented( + "handshake is disabled in favour of direct authentication and authorization", + )) + } + + /// list_flights is an operation that allows a client + /// to query a Flight server for information + /// about available datasets or "flights" that the server can provide. + async fn list_flights( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Implement list_flights")) + } + + async fn get_flight_info( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Implement get_flight_info")) + } + + async fn get_schema( + &self, + request: Request, + ) -> Result, Status> { + let table_name = request.into_inner().path; + let table_name = table_name[0].clone(); + + let schema = STREAM_INFO + .schema(&table_name) + .map_err(|err| Status::failed_precondition(err.to_string()))?; + + let options = IpcWriteOptions::default(); + let schema_result = SchemaAsIpc::new(&schema, &options) + .try_into() + .map_err(|err: ArrowError| Status::internal(err.to_string()))?; + + Ok(Response::new(schema_result)) + } + + async fn do_get(&self, req: Request) -> Result, Status> { + let key = extract_session_key(req.metadata())?; + let ticket = serde_json::from_slice::(&req.into_inner().ticket) + .map_err(|err| Status::internal(err.to_string()))?; + log::info!("airplane requested for query {:?}", ticket); + + // get the query session_state + let session_state = QUERY_SESSION.state(); + + // get the logical plan and extract the table name + let raw_logical_plan = session_state + .create_logical_plan(&ticket.query) + .await + .map_err(|err| { + log::error!("Failed to create logical plan: {}", err); + Status::internal("Failed to create logical plan") + })?; + + // create a visitor to extract the table name + let mut visitor = TableScanVisitor::default(); + let _ = raw_logical_plan.visit(&mut visitor); + + let table_name = visitor + .into_inner() + .pop() + .ok_or(Status::invalid_argument("No table found from sql"))?; + + if CONFIG.parseable.mode == Mode::Query { + // using http to get the schema. may update to use flight later + if let Ok(new_schema) = fetch_schema(&table_name).await { + // commit schema merges the schema internally and updates the schema in storage. + commit_schema_to_storage(&table_name, new_schema.clone()) + .await + .map_err(|err| Status::internal(err.to_string()))?; + commit_schema(&table_name, Arc::new(new_schema)) + .map_err(|err| Status::internal(err.to_string()))?; + } + } + + // map payload to query + let mut query = into_query(&ticket, &session_state) + .await + .map_err(|_| Status::internal("Failed to parse query"))?; + + // if table name is not present it is a Malformed Query + let stream_name = query + .table_name() + .ok_or(Status::invalid_argument("Malformed Query"))?; + + let permissions = Users.get_permissions(&key); + + let table_name = query.table_name(); + if let Some(ref table) = table_name { + let mut authorized = false; + let mut tags = Vec::new(); + + // in permission check if user can run query on the stream. + // also while iterating add any filter tags for this stream + for permission in permissions { + match permission { + Permission::Stream(RoleAction::All, _) => { + authorized = true; + break; + } + Permission::StreamWithTag(RoleAction::Query, ref stream, tag) + if stream == table || stream == "*" => + { + authorized = true; + if let Some(tag) = tag { + tags.push(tag) + } + } + _ => (), + } + } + + if !authorized { + return Err(Status::permission_denied("User Not Authorized")); + } + + if !tags.is_empty() { + query.filter_tag = Some(tags) + } + } + + let (results, _) = query + .execute(table_name.clone().unwrap()) + .await + .map_err(|err| Status::internal(err.to_string()))?; + let schema = STREAM_INFO + .schema(&stream_name) + .map_err(|err| Status::failed_precondition(err.to_string()))?; + let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default(); + let schema_flight_data = SchemaAsIpc::new(&schema, &options); + + let mut flights = vec![FlightData::from(schema_flight_data)]; + let encoder = IpcDataGenerator::default(); + let mut tracker = DictionaryTracker::new(false); + for batch in &results { + let (flight_dictionaries, flight_batch) = encoder + .encoded_batch(batch, &mut tracker, &options) + .map_err(|e| Status::internal(e.to_string()))?; + flights.extend(flight_dictionaries.into_iter().map(Into::into)); + flights.push(flight_batch.into()); + } + let output = futures::stream::iter(flights.into_iter().map(Ok)); + Ok(Response::new(Box::pin(output) as Self::DoGetStream)) + } + + async fn do_put( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented( + "do_put not implemented because we are only using flight for querying", + )) + } + + async fn do_action( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented( + "do_action not implemented because we are only using flight for querying", + )) + } + + async fn list_actions( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented( + "list_actions not implemented because we are only using flight for querying", + )) + } + + async fn do_exchange( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented( + "do_exchange not implemented because we are only using flight for querying", + )) + } +} + +pub fn server() -> impl Future>> + Send { + let mut addr: SocketAddr = CONFIG + .parseable + .address + .parse() + .expect("valid socket address"); + addr.set_port(CONFIG.parseable.flight_port); + + let service = AirServiceImpl {}; + + let svc = FlightServiceServer::new(service); + + let cors = cross_origin_config(); + + let identity = match ( + &CONFIG.parseable.tls_cert_path, + &CONFIG.parseable.tls_key_path, + ) { + (Some(cert), Some(key)) => { + match (std::fs::read_to_string(cert), std::fs::read_to_string(key)) { + (Ok(cert_file), Ok(key_file)) => { + let identity = Identity::from_pem(cert_file, key_file); + Some(identity) + } + _ => None, + } + } + (_, _) => None, + }; + + let config = identity.map(|id| ServerTlsConfig::new().identity(id)); + + // rust is treating closures as different types + let err_map_fn = |err| Box::new(err) as Box; + + // match on config to decide if we want to use tls or not + match config { + Some(config) => { + let server = match Server::builder().tls_config(config) { + Ok(server) => server, + Err(_) => Server::builder(), + }; + + server + .accept_http1(true) + .layer(cors) + .layer(GrpcWebLayer::new()) + .add_service(svc) + .serve(addr) + .map_err(err_map_fn) + } + None => Server::builder() + .accept_http1(true) + .layer(cors) + .layer(GrpcWebLayer::new()) + .add_service(svc) + .serve(addr) + .map_err(err_map_fn), + } +} diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 5e94d6cb2..1b40eee38 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -18,6 +18,7 @@ use crate::analytics; use crate::banner; +use crate::handlers::airplane; use crate::handlers::http::logstream; use crate::handlers::http::middleware::RouteExt; use crate::localcache::LocalCacheManager; @@ -336,7 +337,10 @@ impl IngestServer { let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = sync::object_store_sync(); + tokio::spawn(airplane::server()); + let app = self.start(prometheus, CONFIG.parseable.openid.clone()); + tokio::pin!(app); loop { tokio::select! { diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index ee258c0d3..33a930807 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -16,6 +16,7 @@ * */ +use crate::handlers::airplane; use crate::handlers::http::cluster; use crate::handlers::http::middleware::RouteExt; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; @@ -183,6 +184,8 @@ impl QueryServer { analytics::init_analytics_scheduler()?; } + tokio::spawn(airplane::server()); + self.start(prometheus, CONFIG.parseable.openid.clone()) .await?; diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index d7508f5bc..8049bbc6c 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -447,6 +447,7 @@ impl Server { } tokio::spawn(handlers::livetail::server()); + tokio::spawn(handlers::airplane::server()); let app = self.start(prometheus, CONFIG.parseable.openid.clone()); diff --git a/server/src/handlers/http/query.rs b/server/src/handlers/http/query.rs index 680e7b326..a5e7f1ba4 100644 --- a/server/src/handlers/http/query.rs +++ b/server/src/handlers/http/query.rs @@ -50,7 +50,7 @@ use crate::utils::actix::extract_session_key_from_req; #[derive(Debug, serde::Deserialize, serde::Serialize)] #[serde(rename_all = "camelCase")] pub struct Query { - query: String, + pub query: String, start_time: String, end_time: String, #[serde(default)] diff --git a/server/src/main.rs b/server/src/main.rs index 58f0a1c1c..95cbcb919 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -46,10 +46,8 @@ use std::sync::Arc; use handlers::http::modal::ParseableServer; use option::{Mode, CONFIG}; -use crate::{ - handlers::http::modal::{ - ingest_server::IngestServer, query_server::QueryServer, server::Server, - }, +use crate::handlers::http::modal::{ + ingest_server::IngestServer, query_server::QueryServer, server::Server, }; pub const STORAGE_UPLOAD_INTERVAL: u32 = 60;