diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index afb1fc3bb..e910c7035 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -45,6 +45,7 @@ use crate::utils::arrow::flight::{ append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc, send_to_ingester, }; +use crate::utils::time::TimeRange; use arrow_flight::{ flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, @@ -143,6 +144,8 @@ impl FlightService for AirServiceImpl { Status::internal("Failed to create logical plan") })?; + let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time) + .map_err(|e| Status::internal(e.to_string()))?; // create a visitor to extract the table name let mut visitor = TableScanVisitor::default(); let _ = raw_logical_plan.visit(&mut visitor); @@ -159,38 +162,40 @@ impl FlightService for AirServiceImpl { .map_err(|err| Status::internal(err.to_string()))?; // map payload to query - let mut query = into_query(&ticket, &session_state) + let mut query = into_query(&ticket, &session_state, time_range) .await .map_err(|_| Status::internal("Failed to parse query"))?; - let event = - if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) { - let sql = format!("select * from {}", &stream_name); - let start_time = ticket.start_time.clone(); - let end_time = ticket.end_time.clone(); - let out_ticket = json!({ - "query": sql, - "startTime": start_time, - "endTime": end_time - }) - .to_string(); - - let ingester_metadatas = get_ingestor_info() - .await - .map_err(|err| Status::failed_precondition(err.to_string()))?; - let mut minute_result: Vec = vec![]; - - for im in ingester_metadatas { - if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await { - minute_result.append(&mut batches); - } + let event = if send_to_ingester( + query.time_range.start.timestamp_millis(), + query.time_range.end.timestamp_millis(), + ) { + let sql = format!("select * from {}", &stream_name); + let start_time = ticket.start_time.clone(); + let end_time = ticket.end_time.clone(); + let out_ticket = json!({ + "query": sql, + "startTime": start_time, + "endTime": end_time + }) + .to_string(); + + let ingester_metadatas = get_ingestor_info() + .await + .map_err(|err| Status::failed_precondition(err.to_string()))?; + let mut minute_result: Vec = vec![]; + + for im in ingester_metadatas { + if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await { + minute_result.append(&mut batches); } - let mr = minute_result.iter().collect::>(); - let event = append_temporary_events(&stream_name, mr).await?; - Some(event) - } else { - None - }; + } + let mr = minute_result.iter().collect::>(); + let event = append_temporary_events(&stream_name, mr).await?; + Some(event) + } else { + None + }; // try authorize match Users.authorize(key.clone(), rbac::role::Action::Query, None, None) { diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index e4ac92b44..27414b9d0 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -47,6 +47,7 @@ use crate::response::QueryResponse; use crate::storage::object_storage::commit_schema_to_storage; use crate::storage::ObjectStorageError; use crate::utils::actix::extract_session_key_from_req; +use crate::utils::time::{TimeParseError, TimeRange}; use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage; @@ -80,13 +81,17 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result { if query.query.is_empty() { return Err(QueryError::EmptyQuery); @@ -231,42 +237,13 @@ pub async fn into_query( return Err(QueryError::EmptyEndTime); } - let (start, end) = parse_human_time(&query.start_time, &query.end_time)?; - - if start.timestamp() > end.timestamp() { - return Err(QueryError::StartTimeAfterEndTime); - } - Ok(crate::query::Query { raw_logical_plan: session_state.create_logical_plan(&query.query).await?, - start, - end, + time_range, filter_tag: query.filter_tags.clone(), }) } -fn parse_human_time( - start_time: &str, - end_time: &str, -) -> Result<(DateTime, DateTime), QueryError> { - let start: DateTime; - let end: DateTime; - - if end_time == "now" { - end = Utc::now(); - start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?; - } else { - start = DateTime::parse_from_rfc3339(start_time) - .map_err(|_| QueryError::StartTimeParse)? - .into(); - end = DateTime::parse_from_rfc3339(end_time) - .map_err(|_| QueryError::EndTimeParse)? - .into(); - }; - - Ok((start, end)) -} - /// unused for now, might need it in the future #[allow(unused)] fn transform_query_for_ingestor(query: &Query) -> Option { @@ -312,16 +289,8 @@ pub enum QueryError { EmptyStartTime, #[error("End time cannot be empty")] EmptyEndTime, - #[error("Could not parse start time correctly")] - StartTimeParse, - #[error("Could not parse end time correctly")] - EndTimeParse, - #[error("While generating times for 'now' failed to parse duration")] - NotValidDuration(#[from] humantime::DurationError), - #[error("Parsed duration out of range")] - OutOfRange(#[from] chrono::OutOfRangeError), - #[error("Start time cannot be greater than the end time")] - StartTimeAfterEndTime, + #[error("Error while parsing provided time range: {0}")] + TimeParse(#[from] TimeParseError), #[error("Unauthorized")] Unauthorized, #[error("Datafusion Error: {0}")] diff --git a/src/query/mod.rs b/src/query/mod.rs index 350e47ea8..0ce363584 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -46,6 +46,7 @@ use crate::event; use crate::metadata::STREAM_INFO; use crate::option::CONFIG; use crate::storage::{ObjectStorageProvider, StorageDir}; +use crate::utils::time::TimeRange; pub static QUERY_SESSION: Lazy = Lazy::new(|| Query::create_session_context(CONFIG.storage())); @@ -54,8 +55,7 @@ pub static QUERY_SESSION: Lazy = #[derive(Debug)] pub struct Query { pub raw_logical_plan: LogicalPlan, - pub start: DateTime, - pub end: DateTime, + pub time_range: TimeRange, pub filter_tag: Option>, } @@ -164,8 +164,8 @@ impl Query { LogicalPlan::Explain(plan) => { let transformed = transform( plan.plan.as_ref().clone(), - self.start.naive_utc(), - self.end.naive_utc(), + self.time_range.start.naive_utc(), + self.time_range.end.naive_utc(), filters, time_partition, ); @@ -182,8 +182,8 @@ impl Query { x => { transform( x, - self.start.naive_utc(), - self.end.naive_utc(), + self.time_range.start.naive_utc(), + self.time_range.end.naive_utc(), filters, time_partition, ) diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 61692dac3..2db5567bf 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -20,6 +20,7 @@ pub mod actix; pub mod arrow; pub mod header_parsing; pub mod json; +pub mod time; pub mod uid; pub mod update; use crate::handlers::http::rbac::RBACError; diff --git a/src/utils/time.rs b/src/utils/time.rs new file mode 100644 index 000000000..f728b0cca --- /dev/null +++ b/src/utils/time.rs @@ -0,0 +1,157 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use chrono::{DateTime, Utc}; + +#[derive(Debug, thiserror::Error)] +pub enum TimeParseError { + #[error("Parsing humantime")] + HumanTime(#[from] humantime::DurationError), + #[error("Out of Range")] + OutOfRange(#[from] chrono::OutOfRangeError), + #[error("Error parsing time: {0}")] + Chrono(#[from] chrono::ParseError), + #[error("Start time cannot be greater than the end time")] + StartTimeAfterEndTime, +} + +/// Represents a range of time with a start and end point. +#[derive(Debug)] +pub struct TimeRange { + pub start: DateTime, + pub end: DateTime, +} + +impl TimeRange { + /// Parses human-readable time strings into a `TimeRange` object. + /// + /// # Arguments + /// - `start_time`: A string representing the start of the time range. This can either be + /// a human-readable duration (e.g., `"2 hours"`) or an RFC 3339 formatted timestamp. + /// - `end_time`: A string representing the end of the time range. This can either be + /// the keyword `"now"` (to represent the current time) or an RFC 3339 formatted timestamp. + /// + /// # Errors + /// - `TimeParseError::StartTimeAfterEndTime`: Returned when the parsed start time is later than the end time. + /// - Any error that might occur during parsing of durations or RFC 3339 timestamps. + /// + /// # Example + /// ```ignore + /// let range = TimeRange::parse_human_time("2 hours", "now"); + /// let range = TimeRange::parse_human_time("2023-01-01T12:00:00Z", "2023-01-01T15:00:00Z"); + /// ``` + pub fn parse_human_time(start_time: &str, end_time: &str) -> Result { + let start: DateTime; + let end: DateTime; + + if end_time == "now" { + end = Utc::now(); + start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?; + } else { + start = DateTime::parse_from_rfc3339(start_time)?.into(); + end = DateTime::parse_from_rfc3339(end_time)?.into(); + }; + + if start > end { + return Err(TimeParseError::StartTimeAfterEndTime); + } + + Ok(Self { start, end }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{Duration, SecondsFormat, Utc}; + + #[test] + fn valid_rfc3339_timestamps() { + let start_time = "2023-01-01T12:00:00Z"; + let end_time = "2023-01-01T13:00:00Z"; + + let result = TimeRange::parse_human_time(start_time, end_time); + let parsed = result.unwrap(); + + assert_eq!( + parsed.start.to_rfc3339_opts(SecondsFormat::Secs, true), + start_time + ); + assert_eq!( + parsed.end.to_rfc3339_opts(SecondsFormat::Secs, true), + end_time + ); + } + + #[test] + fn end_time_now_with_valid_duration() { + let start_time = "1h"; + let end_time = "now"; + + let result = TimeRange::parse_human_time(start_time, end_time); + let parsed = result.unwrap(); + + assert!(parsed.end <= Utc::now()); + assert_eq!(parsed.end - parsed.start, Duration::hours(1)); + + let start_time = "30 minutes"; + let end_time = "now"; + + let result = TimeRange::parse_human_time(start_time, end_time); + let parsed = result.unwrap(); + + assert!(parsed.end <= Utc::now()); + assert_eq!(parsed.end - parsed.start, Duration::minutes(30)); + } + + #[test] + fn start_time_after_end_time() { + let start_time = "2023-01-01T14:00:00Z"; + let end_time = "2023-01-01T13:00:00Z"; + + let result = TimeRange::parse_human_time(start_time, end_time); + assert!(matches!(result, Err(TimeParseError::StartTimeAfterEndTime))); + } + + #[test] + fn invalid_start_time_format() { + let start_time = "not-a-valid-time"; + let end_time = "2023-01-01T13:00:00Z"; + + let result = TimeRange::parse_human_time(start_time, end_time); + assert!(matches!(result, Err(TimeParseError::Chrono(_)))); + } + + #[test] + fn invalid_end_time_format() { + let start_time = "2023-01-01T12:00:00Z"; + let end_time = "not-a-valid-time"; + + let result = TimeRange::parse_human_time(start_time, end_time); + assert!(matches!(result, Err(TimeParseError::Chrono(_)))); + } + + #[test] + fn invalid_duration_with_end_time_now() { + let start_time = "not-a-duration"; + let end_time = "now"; + + let result = TimeRange::parse_human_time(start_time, end_time); + assert!(matches!(result, Err(TimeParseError::HumanTime(_)))); + } +}