From 6cc894f512d64b50f14d59a20aaef4cf64e693bf Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 6 Dec 2024 16:46:09 +0530 Subject: [PATCH 1/5] refactor: utils/time parsing --- query_cache/.cache_meta.json | 1 + src/handlers/airplane.rs | 68 +++++++++++++++++++----------------- src/handlers/http/query.rs | 63 ++++++++------------------------- src/query/mod.rs | 12 +++---- src/utils/mod.rs | 1 + src/utils/time.rs | 58 ++++++++++++++++++++++++++++++ 6 files changed, 116 insertions(+), 87 deletions(-) create mode 100644 query_cache/.cache_meta.json create mode 100644 src/utils/time.rs diff --git a/query_cache/.cache_meta.json b/query_cache/.cache_meta.json new file mode 100644 index 000000000..04c9ab149 --- /dev/null +++ b/query_cache/.cache_meta.json @@ -0,0 +1 @@ +{"version":"v1","size_capacity":1073741824} \ No newline at end of file diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index c803ba194..350de26d4 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -49,6 +49,7 @@ use crate::utils::arrow::flight::{ append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc, send_to_ingester, }; +use crate::utils::time::TimeRange; use arrow_flight::{ flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, @@ -149,6 +150,8 @@ impl FlightService for AirServiceImpl { Status::internal("Failed to create logical plan") })?; + let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time) + .map_err(|e| Status::internal(e.to_string()))?; // create a visitor to extract the table name let mut visitor = TableScanVisitor::default(); let _ = raw_logical_plan.visit(&mut visitor); @@ -184,8 +187,7 @@ impl FlightService for AirServiceImpl { query_cache_manager, &stream_name, user_id, - &ticket.start_time, - &ticket.end_time, + &time_range, &ticket.query, ticket.send_null, ticket.fields, @@ -200,38 +202,40 @@ impl FlightService for AirServiceImpl { .map_err(|err| Status::internal(err.to_string()))?; // map payload to query - let mut query = into_query(&ticket, &session_state) + let mut query = into_query(&ticket, &session_state, time_range) .await .map_err(|_| Status::internal("Failed to parse query"))?; - let event = - if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) { - let sql = format!("select * from {}", &stream_name); - let start_time = ticket.start_time.clone(); - let end_time = ticket.end_time.clone(); - let out_ticket = json!({ - "query": sql, - "startTime": start_time, - "endTime": end_time - }) - .to_string(); - - let ingester_metadatas = get_ingestor_info() - .await - .map_err(|err| Status::failed_precondition(err.to_string()))?; - let mut minute_result: Vec = vec![]; - - for im in ingester_metadatas { - if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await { - minute_result.append(&mut batches); - } + let event = if send_to_ingester( + query.time_range.start.timestamp_millis(), + query.time_range.end.timestamp_millis(), + ) { + let sql = format!("select * from {}", &stream_name); + let start_time = ticket.start_time.clone(); + let end_time = ticket.end_time.clone(); + let out_ticket = json!({ + "query": sql, + "startTime": start_time, + "endTime": end_time + }) + .to_string(); + + let ingester_metadatas = get_ingestor_info() + .await + .map_err(|err| Status::failed_precondition(err.to_string()))?; + let mut minute_result: Vec = vec![]; + + for im in ingester_metadatas { + if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await { + minute_result.append(&mut batches); } - let mr = minute_result.iter().collect::>(); - let event = append_temporary_events(&stream_name, mr).await?; - Some(event) - } else { - None - }; + } + let mr = minute_result.iter().collect::>(); + let event = append_temporary_events(&stream_name, mr).await?; + Some(event) + } else { + None + }; // try authorize match Users.authorize(key.clone(), rbac::role::Action::Query, None, None) { @@ -263,8 +267,8 @@ impl FlightService for AirServiceImpl { query_cache_manager, &stream_name, &records, - query.start.to_rfc3339(), - query.end.to_rfc3339(), + query.time_range.start.to_rfc3339(), + query.time_range.end.to_rfc3339(), ticket.query, ) .await diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index f99b170f2..85d432a8a 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -34,6 +34,7 @@ use std::time::Instant; use crate::event::error::EventError; use crate::handlers::http::fetch_schema; use crate::metadata::STREAM_INFO; +use crate::utils::time::{TimeParseError, TimeRange}; use arrow_array::RecordBatch; use crate::event::commit_schema; @@ -84,6 +85,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Result Result, stream: &str, user_id: Option<&str>, - start_time: &str, - end_time: &str, + TimeRange { start, end }: &TimeRange, query: &str, send_null: bool, send_fields: bool, @@ -295,8 +298,6 @@ pub async fn get_results_from_cache( let mut query_cache = query_cache_manager.get_cache(stream, user_id).await?; - let (start, end) = parse_human_time(start_time, end_time)?; - let file_path = query_cache.get_file(&CacheMetadata::new( query.to_string(), start.to_rfc3339(), @@ -391,6 +392,7 @@ impl FromRequest for Query { pub async fn into_query( query: &Query, session_state: &SessionState, + time_range: TimeRange, ) -> Result { if query.query.is_empty() { return Err(QueryError::EmptyQuery); @@ -404,42 +406,13 @@ pub async fn into_query( return Err(QueryError::EmptyEndTime); } - let (start, end) = parse_human_time(&query.start_time, &query.end_time)?; - - if start.timestamp() > end.timestamp() { - return Err(QueryError::StartTimeAfterEndTime); - } - Ok(crate::query::Query { raw_logical_plan: session_state.create_logical_plan(&query.query).await?, - start, - end, + time_range, filter_tag: query.filter_tags.clone(), }) } -fn parse_human_time( - start_time: &str, - end_time: &str, -) -> Result<(DateTime, DateTime), QueryError> { - let start: DateTime; - let end: DateTime; - - if end_time == "now" { - end = Utc::now(); - start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?; - } else { - start = DateTime::parse_from_rfc3339(start_time) - .map_err(|_| QueryError::StartTimeParse)? - .into(); - end = DateTime::parse_from_rfc3339(end_time) - .map_err(|_| QueryError::EndTimeParse)? - .into(); - }; - - Ok((start, end)) -} - /// unused for now, might need it in the future #[allow(unused)] fn transform_query_for_ingestor(query: &Query) -> Option { @@ -485,16 +458,8 @@ pub enum QueryError { EmptyStartTime, #[error("End time cannot be empty")] EmptyEndTime, - #[error("Could not parse start time correctly")] - StartTimeParse, - #[error("Could not parse end time correctly")] - EndTimeParse, - #[error("While generating times for 'now' failed to parse duration")] - NotValidDuration(#[from] humantime::DurationError), - #[error("Parsed duration out of range")] - OutOfRange(#[from] chrono::OutOfRangeError), - #[error("Start time cannot be greater than the end time")] - StartTimeAfterEndTime, + #[error("Error while parsing provided time range: {0}")] + TimeParse(#[from] TimeParseError), #[error("Unauthorized")] Unauthorized, #[error("Datafusion Error: {0}")] diff --git a/src/query/mod.rs b/src/query/mod.rs index b41a066f8..a3f8a0330 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -46,6 +46,7 @@ use crate::event; use crate::metadata::STREAM_INFO; use crate::option::CONFIG; use crate::storage::{ObjectStorageProvider, StorageDir}; +use crate::utils::time::TimeRange; pub static QUERY_SESSION: Lazy = Lazy::new(|| Query::create_session_context(CONFIG.storage())); @@ -54,8 +55,7 @@ pub static QUERY_SESSION: Lazy = #[derive(Debug)] pub struct Query { pub raw_logical_plan: LogicalPlan, - pub start: DateTime, - pub end: DateTime, + pub time_range: TimeRange, pub filter_tag: Option>, } @@ -164,8 +164,8 @@ impl Query { LogicalPlan::Explain(plan) => { let transformed = transform( plan.plan.as_ref().clone(), - self.start.naive_utc(), - self.end.naive_utc(), + self.time_range.start.naive_utc(), + self.time_range.end.naive_utc(), filters, time_partition, ); @@ -182,8 +182,8 @@ impl Query { x => { transform( x, - self.start.naive_utc(), - self.end.naive_utc(), + self.time_range.start.naive_utc(), + self.time_range.end.naive_utc(), filters, time_partition, ) diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 1516b2ac1..2c8cbb87d 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -20,6 +20,7 @@ pub mod actix; pub mod arrow; pub mod header_parsing; pub mod json; +pub mod time; pub mod uid; pub mod update; use crate::handlers::http::rbac::RBACError; diff --git a/src/utils/time.rs b/src/utils/time.rs new file mode 100644 index 000000000..332bd0355 --- /dev/null +++ b/src/utils/time.rs @@ -0,0 +1,58 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use chrono::{DateTime, Utc}; + +#[derive(Debug, thiserror::Error)] +pub enum TimeParseError { + #[error("Parsing humantime")] + HumanTime(#[from] humantime::DurationError), + #[error("Out of Range")] + OutOfRange(#[from] chrono::OutOfRangeError), + #[error("Error parsing time: {0}")] + Chrono(#[from] chrono::ParseError), + #[error("Start time cannot be greater than the end time")] + StartTimeAfterEndTime, +} + +#[derive(Debug)] +pub struct TimeRange { + pub start: DateTime, + pub end: DateTime, +} + +impl TimeRange { + pub fn parse_human_time(start_time: &str, end_time: &str) -> Result { + let start: DateTime; + let end: DateTime; + + if end_time == "now" { + end = Utc::now(); + start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?; + } else { + start = DateTime::parse_from_rfc3339(start_time)?.into(); + end = DateTime::parse_from_rfc3339(end_time)?.into(); + }; + + if start > end { + return Err(TimeParseError::StartTimeAfterEndTime); + } + + Ok(Self { start, end }) + } +} From e011daf38a9f5a8ebc415b423ca4bba9afdfa4ef Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 6 Dec 2024 17:00:50 +0530 Subject: [PATCH 2/5] rm unexpected addition --- query_cache/.cache_meta.json | 1 - 1 file changed, 1 deletion(-) delete mode 100644 query_cache/.cache_meta.json diff --git a/query_cache/.cache_meta.json b/query_cache/.cache_meta.json deleted file mode 100644 index 04c9ab149..000000000 --- a/query_cache/.cache_meta.json +++ /dev/null @@ -1 +0,0 @@ -{"version":"v1","size_capacity":1073741824} \ No newline at end of file From e7fdf704de76de3f681ee0187c6b97917d7cbcb5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 8 Dec 2024 03:13:58 +0530 Subject: [PATCH 3/5] doc: code comments --- src/utils/time.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/src/utils/time.rs b/src/utils/time.rs index 332bd0355..f3ac1c8d1 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -30,6 +30,7 @@ pub enum TimeParseError { StartTimeAfterEndTime, } +/// Represents a range of time with a start and end point. #[derive(Debug)] pub struct TimeRange { pub start: DateTime, @@ -37,6 +38,23 @@ pub struct TimeRange { } impl TimeRange { + /// Parses human-readable time strings into a `TimeRange` object. + /// + /// # Arguments + /// - `start_time`: A string representing the start of the time range. This can either be + /// a human-readable duration (e.g., `"2 hours"`) or an RFC 3339 formatted timestamp. + /// - `end_time`: A string representing the end of the time range. This can either be + /// the keyword `"now"` (to represent the current time) or an RFC 3339 formatted timestamp. + /// + /// # Errors + /// - `TimeParseError::StartTimeAfterEndTime`: Returned when the parsed start time is later than the end time. + /// - Any error that might occur during parsing of durations or RFC 3339 timestamps. + /// + /// # Example + /// ``` + /// let range = TimeRange::parse_human_time("2 hours", "now"); + /// let range = TimeRange::parse_human_time("2023-01-01T12:00:00Z", "2023-01-01T15:00:00Z"); + /// ``` pub fn parse_human_time(start_time: &str, end_time: &str) -> Result { let start: DateTime; let end: DateTime; From aae9befa7660d9004259c7e329eb3278e7718153 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 8 Dec 2024 03:39:11 +0530 Subject: [PATCH 4/5] ignore documentary code --- src/utils/time.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/time.rs b/src/utils/time.rs index f3ac1c8d1..ee130843d 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -51,7 +51,7 @@ impl TimeRange { /// - Any error that might occur during parsing of durations or RFC 3339 timestamps. /// /// # Example - /// ``` + /// ```ignore /// let range = TimeRange::parse_human_time("2 hours", "now"); /// let range = TimeRange::parse_human_time("2023-01-01T12:00:00Z", "2023-01-01T15:00:00Z"); /// ``` From ffd705f7d572a1793d371d91c3a93ce7adc92a9a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 9 Dec 2024 02:07:24 +0530 Subject: [PATCH 5/5] test: improve coverage --- src/utils/time.rs | 81 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/src/utils/time.rs b/src/utils/time.rs index ee130843d..f728b0cca 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -74,3 +74,84 @@ impl TimeRange { Ok(Self { start, end }) } } + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{Duration, SecondsFormat, Utc}; + + #[test] + fn valid_rfc3339_timestamps() { + let start_time = "2023-01-01T12:00:00Z"; + let end_time = "2023-01-01T13:00:00Z"; + + let result = TimeRange::parse_human_time(start_time, end_time); + let parsed = result.unwrap(); + + assert_eq!( + parsed.start.to_rfc3339_opts(SecondsFormat::Secs, true), + start_time + ); + assert_eq!( + parsed.end.to_rfc3339_opts(SecondsFormat::Secs, true), + end_time + ); + } + + #[test] + fn end_time_now_with_valid_duration() { + let start_time = "1h"; + let end_time = "now"; + + let result = TimeRange::parse_human_time(start_time, end_time); + let parsed = result.unwrap(); + + assert!(parsed.end <= Utc::now()); + assert_eq!(parsed.end - parsed.start, Duration::hours(1)); + + let start_time = "30 minutes"; + let end_time = "now"; + + let result = TimeRange::parse_human_time(start_time, end_time); + let parsed = result.unwrap(); + + assert!(parsed.end <= Utc::now()); + assert_eq!(parsed.end - parsed.start, Duration::minutes(30)); + } + + #[test] + fn start_time_after_end_time() { + let start_time = "2023-01-01T14:00:00Z"; + let end_time = "2023-01-01T13:00:00Z"; + + let result = TimeRange::parse_human_time(start_time, end_time); + assert!(matches!(result, Err(TimeParseError::StartTimeAfterEndTime))); + } + + #[test] + fn invalid_start_time_format() { + let start_time = "not-a-valid-time"; + let end_time = "2023-01-01T13:00:00Z"; + + let result = TimeRange::parse_human_time(start_time, end_time); + assert!(matches!(result, Err(TimeParseError::Chrono(_)))); + } + + #[test] + fn invalid_end_time_format() { + let start_time = "2023-01-01T12:00:00Z"; + let end_time = "not-a-valid-time"; + + let result = TimeRange::parse_human_time(start_time, end_time); + assert!(matches!(result, Err(TimeParseError::Chrono(_)))); + } + + #[test] + fn invalid_duration_with_end_time_now() { + let start_time = "not-a-duration"; + let end_time = "now"; + + let result = TimeRange::parse_human_time(start_time, end_time); + assert!(matches!(result, Err(TimeParseError::HumanTime(_)))); + } +}