Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: utils/time parsing #1024

Merged
merged 11 commits into from
Dec 17, 2024
61 changes: 33 additions & 28 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
send_to_ingester,
};
use crate::utils::time::TimeRange;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
Expand Down Expand Up @@ -143,6 +144,8 @@ impl FlightService for AirServiceImpl {
Status::internal("Failed to create logical plan")
})?;

let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time)
.map_err(|e| Status::internal(e.to_string()))?;
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
Expand All @@ -159,38 +162,40 @@ impl FlightService for AirServiceImpl {
.map_err(|err| Status::internal(err.to_string()))?;

// map payload to query
let mut query = into_query(&ticket, &session_state)
let mut query = into_query(&ticket, &session_state, time_range)
.await
.map_err(|_| Status::internal("Failed to parse query"))?;

let event =
if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
let sql = format!("select * from {}", &stream_name);
let start_time = ticket.start_time.clone();
let end_time = ticket.end_time.clone();
let out_ticket = json!({
"query": sql,
"startTime": start_time,
"endTime": end_time
})
.to_string();

let ingester_metadatas = get_ingestor_info()
.await
.map_err(|err| Status::failed_precondition(err.to_string()))?;
let mut minute_result: Vec<RecordBatch> = vec![];

for im in ingester_metadatas {
if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await {
minute_result.append(&mut batches);
}
let event = if send_to_ingester(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the following is not my change, it is a side effect of cargo fmt

query.time_range.start.timestamp_millis(),
query.time_range.end.timestamp_millis(),
) {
let sql = format!("select * from {}", &stream_name);
let start_time = ticket.start_time.clone();
let end_time = ticket.end_time.clone();
let out_ticket = json!({
"query": sql,
"startTime": start_time,
"endTime": end_time
})
.to_string();

let ingester_metadatas = get_ingestor_info()
.await
.map_err(|err| Status::failed_precondition(err.to_string()))?;
let mut minute_result: Vec<RecordBatch> = vec![];

for im in ingester_metadatas {
if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await {
minute_result.append(&mut batches);
}
let mr = minute_result.iter().collect::<Vec<_>>();
let event = append_temporary_events(&stream_name, mr).await?;
Some(event)
} else {
None
};
}
let mr = minute_result.iter().collect::<Vec<_>>();
let event = append_temporary_events(&stream_name, mr).await?;
Some(event)
} else {
None
};

// try authorize
match Users.authorize(key.clone(), rbac::role::Action::Query, None, None) {
Expand Down
51 changes: 10 additions & 41 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::response::QueryResponse;
use crate::storage::object_storage::commit_schema_to_storage;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::time::{TimeParseError, TimeRange};

use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;

Expand Down Expand Up @@ -80,13 +81,17 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
.await?
}
};

let time_range =
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;

// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

let tables = visitor.into_inner();
update_schema_when_distributed(tables).await?;
let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;
let mut query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;

