Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support evaluate expr in range query param #3823

Merged
merged 4 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ smallvec = { version = "1", features = ["serde"] }
snafu = "0.8"
sysinfo = "0.30"
# on branch v0.44.x
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "c919990bf62ad38d2b0c0a3bc90b26ad919d51b0", features = [
sqlparser = { git = "https://github.com/GreptimeTeam/sqlparser-rs.git", rev = "e4e496b8d62416ad50ce70a1b460c7313610cf5d", features = [
"visitor",
] }
strum = { version = "0.25", features = ["derive"] }
Expand Down
179 changes: 162 additions & 17 deletions src/query/src/range_select/plan_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::time::Duration;
use arrow_schema::DataType;
use async_recursion::async_recursion;
use catalog::table_source::DfTableSourceProvider;
use chrono::Utc;
use common_time::interval::NANOS_PER_MILLI;
use common_time::timestamp::TimeUnit;
use common_time::{Interval, Timestamp, Timezone};
Expand All @@ -27,10 +28,13 @@ use datafusion::prelude::Column;
use datafusion::scalar::ScalarValue;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
use datafusion_common::{DFSchema, DataFusionError, Result as DFResult};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::{
Aggregate, Analyze, Explain, Expr, ExprSchemable, Extension, LogicalPlan, LogicalPlanBuilder,
Projection,
};
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datatypes::prelude::ConcreteDataType;
use promql_parser::util::parse_duration;
use session::context::QueryContextRef;
Expand Down Expand Up @@ -108,34 +112,84 @@ fn parse_expr_to_string(args: &[Expr], i: usize) -> DFResult<String> {
/// Parse a duraion expr:
/// 1. duration string (e.g. `'1h'`)
/// 2. Interval expr (e.g. `INTERVAL '1 year 3 hours 20 minutes'`)
/// 3. An interval expr can be evaluated at the logical plan stage (e.g. `INTERVAL '2' day - INTERVAL '1' day`)
fn parse_duration_expr(args: &[Expr], i: usize) -> DFResult<Duration> {
let interval_to_duration = |interval: Interval| -> Duration {
Duration::from_millis((interval.to_nanosecond() / NANOS_PER_MILLI as i128) as u64)
};
match args.get(i) {
Some(Expr::Literal(ScalarValue::Utf8(Some(str)))) => {
parse_duration(str).map_err(DataFusionError::Plan)
}
Some(Expr::Literal(ScalarValue::IntervalYearMonth(Some(i)))) => {
Ok(interval_to_duration(Interval::from_i32(*i)))
Some(expr) => {
let ms = evaluate_expr_to_millisecond(args, i, true)?;
if ms <= 0 {
return Err(dispose_parse_error(Some(expr)));
}
Ok(Duration::from_millis(ms as u64))
}
None => Err(dispose_parse_error(None)),
}
}

/// Evaluate a time calculation expr, case like:
/// 1. `INTERVAL '1' day + INTERVAL '1 year 2 hours 3 minutes'`
/// 2. `now() - INTERVAL '1' day` (when `interval_only==false`)
///
/// Output a millisecond timestamp
///
/// if `interval_only==true`, only accept expr with all interval type (case 2 will return a error)
fn evaluate_expr_to_millisecond(args: &[Expr], i: usize, interval_only: bool) -> DFResult<i64> {
let Some(expr) = args.get(i) else {
return Err(dispose_parse_error(None));
};
if interval_only && !interval_only_in_expr(expr) {
return Err(dispose_parse_error(Some(expr)));
}
let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
let info = SimplifyContext::new(&execution_props).with_schema(Arc::new(DFSchema::empty()));
let interval_to_ms =
|interval: Interval| -> i64 { (interval.to_nanosecond() / NANOS_PER_MILLI as i128) as i64 };
let simplify_expr = ExprSimplifier::new(info).simplify(expr.clone())?;
match simplify_expr {
Expr::Literal(ScalarValue::TimestampNanosecond(ts_nanos, _))
| Expr::Literal(ScalarValue::DurationNanosecond(ts_nanos)) => {
ts_nanos.map(|v| v / 1_000_000)
}
Expr::Literal(ScalarValue::TimestampMicrosecond(ts_micros, _))
| Expr::Literal(ScalarValue::DurationMicrosecond(ts_micros)) => {
ts_micros.map(|v| v / 1_000)
}
Some(Expr::Literal(ScalarValue::IntervalDayTime(Some(i)))) => {
Ok(interval_to_duration(Interval::from_i64(*i)))
Expr::Literal(ScalarValue::TimestampMillisecond(ts_millis, _))
| Expr::Literal(ScalarValue::DurationMillisecond(ts_millis)) => ts_millis,
Expr::Literal(ScalarValue::TimestampSecond(ts_secs, _))
| Expr::Literal(ScalarValue::DurationSecond(ts_secs)) => ts_secs.map(|v| v * 1_000),
Expr::Literal(ScalarValue::IntervalYearMonth(interval)) => {
interval.map(|v| interval_to_ms(Interval::from_i32(v)))
}
Some(Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(i)))) => {
Ok(interval_to_duration(Interval::from_i128(*i)))
Expr::Literal(ScalarValue::IntervalDayTime(interval)) => {
interval.map(|v| interval_to_ms(Interval::from_i64(v)))
}
other => Err(dispose_parse_error(other)),
Expr::Literal(ScalarValue::IntervalMonthDayNano(interval)) => {
interval.map(|v| interval_to_ms(Interval::from_i128(v)))
}
_ => None,
}
.ok_or_else(|| {
DataFusionError::Plan(format!(
"{} is not a expr can be evaluate and use in range query",
expr.display_name().unwrap_or_default()
))
})
}

/// Parse the `align to` clause and return a UTC timestamp with unit of millisecond,
Taylor-lagrange marked this conversation as resolved.
Show resolved Hide resolved
/// which is used as the basis for dividing time slot during the align operation.
/// 1. NOW: align to current execute time
/// 2. Timestamp string: align to specific timestamp
/// 3. leave empty (as Default Option): align to unix epoch 0 (timezone aware)
/// 3. An expr can be evaluated at the logical plan stage (e.g. `now() - INTERVAL '1' day`)
/// 4. leave empty (as Default Option): align to unix epoch 0 (timezone aware)
fn parse_align_to(args: &[Expr], i: usize, timezone: Option<&Timezone>) -> DFResult<i64> {
let s = parse_str_expr(args, i)?;
let Ok(s) = parse_str_expr(args, i) else {
return evaluate_expr_to_millisecond(args, i, false);
};
let upper = s.to_uppercase();
match upper.as_str() {
"NOW" => return Ok(Timestamp::current_millis().value()),
Expand Down Expand Up @@ -469,6 +523,25 @@ fn have_range_in_exprs(exprs: &[Expr]) -> bool {
})
}

fn interval_only_in_expr(expr: &Expr) -> bool {
let mut all_interval = true;
let _ = expr.apply(&mut |expr| {
if !matches!(
expr,
Expr::Literal(ScalarValue::IntervalDayTime(_))
| Expr::Literal(ScalarValue::IntervalMonthDayNano(_))
| Expr::Literal(ScalarValue::IntervalYearMonth(_))
| Expr::BinaryExpr(_)
) {
all_interval = false;
Ok(TreeNodeRecursion::Stop)
} else {
Ok(TreeNodeRecursion::Continue)
}
});
all_interval
}

#[cfg(test)]
mod test {

Expand All @@ -477,6 +550,7 @@ mod test {
use catalog::memory::MemoryCatalogManager;
use catalog::RegisterTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use datafusion_expr::{BinaryExpr, Operator};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use session::context::QueryContext;
Expand Down Expand Up @@ -754,8 +828,42 @@ mod test {
parse_duration_expr(&args, 0).unwrap(),
parse_duration("1y4w").unwrap()
);
// test err
// test index err
assert!(parse_duration_expr(&args, 10).is_err());
// test evaluate expr
let args = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
op: Operator::Plus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
})];
assert_eq!(
parse_duration_expr(&args, 0).unwrap().as_millis(),
interval_to_ms(Interval::from_year_month(20))
);
let args = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
op: Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
})];
// test zero interval error
assert!(parse_duration_expr(&args, 0).is_err());
// test must all be interval
let args = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
op: Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::Time64Microsecond(Some(0)))),
})];
assert!(parse_duration_expr(&args, 0).is_err());
}

