Skip to content

Commit

Permalink
add live tail support for stream (#516)
Browse files Browse the repository at this point in the history
This PR adds support for live tailing a stream. With this PR,
server exposes a gRPC server on a configurable port (default 8001)
and uses Arrow Flight as the communication protocol. 

As live tail is requested for a stream, Server sends events from 
that steam, before the local staging and parquet conversion. 
This means, server will use the event record batch to create a
flight stream and send over to the client.

fixes #503
  • Loading branch information
trueleo authored Oct 13, 2023
1 parent 24be36f commit 7f4da55
Show file tree
Hide file tree
Showing 10 changed files with 767 additions and 42 deletions.
325 changes: 292 additions & 33 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ actix-web = { version = "4.3", features = ["rustls"] }
actix-cors = "0.6"
actix-web-prometheus = { version = "0.1" }
actix-web-static-files = "4.0"
mime = "0.3.17"

### LiveTail server deps
arrow-flight = "42.0.0"
tonic = "0.9.0"
tonic-web = "0.9.0"
tower-http = { version = "0.4.4", features = ["cors"] }

### other dependencies
anyhow = { version = "1.0", features = ["backtrace"] }
Expand Down Expand Up @@ -91,6 +98,9 @@ nom = "7.1.3"
humantime = "2.1.0"
openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
url = "2.4.0"
http-auth-basic = "0.3.3"
cookies = "0.0.1"
cookie = "0.17.0"

[build-dependencies]
cargo_toml = "0.15"
Expand Down
2 changes: 1 addition & 1 deletion server/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ fn build_metrics() -> HashMap<String, Value> {
metrics
}

pub async fn init_analytics_scheduler() {
pub fn init_analytics_scheduler() {
log::info!("Setting up schedular for anonymous user analytics");

let mut scheduler = AsyncScheduler::new();
Expand Down
2 changes: 2 additions & 0 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ impl Event {
num_rows,
)?;

crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);

if let Err(e) = metadata::STREAM_INFO
.check_alerts(&self.stream_name, self.rb)
.await
Expand Down
6 changes: 6 additions & 0 deletions server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@
*/

pub mod http;
pub mod livetail;

const PREFIX_TAGS: &str = "x-p-tag-";
const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const FILL_NULL_OPTION_KEY: &str = "send_null";
const SEPARATOR: char = '^';

const OIDC_SCOPE: &str = "openid profile email";
const COOKIE_AGE_DAYS: usize = 7;
const SESSION_COOKIE_NAME: &str = "session";
const USER_COOKIE_NAME: &str = "username";
11 changes: 4 additions & 7 deletions server/src/handlers/http/oidc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use ulid::Ulid;
use url::Url;

use crate::{
handlers::{COOKIE_AGE_DAYS, OIDC_SCOPE, SESSION_COOKIE_NAME, USER_COOKIE_NAME},
oidc::{Claims, DiscoveredClient},
option::CONFIG,
rbac::{
Expand All @@ -42,10 +43,6 @@ use crate::{
utils::actix::extract_session_key_from_req,
};

// fetch common personalization scope to determine username.
const SCOPE: &str = "openid profile email";
const COOKIE_AGE_DAYS: usize = 7;

/// Struct representing query params returned from oidc provider
#[derive(Deserialize, Debug)]
pub struct Login {
Expand Down Expand Up @@ -182,7 +179,7 @@ fn redirect_to_oidc(
) -> HttpResponse {
let redirect = query.into_inner().redirect.to_string();
let auth_url = oidc_client.auth_url(&Options {
scope: Some(SCOPE.into()),
scope: Some(OIDC_SCOPE.into()),
state: Some(redirect),
..Default::default()
});
Expand Down Expand Up @@ -222,7 +219,7 @@ fn redirect_no_oauth_setup(mut url: Url) -> HttpResponse {
}

fn cookie_session(id: Ulid) -> Cookie<'static> {
let authorization_cookie = Cookie::build("session", id.to_string())
let authorization_cookie = Cookie::build(SESSION_COOKIE_NAME, id.to_string())
.max_age(time::Duration::days(COOKIE_AGE_DAYS as i64))
.same_site(SameSite::Strict)
.path("/")
Expand All @@ -231,7 +228,7 @@ fn cookie_session(id: Ulid) -> Cookie<'static> {
}

fn cookie_username(username: &str) -> Cookie<'static> {
let authorization_cookie = Cookie::build("username", username.to_string())
let authorization_cookie = Cookie::build(USER_COOKIE_NAME, username.to_string())
.max_age(time::Duration::days(COOKIE_AGE_DAYS as i64))
.same_site(SameSite::Strict)
.path("/")
Expand Down
242 changes: 242 additions & 0 deletions server/src/handlers/livetail.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
/*
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use std::net::SocketAddr;

use arrow_array::RecordBatch;
use arrow_flight::encode::FlightDataEncoderBuilder;
use cookie::Cookie;
use futures::stream::BoxStream;
use futures_util::{Future, StreamExt, TryFutureExt, TryStreamExt};
use http_auth_basic::Credentials;
use rand::distributions::{Alphanumeric, DistString};
use tonic::metadata::MetadataMap;
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};

use arrow_flight::{
flight_service_server::FlightService, flight_service_server::FlightServiceServer, Action,
ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest,
HandshakeResponse, PutResult, SchemaResult, Ticket,
};
use tonic_web::GrpcWebLayer;
use tower_http::cors::{Any, CorsLayer};

use crate::livetail::{Message, LIVETAIL};
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::rbac::map::SessionKey;
use crate::rbac::{self, Users};
use crate::utils;

use super::SESSION_COOKIE_NAME;

#[derive(Clone)]
pub struct FlightServiceImpl {}

#[tonic::async_trait]
impl FlightService for FlightServiceImpl {
type HandshakeStream = BoxStream<'static, Result<HandshakeResponse, Status>>;
type ListFlightsStream = BoxStream<'static, Result<FlightInfo, Status>>;
type DoGetStream = BoxStream<'static, Result<FlightData, Status>>;
type DoPutStream = BoxStream<'static, Result<PutResult, Status>>;
type DoActionStream = BoxStream<'static, Result<arrow_flight::Result, Status>>;
type ListActionsStream = BoxStream<'static, Result<ActionType, Status>>;
type DoExchangeStream = BoxStream<'static, Result<FlightData, Status>>;

async fn handshake(
&self,
_request: Request<Streaming<HandshakeRequest>>,
) -> Result<Response<Self::HandshakeStream>, Status> {
Err(Status::unimplemented(
"handshake is disabled in favour of direct authentication and authorization",
))
}

async fn list_flights(
&self,
_request: Request<Criteria>,
) -> Result<Response<Self::ListFlightsStream>, Status> {
Err(Status::unimplemented("Implement list_flights"))
}

async fn get_flight_info(
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<FlightInfo>, Status> {
Err(Status::unimplemented("Implement get_flight_info"))
}

async fn get_schema(
&self,
_request: Request<FlightDescriptor>,
) -> Result<Response<SchemaResult>, Status> {
Err(Status::unimplemented("Implement get_schema"))
}

async fn do_get(&self, req: Request<Ticket>) -> Result<Response<Self::DoGetStream>, Status> {
let key = extract_session_key(req.metadata())?;
let ticket: serde_json::Value = serde_json::from_slice(&req.into_inner().ticket)
.map_err(|err| Status::internal(err.to_string()))?;
let stream = extract_stream(&ticket)?;
log::info!("livetail requested for stream {}", stream);
match Users.authorize(key, rbac::role::Action::Query, Some(stream), None) {
rbac::Response::Authorized => (),
rbac::Response::UnAuthorized => {
return Err(Status::permission_denied(
"user is not authenticated to access this resource",
))
}
rbac::Response::ReloadRequired => {
return Err(Status::unauthenticated("reload required"))
}
}

let schema = STREAM_INFO
.schema(stream)
.map_err(|err| Status::failed_precondition(err.to_string()))?;

let rx = LIVETAIL.new_pipe(
Alphanumeric.sample_string(&mut rand::thread_rng(), 32),
stream.to_string(),
);

let adapter_schema = schema.clone();
let rx = rx.map(move |x| match x {
Message::Record(t) => Ok(utils::arrow::adapt_batch(&adapter_schema, &t)),
Message::Skipped(_) => {
log::warn!("livetail channel capacity is full.");
Ok(RecordBatch::new_empty(adapter_schema.clone()))
}
});

let rb_stream = FlightDataEncoderBuilder::new()
.with_schema(schema)
.build(rx);

let rb_stream = rb_stream.map_err(|err| Status::unknown(err.to_string()));
Ok(Response::new(Box::pin(rb_stream)))
}

async fn do_put(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, Status> {
Err(Status::unimplemented("Implement do_put"))
}

async fn do_action(
&self,
_request: Request<Action>,
) -> Result<Response<Self::DoActionStream>, Status> {
Err(Status::unimplemented("Implement do_action"))
}

async fn list_actions(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::ListActionsStream>, Status> {
Err(Status::unimplemented("Implement list_actions"))
}

async fn do_exchange(
&self,
_request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, Status> {
Err(Status::unimplemented("Implement do_exchange"))
}
}

pub fn server() -> impl Future<Output = Result<(), Box<dyn std::error::Error + Send>>> + Send {
let mut addr: SocketAddr = CONFIG
.parseable
.address
.parse()
.expect("valid socket address");
addr.set_port(CONFIG.parseable.grpc_port);

let service = FlightServiceImpl {};

let svc = FlightServiceServer::new(service);

let cors = CorsLayer::new()
// allow `GET` and `POST` when accessing the resource
.allow_methods(Any)
.allow_headers(Any)
.allow_origin(Any);
// allow requests from any origin

Server::builder()
.accept_http1(true)
.layer(cors)
.layer(GrpcWebLayer::new())
.add_service(svc)
.serve(addr)
.map_err(|err| Box::new(err) as Box<dyn std::error::Error + Send>)
}

fn extract_stream(body: &serde_json::Value) -> Result<&str, Status> {
body.as_object()
.ok_or(Status::invalid_argument("expected object in request body"))?
.get("stream")
.ok_or(Status::invalid_argument("stream key value is not provided"))?
.as_str()
.ok_or(Status::invalid_argument("stream key value is invalid"))
}

fn extract_session_key(headers: &MetadataMap) -> Result<SessionKey, Status> {
// Extract username and password from the request using basic auth extractor.
let basic = extract_basic_auth(headers).map(|creds| SessionKey::BasicAuth {
username: creds.user_id,
password: creds.password,
});

if let Some(basic) = basic {
return Ok(basic);
}

let session = extract_cookie(headers)
.map(|cookie| ulid::Ulid::from_string(cookie.value()))
.transpose()
.map_err(|_| Status::invalid_argument("Cookie is tampered with or invalid"))?;

if let Some(session) = session {
return Ok(SessionKey::SessionId(session));
}

Err(Status::unauthenticated("No authentication method supplied"))
}

fn extract_basic_auth(header: &MetadataMap) -> Option<Credentials> {
let creds = header
.get("Authorization")
.and_then(|value| value.to_str().ok())
.and_then(|value| Credentials::from_header(value.to_string()).ok());
creds
}

fn extract_cookie(header: &MetadataMap) -> Option<Cookie> {
let cookies = header
.get("Cookies")
.and_then(|value| value.to_str().ok())
.map(Cookie::split_parse)?;

cookies
.flatten()
.find(|cookie| cookie.name() == SESSION_COOKIE_NAME)
}
Loading

0 comments on commit 7f4da55

Please sign in to comment.