let creds = extract_session_key_from_req(&req)?;
let permissions = Users.get_permissions(&creds);
Expand Down Expand Up @@ -218,6 +223,7 @@ impl FromRequest for Query {
pub async fn into_query(
query: &Query,
session_state: &SessionState,
time_range: TimeRange,
) -> Result<LogicalQuery, QueryError> {
if query.query.is_empty() {
return Err(QueryError::EmptyQuery);
Expand All @@ -231,42 +237,13 @@ pub async fn into_query(
return Err(QueryError::EmptyEndTime);
}

let (start, end) = parse_human_time(&query.start_time, &query.end_time)?;

if start.timestamp() > end.timestamp() {
return Err(QueryError::StartTimeAfterEndTime);
}

Ok(crate::query::Query {
raw_logical_plan: session_state.create_logical_plan(&query.query).await?,
start,
end,
time_range,
filter_tag: query.filter_tags.clone(),
})
}

fn parse_human_time(
start_time: &str,
end_time: &str,
) -> Result<(DateTime<Utc>, DateTime<Utc>), QueryError> {
let start: DateTime<Utc>;
let end: DateTime<Utc>;

if end_time == "now" {
end = Utc::now();
start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?;
} else {
start = DateTime::parse_from_rfc3339(start_time)
.map_err(|_| QueryError::StartTimeParse)?
.into();
end = DateTime::parse_from_rfc3339(end_time)
.map_err(|_| QueryError::EndTimeParse)?
.into();
};

Ok((start, end))
}

/// unused for now, might need it in the future
#[allow(unused)]
fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
Expand Down Expand Up @@ -312,16 +289,8 @@ pub enum QueryError {
EmptyStartTime,
#[error("End time cannot be empty")]
EmptyEndTime,
#[error("Could not parse start time correctly")]
StartTimeParse,
#[error("Could not parse end time correctly")]
EndTimeParse,
#[error("While generating times for 'now' failed to parse duration")]
NotValidDuration(#[from] humantime::DurationError),
#[error("Parsed duration out of range")]
OutOfRange(#[from] chrono::OutOfRangeError),
#[error("Start time cannot be greater than the end time")]
StartTimeAfterEndTime,
#[error("Error while parsing provided time range: {0}")]
TimeParse(#[from] TimeParseError),
#[error("Unauthorized")]
Unauthorized,
#[error("Datafusion Error: {0}")]
Expand Down
12 changes: 6 additions & 6 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::event;
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::storage::{ObjectStorageProvider, StorageDir};
use crate::utils::time::TimeRange;

pub static QUERY_SESSION: Lazy<SessionContext> =
Lazy::new(|| Query::create_session_context(CONFIG.storage()));
Expand All @@ -54,8 +55,7 @@ pub static QUERY_SESSION: Lazy<SessionContext> =
#[derive(Debug)]
pub struct Query {
pub raw_logical_plan: LogicalPlan,
pub start: DateTime<Utc>,
pub end: DateTime<Utc>,
pub time_range: TimeRange,
pub filter_tag: Option<Vec<String>>,
}

Expand Down Expand Up @@ -164,8 +164,8 @@ impl Query {
LogicalPlan::Explain(plan) => {
let transformed = transform(
plan.plan.as_ref().clone(),
self.start.naive_utc(),
self.end.naive_utc(),
self.time_range.start.naive_utc(),
self.time_range.end.naive_utc(),
filters,
time_partition,
);
Expand All @@ -182,8 +182,8 @@ impl Query {
x => {
transform(
x,
self.start.naive_utc(),
self.end.naive_utc(),
self.time_range.start.naive_utc(),
self.time_range.end.naive_utc(),
filters,
time_partition,
)
Expand Down
1 change: 1 addition & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod actix;
pub mod arrow;
pub mod header_parsing;
pub mod json;
pub mod time;
pub mod uid;
pub mod update;
use crate::handlers::http::rbac::RBACError;
Expand Down
157 changes: 157 additions & 0 deletions src/utils/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use chrono::{DateTime, Utc};

#[derive(Debug, thiserror::Error)]
pub enum TimeParseError {
#[error("Parsing humantime")]
HumanTime(#[from] humantime::DurationError),
#[error("Out of Range")]
OutOfRange(#[from] chrono::OutOfRangeError),
#[error("Error parsing time: {0}")]
Chrono(#[from] chrono::ParseError),
#[error("Start time cannot be greater than the end time")]
StartTimeAfterEndTime,
}

/// Represents a range of time with a start and end point.
#[derive(Debug)]
pub struct TimeRange {
pub start: DateTime<Utc>,
pub end: DateTime<Utc>,
}

impl TimeRange {
/// Parses human-readable time strings into a `TimeRange` object.
///
/// # Arguments
/// - `start_time`: A string representing the start of the time range. This can either be
/// a human-readable duration (e.g., `"2 hours"`) or an RFC 3339 formatted timestamp.
/// - `end_time`: A string representing the end of the time range. This can either be
/// the keyword `"now"` (to represent the current time) or an RFC 3339 formatted timestamp.
///
/// # Errors
/// - `TimeParseError::StartTimeAfterEndTime`: Returned when the parsed start time is later than the end time.
/// - Any error that might occur during parsing of durations or RFC 3339 timestamps.
///
/// # Example
/// ```ignore
/// let range = TimeRange::parse_human_time("2 hours", "now");
/// let range = TimeRange::parse_human_time("2023-01-01T12:00:00Z", "2023-01-01T15:00:00Z");
/// ```
pub fn parse_human_time(start_time: &str, end_time: &str) -> Result<Self, TimeParseError> {
let start: DateTime<Utc>;
let end: DateTime<Utc>;

if end_time == "now" {
end = Utc::now();
start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?;
} else {
start = DateTime::parse_from_rfc3339(start_time)?.into();
end = DateTime::parse_from_rfc3339(end_time)?.into();
};

if start > end {
return Err(TimeParseError::StartTimeAfterEndTime);
}

Ok(Self { start, end })
}
}

#[cfg(test)]
mod tests {
use super::*;
use chrono::{Duration, SecondsFormat, Utc};

#[test]
fn valid_rfc3339_timestamps() {
let start_time = "2023-01-01T12:00:00Z";
let end_time = "2023-01-01T13:00:00Z";

let result = TimeRange::parse_human_time(start_time, end_time);
let parsed = result.unwrap();

assert_eq!(
parsed.start.to_rfc3339_opts(SecondsFormat::Secs, true),
start_time
);
assert_eq!(
parsed.end.to_rfc3339_opts(SecondsFormat::Secs, true),
end_time
);
}

#[test]
fn end_time_now_with_valid_duration() {
let start_time = "1h";
let end_time = "now";

let result = TimeRange::parse_human_time(start_time, end_time);
let parsed = result.unwrap();

assert!(parsed.end <= Utc::now());
assert_eq!(parsed.end - parsed.start, Duration::hours(1));

let start_time = "30 minutes";
let end_time = "now";

let result = TimeRange::parse_human_time(start_time, end_time);
let parsed = result.unwrap();

assert!(parsed.end <= Utc::now());
assert_eq!(parsed.end - parsed.start, Duration::minutes(30));
}

#[test]
fn start_time_after_end_time() {
let start_time = "2023-01-01T14:00:00Z";
let end_time = "2023-01-01T13:00:00Z";

let result = TimeRange::parse_human_time(start_time, end_time);
assert!(matches!(result, Err(TimeParseError::StartTimeAfterEndTime)));
}

#[test]
fn invalid_start_time_format() {
let start_time = "not-a-valid-time";
let end_time = "2023-01-01T13:00:00Z";

let result = TimeRange::parse_human_time(start_time, end_time);
assert!(matches!(result, Err(TimeParseError::Chrono(_))));
}

#[test]
fn invalid_end_time_format() {
let start_time = "2023-01-01T12:00:00Z";
let end_time = "not-a-valid-time";

let result = TimeRange::parse_human_time(start_time, end_time);
assert!(matches!(result, Err(TimeParseError::Chrono(_))));
}

#[test]
fn invalid_duration_with_end_time_now() {
let start_time = "not-a-duration";
let end_time = "now";

let result = TimeRange::parse_human_time(start_time, end_time);
assert!(matches!(result, Err(TimeParseError::HumanTime(_))));
}
}
Loading