#[test]
Expand Down Expand Up @@ -787,19 +895,56 @@ mod test {
let args = vec![Expr::Literal(ScalarValue::Utf8(Some(
"1970-01-01T00:00:00+08:00".into(),
)))];
assert!(parse_align_to(&args, 0, None).unwrap() == -8 * 60 * 60 * 1000);
assert_eq!(parse_align_to(&args, 0, None).unwrap(), -8 * 60 * 60 * 1000);
// timezone
let args = vec![Expr::Literal(ScalarValue::Utf8(Some(
"1970-01-01T00:00:00".into(),
)))];
assert!(
assert_eq!(
parse_align_to(
&args,
0,
Some(&Timezone::from_tz_string("Asia/Shanghai").unwrap())
)
.unwrap()
== -8 * 60 * 60 * 1000
.unwrap(),
-8 * 60 * 60 * 1000
);
// test evaluate expr
let args = vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
op: Operator::Plus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
})];
assert_eq!(
parse_align_to(&args, 0, None).unwrap(),
// 20 month
20 * 30 * 24 * 60 * 60 * 1000
);
}

#[test]
fn test_interval_only() {
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::DurationMillisecond(Some(20)))),
op: Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
});
assert!(!interval_only_in_expr(&expr));
let expr = Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
op: Operator::Minus,
right: Box::new(Expr::Literal(ScalarValue::IntervalYearMonth(Some(
Interval::from_year_month(10).to_i32(),
)))),
});
assert!(interval_only_in_expr(&expr));
}
}
4 changes: 2 additions & 2 deletions tests/cases/standalone/common/range/error.result
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ Error: 3000(PlanQuery), DataFusion error: Error during planning: duration must b

