feat: simple limit impl in PartSort (#4922)
* feat: simple limit impl in PartSort

Signed-off-by: Ruihang Xia <[email protected]>

* fix: update time_index method to return a non-optional String

Co-authored-by: Yingwen <[email protected]>
Signed-off-by: Ruihang Xia <[email protected]>

* use builtin limit

Signed-off-by: Ruihang Xia <[email protected]>

* add more info to analyze display

Signed-off-by: Ruihang Xia <[email protected]>

* update sqlness

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Yingwen <[email protected]>
waynexia and evenyag authored Nov 1, 2024
1 parent 1ff29d8 commit be72d3b
Showing 5 changed files with 45 additions and 15 deletions.
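
Before the per-file diffs, a note on why this optimization is sound. Pushing the sort's fetch (the LIMIT) down into each per-partition sort never drops needed rows: every row of the global top-k lives in some partition, where at most k-1 rows rank ahead of it, so it survives that partition's own top-k. The sketch below is not GreptimeDB code; it is a self-contained Rust illustration with made-up numbers and a hypothetical top_k helper.

// Not GreptimeDB code: a toy model of limit pushdown with made-up numbers.
fn top_k(mut values: Vec<i64>, k: usize) -> Vec<i64> {
    values.sort_unstable();
    values.truncate(k);
    values
}

fn main() {
    // Stand-ins for the rows covered by three PartitionRanges.
    let partitions = vec![vec![9, 3, 7], vec![1, 8, 2], vec![6, 4, 5]];
    let k = 3;

    // "PartSortExec with limit": each partition keeps only its own top-k.
    let merged: Vec<i64> = partitions
        .iter()
        .flat_map(|p| top_k(p.clone(), k))
        .collect();

    // "SortPreservingMergeExec + GlobalLimitExec": a final top-k over the merge.
    let pushed_down = top_k(merged, k);

    // Baseline: sort every row globally, then take k.
    let baseline = top_k(partitions.into_iter().flatten().collect(), k);

    assert_eq!(pushed_down, baseline);
    println!("{pushed_down:?}"); // [1, 2, 3]
}

In the plan output shown in the sqlness results below, PartSortExec applies the per-partition limit while SortPreservingMergeExec and GlobalLimitExec apply the final one.
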
src/query/src/optimizer/windowed_sort.rs (2 additions, 1 deletion)

@@ -98,6 +98,7 @@ impl WindowedSortPhysicalRule {
             } else {
                 Arc::new(PartSortExec::new(
                     first_sort_expr.clone(),
+                    sort_exec.fetch(),
                     scanner_info.partition_ranges.clone(),
                     sort_exec.input().clone(),
                 ))
@@ -149,7 +150,7 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti

     if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
         partition_ranges = Some(region_scan_exec.get_uncollapsed_partition_ranges());
-        time_index = region_scan_exec.time_index();
+        time_index = Some(region_scan_exec.time_index());
         tag_columns = Some(region_scan_exec.tag_columns());

         // set distinguish_partition_ranges to true, this is an incorrect workaround
src/query/src/part_sort.rs (20 additions, 2 deletions)

@@ -47,6 +47,7 @@ use crate::downcast_ts_array;
 pub struct PartSortExec {
     /// Physical sort expressions(that is, sort by timestamp)
     expression: PhysicalSortExpr,
+    limit: Option<usize>,
     input: Arc<dyn ExecutionPlan>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
@@ -57,6 +58,7 @@ pub struct PartSortExec {
 impl PartSortExec {
     pub fn new(
         expression: PhysicalSortExpr,
+        limit: Option<usize>,
         partition_ranges: Vec<Vec<PartitionRange>>,
         input: Arc<dyn ExecutionPlan>,
     ) -> Self {
@@ -69,6 +71,7 @@ impl PartSortExec {

         Self {
             expression,
+            limit,
             input,
             metrics,
             partition_ranges,
@@ -95,6 +98,7 @@ impl PartSortExec {
         let df_stream = Box::pin(PartSortStream::new(
             context,
             self,
+            self.limit,
             input_stream,
             self.partition_ranges[partition].clone(),
             partition,
@@ -106,7 +110,16 @@

 impl DisplayAs for PartSortExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "PartSortExec {}", self.expression)
+        write!(
+            f,
+            "PartSortExec: expr={} num_ranges={}",
+            self.expression,
+            self.partition_ranges.len(),
+        )?;
+        if let Some(limit) = self.limit {
+            write!(f, " limit={}", limit)?;
+        }
+        Ok(())
     }
 }

@@ -138,6 +151,7 @@ impl ExecutionPlan for PartSortExec {
         };
         Ok(Arc::new(Self::new(
             self.expression.clone(),
+            self.limit,
             self.partition_ranges.clone(),
             new_input.clone(),
         )))
@@ -170,6 +184,7 @@ struct PartSortStream {
     reservation: MemoryReservation,
     buffer: Vec<DfRecordBatch>,
     expression: PhysicalSortExpr,
+    limit: Option<usize>,
     produced: usize,
     input: DfSendableRecordBatchStream,
     input_complete: bool,
@@ -185,6 +200,7 @@ impl PartSortStream {
     fn new(
         context: Arc<TaskContext>,
         sort: &PartSortExec,
+        limit: Option<usize>,
         input: DfSendableRecordBatchStream,
         partition_ranges: Vec<PartitionRange>,
         partition: usize,
@@ -194,6 +210,7 @@ impl PartSortStream {
                 .register(&context.runtime_env().memory_pool),
             buffer: Vec::new(),
             expression: sort.expression.clone(),
+            limit,
             produced: 0,
             input,
             input_complete: false,
@@ -294,7 +311,7 @@ impl PartSortStream {
             )
         })?;

-        let indices = sort_to_indices(&sort_column, opt, None).map_err(|e| {
+        let indices = sort_to_indices(&sort_column, opt, self.limit).map_err(|e| {
             DataFusionError::ArrowError(
                 e,
                 Some(format!("Fail to sort to indices at {}", location!())),
@@ -674,6 +691,7 @@ mod test {
                 expr: Arc::new(Column::new("ts", 0)),
                 options: opt,
             },
+            None,
             vec![ranges],
             Arc::new(mock_input),
         );
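
The core of the stream-side change is that arrow's sort_to_indices kernel already accepts an optional limit, so PartSortStream can request only the first limit indices per buffered range. The following is a minimal standalone sketch of that kernel's behavior; it assumes the arrow crate, uses made-up values, and is not taken from this repository.

// Not project code: a standalone demo of arrow's `sort_to_indices` with a limit.
use arrow::array::{ArrayRef, Int64Array, UInt32Array};
use arrow::compute::{sort_to_indices, take, SortOptions};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Stand-in for the timestamp column of one buffered PartitionRange.
    let ts = Int64Array::from(vec![5_i64, 1, 4, 2, 3]);

    let opt = Some(SortOptions {
        descending: false,
        nulls_first: false,
    });

    // With a limit, only the indices of the first 3 rows in sort order come
    // back, mirroring `sort_to_indices(&sort_column, opt, self.limit)` above.
    let indices: UInt32Array = sort_to_indices(&ts, opt, Some(3))?;
    assert_eq!(indices.len(), 3);

    // Materialize just those rows: values 1, 2, 3.
    let top3: ArrayRef = take(&ts, &indices, None)?;
    println!("{top3:?}");
    Ok(())
}

Passing `self.limit` instead of `None`, as the diff above does, bounds the sorted output of each partition range accordingly.
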
src/query/src/window_sort.rs (10 additions, 1 deletion)

@@ -169,7 +169,16 @@ impl WindowedSortExec {

 impl DisplayAs for WindowedSortExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "WindowedSortExec")
+        write!(
+            f,
+            "WindowedSortExec: expr={} num_ranges={}",
+            self.expression,
+            self.ranges.len()
+        )?;
+        if let Some(fetch) = self.fetch {
+            write!(f, " fetch={}", fetch)?;
+        }
+        Ok(())
     }
 }
src/table/src/table/scan.rs (6 additions, 4 deletions)

@@ -146,13 +146,15 @@ impl RegionScanExec {
         let _ = scanner.prepare(partition_ranges, distinguish_partition_range);
     }

-    pub fn time_index(&self) -> Option<String> {
+    pub fn time_index(&self) -> String {
         self.scanner
             .lock()
             .unwrap()
-            .schema()
-            .timestamp_column()
-            .map(|x| x.name.clone())
+            .metadata()
+            .time_index_column()
+            .column_schema
+            .name
+            .clone()
     }

     pub fn tag_columns(&self) -> Vec<String> {
tests/cases/standalone/common/order/windowed_sort.result (7 additions, 7 deletions)

@@ -69,7 +69,7 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t LIMIT 5;
 |_|_|_|
 | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED
 |_|_|_SortPreservingMergeExec: [t@1 ASC NULLS LAST] REDACTED
-|_|_|_WindowedSortExec REDACTED
+|_|_|_WindowedSortExec: expr=t@1 ASC NULLS LAST num_ranges=2 fetch=5 REDACTED
 |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
 |_|_|_|
 |_|_| Total rows: 5_|
@@ -101,8 +101,8 @@ EXPLAIN ANALYZE SELECT * FROM test ORDER BY t DESC LIMIT 5;
 |_|_|_|
 | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED
 |_|_|_SortPreservingMergeExec: [t@1 DESC] REDACTED
-|_|_|_WindowedSortExec REDACTED
-|_|_|_PartSortExec t@1 DESC REDACTED
+|_|_|_WindowedSortExec: expr=t@1 DESC num_ranges=2 fetch=5 REDACTED
+|_|_|_PartSortExec: expr=t@1 DESC num_ranges=2 limit=5 REDACTED
 |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
 |_|_|_|
 |_|_| Total rows: 5_|
@@ -183,8 +183,8 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t LIMIT 5;
 |_|_|_|
 | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED
 |_|_|_SortPreservingMergeExec: [t@2 ASC NULLS LAST] REDACTED
-|_|_|_WindowedSortExec REDACTED
-|_|_|_PartSortExec t@2 ASC NULLS LAST REDACTED
+|_|_|_WindowedSortExec: expr=t@2 ASC NULLS LAST num_ranges=2 fetch=5 REDACTED
+|_|_|_PartSortExec: expr=t@2 ASC NULLS LAST num_ranges=2 limit=5 REDACTED
 |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
 |_|_|_|
 |_|_| Total rows: 5_|
@@ -216,8 +216,8 @@ EXPLAIN ANALYZE SELECT * FROM test_pk ORDER BY t DESC LIMIT 5;
 |_|_|_|
 | 1_| 0_|_GlobalLimitExec: skip=0, fetch=5 REDACTED
 |_|_|_SortPreservingMergeExec: [t@2 DESC] REDACTED
-|_|_|_WindowedSortExec REDACTED
-|_|_|_PartSortExec t@2 DESC REDACTED
+|_|_|_WindowedSortExec: expr=t@2 DESC num_ranges=2 fetch=5 REDACTED
+|_|_|_PartSortExec: expr=t@2 DESC num_ranges=2 limit=5 REDACTED
 |_|_|_SeqScan: region=REDACTED, partition_count=2 (1 memtable ranges, 1 file 1 ranges) REDACTED
 |_|_|_|
 |_|_| Total rows: 5_|
