impl flight protocol with staging data

Eshanatnight committed Apr 29, 2024
1 parent 02ab395 commit 99a3c5e
Showing 7 changed files with 108 additions and 31 deletions.
16 changes: 14 additions & 2 deletions server/src/event.rs
@@ -45,7 +45,7 @@ pub struct Event

 // Events holds the schema related to each event for a single log stream
 impl Event {
-    pub async fn process(self) -> Result<(), EventError> {
+    pub async fn process(&self) -> Result<(), EventError> {
         let key = get_schema_key(&self.rb.schema().fields);
         let num_rows = self.rb.num_rows() as u64;

@@ -65,7 +65,7 @@ impl Event
         crate::livetail::LIVETAIL.process(&self.stream_name, &self.rb);
 
         if let Err(e) = metadata::STREAM_INFO
-            .check_alerts(&self.stream_name, self.rb)
+            .check_alerts(&self.stream_name, &self.rb)
             .await
         {
             log::error!("Error checking for alerts. {:?}", e);
@@ -74,6 +74,18 @@
         Ok(())
     }
 
+    pub fn process_unchecked(&self) -> Result<(), EventError> {
+        let key = get_schema_key(&self.rb.schema().fields);
+
+        Self::process_event(&self.stream_name, &key, self.rb.clone())?;
+
+        Ok(())
+    }
+
+    pub fn clear(&self, stream_name: &str) {
+        STREAM_WRITERS.clear(stream_name);
+    }
+
     // event process all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
     fn process_event(
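
For orientation before the remaining diffs: `process` stays the full ingest path (stats, livetail, alert checks), while the new `process_unchecked` only materializes the batch into the in-memory store, and `clear` lets the caller discard whatever was staged. A minimal, self-contained sketch of that split; `Store` and its string "batches" are illustrative stand-ins, not Parseable's actual types:

```rust
use std::collections::HashMap;

// Stand-in for the per-stream writer table (hypothetical, simplified).
#[derive(Default)]
struct Store {
    batches: HashMap<String, Vec<String>>, // stream name -> staged batches
}

impl Store {
    // Checked path: bookkeeping (stats/alerts/livetail), then the write.
    fn process(&mut self, stream: &str, batch: String) {
        println!("stats/alerts/livetail for {stream}"); // bookkeeping stand-in
        self.process_unchecked(stream, batch);
    }

    // Unchecked path: the write only, used for short-lived staged data.
    fn process_unchecked(&mut self, stream: &str, batch: String) {
        self.batches.entry(stream.to_string()).or_default().push(batch);
    }

    // Drop everything staged for one stream, as the new Event::clear does.
    fn clear(&mut self, stream: &str) {
        self.batches.remove(stream);
    }
}

fn main() {
    let mut store = Store::default();
    store.process("logs", "checked-batch".into());
    store.process_unchecked("logs", "staged-batch".into());
    store.clear("logs");
    assert!(store.batches.get("logs").is_none());
}
```
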
50 changes: 40 additions & 10 deletions server/src/event/writer.rs
@@ -25,7 +25,10 @@ use std::{
     sync::{Arc, Mutex, RwLock},
 };
 
-use crate::utils;
+use crate::{
+    option::{Mode, CONFIG},
+    utils,
+};
 
 use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
 use arrow_array::{RecordBatch, TimestampMillisecondArray};
@@ -60,6 +63,11 @@ impl Writer {
         self.mem.push(schema_key, rb);
         Ok(())
     }
+
+    fn push_mem(&mut self, schema_key: &str, rb: RecordBatch) -> Result<(), StreamWriterError> {
+        self.mem.push(schema_key, rb);
+        Ok(())
+    }
 }
 
 #[derive(Deref, DerefMut, Default)]
@@ -77,31 +85,53 @@ impl WriterTable {
 
         match hashmap_guard.get(stream_name) {
             Some(stream_writer) => {
-                stream_writer
-                    .lock()
-                    .unwrap()
-                    .push(stream_name, schema_key, record)?;
+                if CONFIG.parseable.mode != Mode::Query {
+                    stream_writer
+                        .lock()
+                        .unwrap()
+                        .push(stream_name, schema_key, record)?;
+                } else {
+                    stream_writer
+                        .lock()
+                        .unwrap()
+                        .push_mem(stream_name, record)?;
+                }
             }
             None => {
                 drop(hashmap_guard);
                 let mut map = self.write().unwrap();
                 // check for race condition
                 // if map contains entry then just
                 if let Some(writer) = map.get(stream_name) {
-                    writer
-                        .lock()
-                        .unwrap()
-                        .push(stream_name, schema_key, record)?;
-                } else {
+                    if CONFIG.parseable.mode != Mode::Query {
+                        writer
+                            .lock()
+                            .unwrap()
+                            .push(stream_name, schema_key, record)?;
+                    } else {
+                        writer.lock().unwrap().push_mem(stream_name, record)?;
+                    }
+                } else if CONFIG.parseable.mode != Mode::Query {
                     let mut writer = Writer::default();
                     writer.push(stream_name, schema_key, record)?;
                     map.insert(stream_name.to_owned(), Mutex::new(writer));
+                } else {
+                    let mut writer = Writer::default();
+                    writer.push_mem(stream_name, record)?;
+                    map.insert(stream_name.to_owned(), Mutex::new(writer));
                 }
             }
         };
         Ok(())
     }
 
+    pub fn clear(&self, stream_name: &str) {
+        let map = self.write().unwrap();
+        if let Some(writer) = map.get(stream_name) {
+            writer.lock().unwrap().mem.clear();
+        }
+    }
+
     pub fn delete_stream(&self, stream_name: &str) {
         self.write().unwrap().remove(stream_name);
     }
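
The intent of the mode check: a node running in Query mode should never produce arrow files on disk for data it only staged to answer a query, so it routes through the new `push_mem`, while every other mode keeps the original disk-plus-memory `push`. A compilable sketch of that routing under simplified types (the real `Writer` wraps a `FileWriter` and a `MemWriter`):

```rust
#[derive(PartialEq)]
enum Mode {
    Query,
    Ingest,
}

#[derive(Default)]
struct Writer {
    disk: Vec<String>, // stands in for the arrow FileWriter
    mem: Vec<String>,  // stands in for MemWriter
}

impl Writer {
    // Normal path: persist to disk and mirror into memory.
    fn push(&mut self, rb: String) {
        self.disk.push(rb.clone());
        self.mem.push(rb);
    }

    // Query-mode path: memory only, so clearing later leaves no file behind.
    fn push_mem(&mut self, rb: String) {
        self.mem.push(rb);
    }
}

fn route(mode: &Mode, writer: &mut Writer, rb: String) {
    if *mode != Mode::Query {
        writer.push(rb);
    } else {
        writer.push_mem(rb);
    }
}

fn main() {
    let mut w = Writer::default();
    route(&Mode::Ingest, &mut w, "persisted".into());
    route(&Mode::Query, &mut w, "staged".into());
    assert_eq!((w.disk.len(), w.mem.len()), (1, 2));
}
```
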
8 changes: 8 additions & 0 deletions server/src/event/writer/mem_writer.rs
@@ -62,6 +62,14 @@ impl<const N: usize> MemWriter<N> {
         }
     }
 
+    pub fn clear(&mut self) {
+        self.schema = Schema::empty();
+        self.schema_map.clear();
+        self.read_buffer.clear();
+        self.mutable_buffer.inner.clear();
+        self.mutable_buffer.rows = 0;
+    }
+
     pub fn recordbatch_cloned(&self, schema: &Arc<Schema>) -> Vec<RecordBatch> {
         let mut read_buffer = self.read_buffer.clone();
         if self.mutable_buffer.rows > 0 {
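
`clear` returns the writer to its `default()` state: empty schema and schema map, both buffers emptied, row counter zeroed, so a later flush finds nothing left to persist. A reduced model of that invariant (string batches instead of `RecordBatch`es, and no `Schema` field):

```rust
use std::collections::HashSet;

#[derive(Default)]
struct MemWriter {
    schema_map: HashSet<String>, // schema keys seen so far
    read_buffer: Vec<String>,    // frozen batches
    mutable_buffer: Vec<String>, // batches still accumulating
    rows: usize,
}

impl MemWriter {
    fn push(&mut self, schema_key: &str, rb: String) {
        self.schema_map.insert(schema_key.to_string());
        self.mutable_buffer.push(rb);
        self.rows += 1;
    }

    // Mirrors the new clear(): every field back to its default.
    fn clear(&mut self) {
        self.schema_map.clear();
        self.read_buffer.clear();
        self.mutable_buffer.clear();
        self.rows = 0;
    }
}

fn main() {
    let mut w = MemWriter::default();
    w.push("schema-key", "batch".into());
    w.clear();
    assert!(w.rows == 0 && w.schema_map.is_empty() && w.read_buffer.is_empty());
}
```
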
43 changes: 27 additions & 16 deletions server/src/handlers/airplane.rs
@@ -15,6 +15,7 @@ use tonic_web::GrpcWebLayer;
 use crate::event::commit_schema;
 use crate::handlers::http::cluster::get_ingestor_info;
 use crate::handlers::http::fetch_schema;
+use crate::handlers::http::ingest::push_logs_unchecked;
 use crate::option::{Mode, CONFIG};
 
 use crate::handlers::livetail::cross_origin_config;
@@ -148,11 +149,18 @@ impl FlightService for AirServiceImpl {
             .await
             .map_err(|_| Status::internal("Failed to parse query"))?;
 
+        // if table name is not present it is a Malformed Query
+        let stream_name = query
+            .first_table_name()
+            .ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
+
         let time_delta = query.end - Utc::now();
 
-        let minute_result = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2
-        {
-            let sql = format!("{}\"query\": \"{}\"{}", L_CURLY, &ticket.query, R_CURLY);
+        let events = if CONFIG.parseable.mode == Mode::Query && time_delta.num_seconds() < 2 {
+            let sql = format!(
+                "{}\"query\": \"select * from {}\"{}",
+                L_CURLY, &stream_name, R_CURLY
+            );
             let ingester_metadatas = get_ingestor_info()
                 .await
                 .map_err(|err| Status::failed_precondition(err.to_string()))?;
@@ -162,33 +170,31 @@
                 let mut batches = run_do_get_rpc(im, sql.clone()).await?;
                 minute_result.append(&mut batches);
             }
-
-            Some(minute_result)
+            let mut events = vec![];
+            for batch in minute_result {
+                events.push(
+                    push_logs_unchecked(batch, &stream_name)
+                        .await
+                        .map_err(|err| Status::internal(err.to_string()))?,
+                );
+            }
+            Some(events)
         } else {
             None
         };
 
-        // if table name is not present it is a Malformed Query
-        let stream_name = query
-            .first_table_name()
-            .ok_or_else(|| Status::invalid_argument("Malformed Query"))?;
-
         let permissions = Users.get_permissions(&key);
 
         authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
             Status::permission_denied("User Does not have permission to access this")
         })?;
 
-        let (mut results, _) = query
-            .execute(stream_name)
+        let (results, _) = query
+            .execute(stream_name.clone())
             .await
             .map_err(|err| Status::internal(err.to_string()))
             .unwrap();
 
-        if let Some(mut minute_result) = minute_result {
-            results.append(&mut minute_result);
-        };
-
         let schemas = results
             .iter()
             .map(|batch| batch.schema())
@@ -210,6 +216,11 @@
             flights.push(flight_batch.into());
         }
         let output = futures::stream::iter(flights.into_iter().map(Ok));
+        if let Some(events) = events {
+            for event in events {
+                event.clear(&stream_name);
+            }
+        }
         Ok(Response::new(Box::pin(output) as Self::DoGetStream))
 
     }
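
Taken together, the new `do_get` on a Query node works like this: when the query's end bound falls within about two seconds of now, fan `select * from <stream>` out to every ingestor over Flight, stage the returned batches locally with `push_logs_unchecked`, execute the user's query (which now sees persisted plus staged data), and clear the staged events before returning the response stream. A hedged outline of that sequence; the helper names follow the diff, but every signature here is simplified:

```rust
// All helpers below are stand-ins; only the control flow mirrors the handler.
type Batch = String;

struct StagedEvent(Batch);

impl StagedEvent {
    fn clear(&self, stream: &str) {
        println!("dropping staged batch for {stream}");
    }
}

fn run_do_get_rpc(ingestor: &str, _sql: &str) -> Vec<Batch> {
    vec![format!("in-memory batch from {ingestor}")]
}

fn push_logs_unchecked(batch: Batch, _stream: &str) -> StagedEvent {
    StagedEvent(batch) // stage locally, skipping schema/alert checks
}

fn execute_query(_stream: &str) -> Vec<Batch> {
    vec!["persisted + staged results".into()]
}

fn do_get(stream: &str, query_touches_now: bool) -> Vec<Batch> {
    // 1. Only queries over the most recent window need ingestor data.
    let staged = if query_touches_now {
        let sql = format!("select * from {stream}");
        let mut events = Vec::new();
        for ingestor in ["ingestor-1", "ingestor-2"] {
            for batch in run_do_get_rpc(ingestor, &sql) {
                events.push(push_logs_unchecked(batch, stream));
            }
        }
        Some(events)
    } else {
        None
    };

    // 2. The local query now sees storage plus the staged batches.
    let results = execute_query(stream);

    // 3. Staged batches are transient: drop them once results exist.
    if let Some(events) = staged {
        for event in events {
            event.clear(stream);
        }
    }
    results
}

fn main() {
    println!("{:?}", do_get("logs", true));
}
```
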
18 changes: 18 additions & 0 deletions server/src/handlers/http/ingest.rs
@@ -32,12 +32,14 @@ use crate::option::{Mode, CONFIG};
 use crate::storage::{LogStream, ObjectStorageError};
 use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
+use arrow_array::RecordBatch;
 use arrow_schema::{Field, Schema};
 use bytes::Bytes;
 use http::StatusCode;
 use serde_json::Value;
 use std::collections::{BTreeMap, HashMap};
 use std::sync::Arc;
 
+
 // Handler for POST /api/v1/ingest
 // ingests events by extracting stream name from header
 // creates if stream does not exist
@@ -94,6 +96,22 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError>
     Ok(HttpResponse::Ok().finish())
 }
 
+pub async fn push_logs_unchecked(
+    batches: RecordBatch,
+    stream_name: &str,
+) -> Result<event::Event, PostError> {
+    let event = event::Event {
+        rb: batches,
+        stream_name: stream_name.to_string(),
+        origin_format: "json",
+        origin_size: 0,
+        is_first_event: true,
+    };
+    event.process_unchecked()?;
+
+    Ok(event)
+}
+
 async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
     let (size, rb, is_first_event) = {
         let hash_map = STREAM_INFO.read().unwrap();
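
Note the shape of the new API: `push_logs_unchecked` stages the batch with placeholder origin metadata (`origin_format: "json"`, `origin_size: 0`, `is_first_event: true`) and hands the `Event` back, so the caller rather than the ingest path decides when the staged data is cleared. A tiny sketch of that handle-style contract; `Staged` is a hypothetical reduction, not a real Parseable type:

```rust
struct Staged {
    stream: String,
}

impl Staged {
    // Consuming clear(): the staged handle cannot be reused afterwards.
    fn clear(self) {
        println!("clearing staged batches for {}", self.stream);
    }
}

fn stage(stream: &str) -> Result<Staged, String> {
    // process_unchecked() would run here; failures bubble up to the caller.
    Ok(Staged { stream: stream.to_string() })
}

fn main() -> Result<(), String> {
    let staged = stage("logs")?;
    // ... answer the flight query using the staged batches ...
    staged.clear();
    Ok(())
}
```
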
2 changes: 1 addition & 1 deletion server/src/metadata.rs
@@ -63,7 +63,7 @@ impl StreamInfo {
     pub async fn check_alerts(
         &self,
         stream_name: &str,
-        rb: RecordBatch,
+        rb: &RecordBatch,
     ) -> Result<(), CheckAlertError> {
         let map = self.read().expect(LOCK_EXPECT);
         let meta = map
2 changes: 0 additions & 2 deletions server/src/utils/arrow.rs
@@ -55,8 +55,6 @@ use serde_json::{Map, Value};
 /// }
 ///
 
-
-
 /// Replaces columns in a record batch with new arrays.
 ///
 /// # Arguments