SELECT min(val) RANGE '5s' FROM host ALIGN (INTERVAL '0' day);

Error: 2000(InvalidSyntax), Range Query: Can't use 0 as align in Range Query
Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("0")` in range select query

SELECT min(val) RANGE (INTERVAL '0' day) FROM host ALIGN '5s';

Error: 2000(InvalidSyntax), Range Query: Invalid Range expr `MIN(host.val) RANGE IntervalMonthDayNano("0")`, Can't use 0 as range in Range Query
Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("0")` in range select query

DROP TABLE host;

Expand Down
24 changes: 24 additions & 0 deletions tests/cases/standalone/common/range/to.result
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,30 @@ SELECT ts, min(val) RANGE (INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day)
| 2024-01-24T23:00:00 | 3 |
+---------------------+------------------------------------------------------------------+

SELECT ts, min(val) RANGE (INTERVAL '2' day - INTERVAL '1' day) FROM host ALIGN (INTERVAL '2' day - INTERVAL '1' day) TO (now() - (now() + INTERVAL '1' hour)) by (1) ORDER BY ts;

+---------------------+-----------------------------------------------------------------------------------------------------------------+
| ts | MIN(host.val) RANGE IntervalMonthDayNano("36893488147419103232") - IntervalMonthDayNano("18446744073709551616") |
+---------------------+-----------------------------------------------------------------------------------------------------------------+
| 2024-01-22T23:00:00 | 0 |
| 2024-01-23T23:00:00 | 1 |
| 2024-01-24T23:00:00 | 3 |
+---------------------+-----------------------------------------------------------------------------------------------------------------+

-- non-positive duration
SELECT ts, min(val) RANGE (INTERVAL '1' day - INTERVAL '2' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;

Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("18446744073709551616") - IntervalMonthDayNano("36893488147419103232")` in range select query

SELECT ts, min(val) RANGE (INTERVAL '1' day - INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;

Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `IntervalMonthDayNano("18446744073709551616") - IntervalMonthDayNano("18446744073709551616")` in range select query

-- duration not all interval
SELECT ts, min(val) RANGE (now() - INTERVAL '1' day) FROM host ALIGN (INTERVAL '1' day) TO '1900-01-01T00:00:00+01:00' by (1) ORDER BY ts;

Error: 3000(PlanQuery), DataFusion error: Error during planning: Illegal argument `now() - IntervalMonthDayNano("18446744073709551616")` in range select query

--- ALIGN TO with time zone ---
set time_zone='Asia/Shanghai';

Expand Down
Loading
Loading