diff --git a/Cargo.lock b/Cargo.lock
index 76c62d1f3430..2ba2e9858d75 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2153,6 +2153,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-expr",
  "datatypes",
+ "futures-util",
  "serde",
  "snafu 0.8.4",
  "sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
diff --git a/src/common/query/Cargo.toml b/src/common/query/Cargo.toml
index 9ba2e11f88fc..9ede9a339bd6 100644
--- a/src/common/query/Cargo.toml
+++ b/src/common/query/Cargo.toml
@@ -31,4 +31,5 @@ store-api.workspace = true
 
 [dev-dependencies]
 common-base.workspace = true
+futures-util.workspace = true
 tokio.workspace = true
diff --git a/src/common/query/src/lib.rs b/src/common/query/src/lib.rs
index 77eb2d4e24a3..c393c03b12ef 100644
--- a/src/common/query/src/lib.rs
+++ b/src/common/query/src/lib.rs
@@ -19,8 +19,10 @@ pub mod logical_plan;
 pub mod prelude;
 pub mod request;
 mod signature;
+pub mod stream;
 #[cfg(any(test, feature = "testing"))]
 pub mod test_util;
+
 use std::fmt::{Debug, Display, Formatter};
 use std::sync::Arc;
diff --git a/src/common/query/src/stream.rs b/src/common/query/src/stream.rs
new file mode 100644
index 000000000000..a1ed81a583ae
--- /dev/null
+++ b/src/common/query/src/stream.rs
@@ -0,0 +1,175 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::fmt::{Debug, Formatter};
+use std::sync::{Arc, Mutex};
+
+use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
+use common_recordbatch::SendableRecordBatchStream;
+use datafusion::execution::context::TaskContext;
+use datafusion::execution::SendableRecordBatchStream as DfSendableRecordBatchStream;
+use datafusion::physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
+};
+use datafusion_common::DataFusionError;
+use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
+use datatypes::schema::SchemaRef;
+
+/// Adapts greptime's [SendableRecordBatchStream] to DataFusion's [ExecutionPlan].
+pub struct StreamScanAdapter {
+    stream: Mutex<Option<SendableRecordBatchStream>>,
+    schema: SchemaRef,
+    arrow_schema: ArrowSchemaRef,
+    properties: PlanProperties,
+    output_ordering: Option<Vec<PhysicalSortExpr>>,
+}
+
+impl Debug for StreamScanAdapter {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("StreamScanAdapter")
+            .field("stream", &"<SendableRecordBatchStream>")
+            .field("schema", &self.schema)
+            .finish()
+    }
+}
+
+impl StreamScanAdapter {
+    pub fn new(stream: SendableRecordBatchStream) -> Self {
+        let schema = stream.schema();
+        let arrow_schema = schema.arrow_schema().clone();
+        let properties = PlanProperties::new(
+            EquivalenceProperties::new(arrow_schema.clone()),
+            Partitioning::UnknownPartitioning(1),
+            ExecutionMode::Bounded,
+        );
+
+        Self {
+            stream: Mutex::new(Some(stream)),
+            schema,
+            arrow_schema,
+            properties,
+            output_ordering: None,
+        }
+    }
+
+    pub fn with_output_ordering(mut self, output_ordering: Option<Vec<PhysicalSortExpr>>) -> Self {
+        self.output_ordering = output_ordering;
+        self
+    }
+}
+
+impl DisplayAs for StreamScanAdapter {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(
+            f,
+            "StreamScanAdapter: [<SendableRecordBatchStream>], schema: ["
+        )?;
+        write!(f, "{:?}", &self.arrow_schema)?;
+        write!(f, "]")
+    }
+}
+
+impl ExecutionPlan for StreamScanAdapter {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> ArrowSchemaRef {
+        self.arrow_schema.clone()
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    // DataFusion will swap children unconditionally.
+    // But since this node is a leaf node, it's safe to just return self.
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        Ok(self.clone())
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
+        let mut stream = self.stream.lock().unwrap();
+        let stream = stream
+            .take()
+            .ok_or_else(|| DataFusionError::Execution("Stream already exhausted".to_string()))?;
+        Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream)))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use common_recordbatch::{RecordBatch, RecordBatches};
+    use datafusion::prelude::SessionContext;
+    use datatypes::data_type::ConcreteDataType;
+    use datatypes::schema::{ColumnSchema, Schema};
+    use datatypes::vectors::Int32Vector;
+    use futures_util::TryStreamExt;
+
+    use super::*;
+
+    #[tokio::test]
+    async fn test_simple_table_scan() {
+        let ctx = SessionContext::new();
+        let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
+            "a",
+            ConcreteDataType::int32_datatype(),
+            false,
+        )]));
+
+        let batch1 = RecordBatch::new(
+            schema.clone(),
+            vec![Arc::new(Int32Vector::from_slice([1, 2])) as _],
+        )
+        .unwrap();
+        let batch2 = RecordBatch::new(
+            schema.clone(),
+            vec![Arc::new(Int32Vector::from_slice([3, 4, 5])) as _],
+        )
+        .unwrap();
+
+        let recordbatches =
+            RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
+        let stream = recordbatches.as_stream();
+
+        let scan = StreamScanAdapter::new(stream);
+
+        assert_eq!(scan.schema(), schema.arrow_schema().clone());
+
+        let stream = scan.execute(0, ctx.task_ctx()).unwrap();
+        let recordbatches = stream.try_collect::<Vec<_>>().await.unwrap();
+        assert_eq!(recordbatches[0], batch1.into_df_record_batch());
+        assert_eq!(recordbatches[1], batch2.into_df_record_batch());
+
+        let result = scan.execute(0, ctx.task_ctx());
+        assert!(result.is_err());
+        match result {
+            Err(e) => assert!(e.to_string().contains("Stream already exhausted")),
+            _ => unreachable!(),
+        }
+    }
+}
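The adapter above hands its stream to DataFusion exactly once: `execute` moves it out of a `Mutex<Option<_>>` with `take()`, so a second call fails instead of silently rescanning, as the test verifies. A minimal, std-only sketch of that one-shot handoff pattern (the `OneShot` type is illustrative, not part of the codebase):

```rust
use std::sync::Mutex;

/// A slot that yields its value exactly once across threads.
struct OneShot<T> {
    inner: Mutex<Option<T>>,
}

impl<T> OneShot<T> {
    fn new(value: T) -> Self {
        Self {
            inner: Mutex::new(Some(value)),
        }
    }

    /// Returns the value on the first call and an error afterwards.
    fn take(&self) -> Result<T, String> {
        self.inner
            .lock()
            .unwrap()
            .take()
            .ok_or_else(|| "Stream already exhausted".to_string())
    }
}

fn main() {
    let slot = OneShot::new(vec![1, 2, 3]);
    assert!(slot.take().is_ok());
    assert!(slot.take().is_err()); // a second "execute" fails, as in the test above
}
```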
diff --git a/src/file-engine/src/engine.rs b/src/file-engine/src/engine.rs
index 22e5f96e7b6d..f8cbbe7ddcfd 100644
--- a/src/file-engine/src/engine.rs
+++ b/src/file-engine/src/engine.rs
@@ -90,7 +90,8 @@ impl RegionEngine for FileRegionEngine {
         request: ScanRequest,
     ) -> Result<RegionScannerRef, BoxedError> {
         let stream = self.handle_query(region_id, request).await?;
-        let scanner = Box::new(SinglePartitionScanner::new(stream));
+        // We don't support enabling append mode for the file engine.
+        let scanner = Box::new(SinglePartitionScanner::new(stream, false));
         Ok(scanner)
     }
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index 3cc497b25405..e79a3290bee4 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -69,6 +69,8 @@ pub struct MemtableStats {
     /// The time range that this memtable contains. It is None if
     /// and only if the memtable is empty.
     time_range: Option<(Timestamp, Timestamp)>,
+    /// Total rows in the memtable.
+    num_rows: usize,
 }
 
 impl MemtableStats {
@@ -88,6 +90,11 @@ impl MemtableStats {
     pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
         self.time_range
     }
+
+    /// Returns the total number of rows in the memtable.
+    pub fn num_rows(&self) -> usize {
+        self.num_rows
+    }
 }
 
 pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs
index 0d902aaa8537..e320503886be 100644
--- a/src/mito2/src/memtable/partition_tree.rs
+++ b/src/mito2/src/memtable/partition_tree.rs
@@ -25,7 +25,7 @@ mod shard_builder;
 mod tree;
 
 use std::fmt;
-use std::sync::atomic::{AtomicI64, Ordering};
+use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
 use std::sync::Arc;
 
 use common_base::readable_size::ReadableSize;
@@ -114,6 +114,8 @@ pub struct PartitionTreeMemtable {
     alloc_tracker: AllocTracker,
     max_timestamp: AtomicI64,
     min_timestamp: AtomicI64,
+    /// Total written rows in the memtable. This also includes deleted and duplicated rows.
+    num_rows: AtomicUsize,
 }
 
 impl fmt::Debug for PartitionTreeMemtable {
@@ -139,6 +141,7 @@ impl Memtable for PartitionTreeMemtable {
 
         self.update_stats(&metrics);
 
+        self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
         res
     }
 
@@ -150,6 +153,7 @@ impl Memtable for PartitionTreeMemtable {
 
         self.update_stats(&metrics);
 
+        self.num_rows.fetch_add(1, Ordering::Relaxed);
         res
     }
 
@@ -202,6 +206,7 @@ impl Memtable for PartitionTreeMemtable {
             return MemtableStats {
                 estimated_bytes,
                 time_range: None,
+                num_rows: 0,
             };
         }
 
@@ -219,6 +224,7 @@ impl Memtable for PartitionTreeMemtable {
         MemtableStats {
             estimated_bytes,
             time_range: Some((min_timestamp, max_timestamp)),
+            num_rows: self.num_rows.load(Ordering::Relaxed),
         }
     }
 
@@ -256,6 +262,7 @@ impl PartitionTreeMemtable {
             alloc_tracker,
             max_timestamp: AtomicI64::new(i64::MIN),
             min_timestamp: AtomicI64::new(i64::MAX),
+            num_rows: AtomicUsize::new(0),
        }
    }
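Both memtable implementations (the partition tree above and the time-series memtable below) count written rows the same way: a monotonic counter bumped with `Ordering::Relaxed`, which is sufficient because the value is only ever read back as a statistic and never used to synchronize other memory. A self-contained sketch of the pattern (names are illustrative):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Default)]
struct RowCounter {
    num_rows: AtomicUsize,
}

impl RowCounter {
    /// Records `rows` written rows; relaxed ordering is enough for a
    /// statistics-only counter with no other data guarded by it.
    fn record_write(&self, rows: usize) {
        self.num_rows.fetch_add(rows, Ordering::Relaxed);
    }

    fn num_rows(&self) -> usize {
        self.num_rows.load(Ordering::Relaxed)
    }
}

fn main() {
    let counter = RowCounter::default();
    counter.record_write(2); // e.g. a bulk write of two rows
    counter.record_write(1); // e.g. a single-row write
    assert_eq!(counter.num_rows(), 3);
}
```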
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index 4c7a11456620..6d5fbb33a079 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -15,7 +15,7 @@
 use std::collections::btree_map::Entry;
 use std::collections::{BTreeMap, Bound, HashSet};
 use std::fmt::{Debug, Formatter};
-use std::sync::atomic::{AtomicI64, Ordering};
+use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
 use std::time::{Duration, Instant};
 
@@ -101,6 +101,8 @@ pub struct TimeSeriesMemtable {
     min_timestamp: AtomicI64,
     dedup: bool,
     merge_mode: MergeMode,
+    /// Total written rows in the memtable. This also includes deleted and duplicated rows.
+    num_rows: AtomicUsize,
 }
 
 impl TimeSeriesMemtable {
@@ -133,6 +135,7 @@ impl TimeSeriesMemtable {
             min_timestamp: AtomicI64::new(i64::MAX),
             dedup,
             merge_mode,
+            num_rows: Default::default(),
         }
     }
 
@@ -232,6 +235,8 @@ impl Memtable for TimeSeriesMemtable {
         // We may lift the primary key length check out of Memtable::write
         // so that we can ensure writing to memtable will succeed.
         self.update_stats(local_stats);
+
+        self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
         Ok(())
     }
 
@@ -241,6 +246,7 @@ impl Memtable for TimeSeriesMemtable {
         local_stats.allocated += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();
 
         self.update_stats(local_stats);
+        self.num_rows.fetch_add(1, Ordering::Relaxed);
         res
     }
 
@@ -320,6 +326,7 @@ impl Memtable for TimeSeriesMemtable {
             return MemtableStats {
                 estimated_bytes,
                 time_range: None,
+                num_rows: 0,
             };
         }
         let ts_type = self
@@ -335,6 +342,7 @@ impl Memtable for TimeSeriesMemtable {
         MemtableStats {
             estimated_bytes,
             time_range: Some((min_timestamp, max_timestamp)),
+            num_rows: self.num_rows.load(Ordering::Relaxed),
         }
     }
diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs
index a4c1f0c1b943..5e27ebe6df77 100644
--- a/src/mito2/src/read/scan_region.rs
+++ b/src/mito2/src/read/scan_region.rs
@@ -697,6 +697,12 @@ impl ScanInput {
             }
         });
     }
+
+    pub(crate) fn total_rows(&self) -> usize {
+        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
+        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
+        rows_in_files + rows_in_memtables
+    }
 }
 
 #[cfg(test)]
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index 27dc6cdbdcd1..551d304f2f16 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -65,7 +65,10 @@ impl SeqScan {
     /// Creates a new [SeqScan].
     pub(crate) fn new(input: ScanInput) -> Self {
         let parallelism = input.parallelism.parallelism.max(1);
-        let properties = ScannerProperties::new_with_partitions(parallelism);
+        let properties = ScannerProperties::default()
+            .with_parallelism(parallelism)
+            .with_append_mode(input.append_mode)
+            .with_total_rows(input.total_rows());
         let stream_ctx = Arc::new(StreamContext::new(input));
 
         Self {
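`ScanInput::total_rows` above simply sums the row counts recorded in SST metadata and memtable stats. Because deleted and duplicated rows are only resolved at merge time, the sum is an upper bound, which is why scanners only advertise it when the region is append-only. A toy illustration (the free function and slices are hypothetical stand-ins for the `files`/`memtables` fields):

```rust
// Upper-bound row estimate: per-file counts plus per-memtable counts.
fn total_rows(rows_per_file: &[usize], rows_per_memtable: &[usize]) -> usize {
    let rows_in_files: usize = rows_per_file.iter().sum();
    let rows_in_memtables: usize = rows_per_memtable.iter().sum();
    rows_in_files + rows_in_memtables
}

fn main() {
    // Two SSTs with 100 and 50 rows, one memtable with 3 rows.
    assert_eq!(total_rows(&[100, 50], &[3]), 153);
}
```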
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index 1f5eee36d4e0..1d8a0cd8ebd3 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -58,8 +58,11 @@ pub struct UnorderedScan {
 impl UnorderedScan {
     /// Creates a new [UnorderedScan].
     pub(crate) fn new(input: ScanInput) -> Self {
-        let properties =
-            ScannerProperties::new_with_partitions(input.parallelism.parallelism.max(1));
+        let parallelism = input.parallelism.parallelism.max(1);
+        let properties = ScannerProperties::default()
+            .with_parallelism(parallelism)
+            .with_append_mode(input.append_mode)
+            .with_total_rows(input.total_rows());
         let stream_ctx = Arc::new(StreamContext::new(input));
 
         Self {
@@ -135,7 +138,7 @@ impl RegionScanner for UnorderedScan {
     }
 
     fn prepare(&mut self, ranges: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
-        self.properties = ScannerProperties::new(ranges);
+        self.properties.partitions = ranges;
         Ok(())
     }
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
index 4852a3f32049..447cd8f5766b 100644
--- a/src/mito2/src/sst/file.rs
+++ b/src/mito2/src/sst/file.rs
@@ -220,6 +220,10 @@ impl FileHandle {
     pub fn size(&self) -> u64 {
         self.inner.meta.file_size
     }
+
+    pub fn num_rows(&self) -> usize {
+        self.inner.meta.num_rows as usize
+    }
 }
 
 /// Inner data of [FileHandle].
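The `store-api` change below replaces the two `ScannerProperties` constructors with builder-style setters, which both mito scanners above compose like this (sketch; assumes the `store_api` crate from this repository):

```rust
use store_api::region_engine::ScannerProperties;

/// Builds scanner properties the way `SeqScan::new` and `UnorderedScan::new` do.
fn scan_properties(parallelism: usize, append_mode: bool, total_rows: usize) -> ScannerProperties {
    ScannerProperties::default()
        .with_parallelism(parallelism.max(1)) // always at least one output partition
        .with_append_mode(append_mode)
        .with_total_rows(total_rows)
}
```

Starting from `Default` keeps call sites that don't care about append mode (such as `SinglePartitionScanner`) short, while still letting them opt in to extra properties.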
diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs
index e37f69f44157..ffefc1febdcd 100644
--- a/src/store-api/src/region_engine.rs
+++ b/src/store-api/src/region_engine.rs
@@ -157,31 +157,61 @@ pub struct PartitionRange {
 }
 
 /// Properties of the [RegionScanner].
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct ScannerProperties {
     /// A 2-dim partition ranges.
     ///
     /// The first dim vector's length represents the output partition number. The second
     /// dim is ranges within one partition.
     pub partitions: Vec<Vec<PartitionRange>>,
+
+    /// Whether the scanner is in append-only mode.
+    append_mode: bool,
+
+    /// Total rows that **may** be returned by the scanner. This field is only read if
+    /// [ScannerProperties::append_mode] is true.
+    total_rows: usize,
 }
 
 impl ScannerProperties {
-    /// Creates a new [`ScannerProperties`] with the given partitioning.
-    pub fn new(partitions: Vec<Vec<PartitionRange>>) -> Self {
-        Self { partitions }
+    /// Initializes the scanner's partitions with the given parallelism.
+    pub fn with_parallelism(mut self, parallelism: usize) -> Self {
+        self.partitions = vec![vec![]; parallelism];
+        self
     }
 
-    /// Creates a new [`ScannerProperties`] with the given number of partitions.
-    pub fn new_with_partitions(partitions: usize) -> Self {
+    /// Sets the append mode for the scanner.
+    pub fn with_append_mode(mut self, append_mode: bool) -> Self {
+        self.append_mode = append_mode;
+        self
+    }
+
+    /// Sets the total rows for the scanner.
+    pub fn with_total_rows(mut self, total_rows: usize) -> Self {
+        self.total_rows = total_rows;
+        self
+    }
+
+    /// Creates a new [`ScannerProperties`] with the given partitioning.
+    pub fn new(partitions: Vec<Vec<PartitionRange>>, append_mode: bool, total_rows: usize) -> Self {
         Self {
-            partitions: vec![vec![]; partitions],
+            partitions,
+            append_mode,
+            total_rows,
         }
     }
 
     pub fn num_partitions(&self) -> usize {
         self.partitions.len()
     }
+
+    pub fn append_mode(&self) -> bool {
+        self.append_mode
+    }
+
+    pub fn total_rows(&self) -> usize {
+        self.total_rows
+    }
 }
 
 /// A scanner that provides a way to scan the region concurrently.
@@ -297,12 +327,14 @@ pub struct SinglePartitionScanner {
 }
 
 impl SinglePartitionScanner {
     /// Creates a new [SinglePartitionScanner] with the given stream.
-    pub fn new(stream: SendableRecordBatchStream) -> Self {
+    pub fn new(stream: SendableRecordBatchStream, append_mode: bool) -> Self {
         let schema = stream.schema();
         Self {
             stream: Mutex::new(Some(stream)),
             schema,
-            properties: ScannerProperties::new_with_partitions(1),
+            properties: ScannerProperties::default()
+                .with_parallelism(1)
+                .with_append_mode(append_mode),
         }
     }
 }
@@ -323,7 +355,7 @@ impl RegionScanner for SinglePartitionScanner {
     }
 
     fn prepare(&mut self, ranges: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
-        self.properties = ScannerProperties::new(ranges);
+        self.properties.partitions = ranges;
         Ok(())
     }
diff --git a/src/table/src/table/adapter.rs b/src/table/src/table/adapter.rs
index 13c10fabe4a2..f191450a757d 100644
--- a/src/table/src/table/adapter.rs
+++ b/src/table/src/table/adapter.rs
@@ -15,6 +15,7 @@
 use std::any::Any;
 use std::sync::{Arc, Mutex};
 
+use common_query::stream::StreamScanAdapter;
 use common_recordbatch::OrderOption;
 use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
 use datafusion::datasource::{TableProvider, TableType as DfTableType};
@@ -25,10 +26,8 @@ use datafusion_expr::expr::Expr;
 use datafusion_expr::TableProviderFilterPushDown as DfTableProviderFilterPushDown;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalSortExpr;
-use store_api::region_engine::SinglePartitionScanner;
 use store_api::storage::ScanRequest;
 
-use crate::table::scan::RegionScanExec;
 use crate::table::{TableRef, TableType};
 
 /// Adapt greptime's [TableRef] to DataFusion's [TableProvider].
@@ -114,12 +113,9 @@ impl TableProvider for DfTableProviderAdapter {
                 .collect::<Vec<_>>()
         });
 
-        let scanner = Box::new(SinglePartitionScanner::new(stream));
-        let mut plan = RegionScanExec::new(scanner);
-        if let Some(sort_expr) = sort_expr {
-            plan = plan.with_output_ordering(sort_expr);
-        }
-        Ok(Arc::new(plan))
+        Ok(Arc::new(
+            StreamScanAdapter::new(stream).with_output_ordering(sort_expr),
+        ))
     }
 
     fn supports_filters_pushdown(
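For reference, each entry of the `sort_expr` vector that the adapter above hands to `with_output_ordering` is built from the table's `OrderOption`s using a `Column` physical expression. A hedged sketch of constructing one such expression (`sort_by_column` is a hypothetical helper, not part of the codebase):

```rust
use std::sync::Arc;

use arrow_schema::SortOptions;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;

fn sort_by_column(name: &str, index: usize, descending: bool) -> PhysicalSortExpr {
    PhysicalSortExpr {
        // Refer to the column by name and schema index.
        expr: Arc::new(Column::new(name, index)),
        options: SortOptions {
            descending,
            nulls_first: false,
        },
    }
}
```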
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index 02c1147875aa..c7d4fe98a283 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -29,7 +29,8 @@ use datafusion::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
     RecordBatchStream as DfRecordBatchStream,
 };
-use datafusion_common::DataFusionError;
+use datafusion_common::stats::Precision;
+use datafusion_common::{ColumnStatistics, DataFusionError, Statistics};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
 use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
 use futures::{Stream, StreamExt};
@@ -46,6 +47,8 @@ pub struct RegionScanExec {
     output_ordering: Option<Vec<PhysicalSortExpr>>,
     metric: ExecutionPlanMetricsSet,
     properties: PlanProperties,
+    append_mode: bool,
+    total_rows: usize,
 }
 
 impl RegionScanExec {
@@ -64,12 +67,16 @@ impl RegionScanExec {
             Partitioning::UnknownPartitioning(num_output_partition),
             ExecutionMode::Bounded,
         );
+        let append_mode = scanner_props.append_mode();
+        let total_rows = scanner_props.total_rows();
         Self {
             scanner: Mutex::new(scanner),
             arrow_schema,
             output_ordering: None,
             metric: ExecutionPlanMetricsSet::new(),
             properties,
+            append_mode,
+            total_rows,
         }
     }
@@ -152,6 +159,28 @@ impl ExecutionPlan for RegionScanExec {
     fn metrics(&self) -> Option<MetricsSet> {
         Some(self.metric.clone_inner())
     }
+
+    fn statistics(&self) -> DfResult<Statistics> {
+        let statistics = if self.append_mode {
+            let column_statistics = self
+                .arrow_schema
+                .fields
+                .iter()
+                .map(|_| ColumnStatistics {
+                    distinct_count: Precision::Exact(self.total_rows),
+                    ..Default::default()
+                })
+                .collect();
+            Statistics {
+                num_rows: Precision::Exact(self.total_rows),
+                total_byte_size: Default::default(),
+                column_statistics,
+            }
+        } else {
+            Statistics::new_unknown(&self.arrow_schema)
+        };
+        Ok(statistics)
+    }
 }
 
 impl DisplayAs for RegionScanExec {
@@ -257,7 +286,7 @@ mod test {
             RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
         let stream = recordbatches.as_stream();
 
-        let scanner = Box::new(SinglePartitionScanner::new(stream));
+        let scanner = Box::new(SinglePartitionScanner::new(stream, false));
         let plan = RegionScanExec::new(scanner);
         let actual: SchemaRef = Arc::new(
             plan.properties
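In append mode the plan can report an exact row count, which lets DataFusion rules such as aggregate-statistics answer `COUNT(*)` without executing the scan. A standalone sketch of the statistics shape this produces (assumes the `datafusion-common` and `arrow-schema` crates; the PR's actual code above also fills per-column statistics):

```rust
use arrow_schema::{DataType, Field, Schema};
use datafusion_common::stats::Precision;
use datafusion_common::Statistics;

fn append_mode_stats(total_rows: usize) -> Statistics {
    // Single-column schema mirroring the `numbers` table in the tests below.
    let schema = Schema::new(vec![Field::new("number", DataType::UInt32, false)]);
    // Start from all-unknown statistics, then pin the row count exactly.
    let mut stats = Statistics::new_unknown(&schema);
    stats.num_rows = Precision::Exact(total_rows);
    stats
}
```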
diff --git a/tests/cases/distributed/optimizer/order_by.result b/tests/cases/distributed/optimizer/order_by.result
index c5c8b09adf00..211dc40f6bf4 100644
--- a/tests/cases/distributed/optimizer/order_by.result
+++ b/tests/cases/distributed/optimizer/order_by.result
@@ -1,61 +1,61 @@
 -- SQLNESS REPLACE (peers.*) REDACTED
 explain select * from numbers;
 
-+---------------+-----------------------------------------------------+
-| plan_type     | plan                                                |
-+---------------+-----------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                    |
-| physical_plan | SinglePartitionScanner: <SendableRecordBatchStream> |
-|               |                                                     |
-+---------------+-----------------------------------------------------+
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                 |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                                                                                                                                                                                                     |
+| physical_plan | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
+|               |                                                                                                                                                                                                                                      |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 
 -- SQLNESS REPLACE (peers.*) REDACTED
 explain select * from numbers order by number desc;
 
-+---------------+---------------------------------------------------------------+
-| plan_type     | plan                                                          |
-+---------------+---------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                              |
-| physical_plan | SortExec: expr=[number@0 DESC], preserve_partitioning=[false] |
-|               |   SinglePartitionScanner: <SendableRecordBatchStream>         |
-|               |                                                               |
-+---------------+---------------------------------------------------------------+
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                   |
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                                                                                                                                                                                                       |
+| physical_plan | SortExec: expr=[number@0 DESC], preserve_partitioning=[false]                                                                                                                                                                          |
+|               |   StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
+|               |                                                                                                                                                                                                                                        |
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 
 -- SQLNESS REPLACE (peers.*) REDACTED
 explain select * from numbers order by number asc;
 
-+---------------+-------------------------------------------------------------------------+
-| plan_type     | plan                                                                    |
-+---------------+-------------------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                                        |
-| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false] |
-|               |   SinglePartitionScanner: <SendableRecordBatchStream>                   |
-|               |                                                                         |
-+---------------+-------------------------------------------------------------------------+
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                   |
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                                                                                                                                                                                                       |
+| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false]                                                                                                                                                                |
+|               |   StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
+|               |                                                                                                                                                                                                                                        |
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 
 -- SQLNESS REPLACE (peers.*) REDACTED
 explain select * from numbers order by number desc limit 10;
 
-+---------------+---------------------------------------------------------------------------------+
-| plan_type     | plan                                                                            |
-+---------------+---------------------------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                                                |
-| physical_plan | GlobalLimitExec: skip=0, fetch=10                                               |
-|               |   SortExec: TopK(fetch=10), expr=[number@0 DESC], preserve_partitioning=[false] |
-|               |     SinglePartitionScanner: <SendableRecordBatchStream>                         |
-|               |                                                                                 |
-+---------------+---------------------------------------------------------------------------------+
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                     |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                                                                                                                                                                                                         |
+| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                                                                                                        |
+|               |   SortExec: TopK(fetch=10), expr=[number@0 DESC], preserve_partitioning=[false]                                                                                                                                                          |
+|               |     StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
+|               |                                                                                                                                                                                                                                          |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 
 -- SQLNESS REPLACE (peers.*) REDACTED
 explain select * from numbers order by number asc limit 10;
 
-+---------------+-------------------------------------------------------------------------------------------+
-| plan_type     | plan                                                                                      |
-+---------------+-------------------------------------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                                                          |
-| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                         |
-|               |   SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false] |
-|               |     SinglePartitionScanner: <SendableRecordBatchStream>                                   |
-|               |                                                                                           |
-+---------------+-------------------------------------------------------------------------------------------+
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                     |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                                                                                                                                                                                                         |
+| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                                                                                                        |
+|               |   SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false]                                                                                                                                                |
+|               |     StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
+|               |                                                                                                                                                                                                                                          |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
diff --git a/tests/cases/standalone/optimizer/order_by.result b/tests/cases/standalone/optimizer/order_by.result
index b937344b7700..e7fd0c073d1e 100644
--- a/tests/cases/standalone/optimizer/order_by.result
+++ b/tests/cases/standalone/optimizer/order_by.result
@@ -1,56 +1,56 @@
 explain select * from numbers;
 
-+---------------+-----------------------------------------------------+
-| plan_type     | plan                                                |
-+---------------+-----------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                    |
-| physical_plan | SinglePartitionScanner: <SendableRecordBatchStream> |
-|               |                                                     |
-+---------------+-----------------------------------------------------+
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                 |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                                                                                                                                                                                                     |
+| physical_plan | StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
+|               |                                                                                                                                                                                                                                      |
++---------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 
 explain select * from numbers order by number desc;
 
-+---------------+---------------------------------------------------------------+
-| plan_type     | plan                                                          |
-+---------------+---------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                              |
-| physical_plan | SortExec: expr=[number@0 DESC], preserve_partitioning=[false] |
-|               |   SinglePartitionScanner: <SendableRecordBatchStream>         |
-|               |                                                               |
-+---------------+---------------------------------------------------------------+
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                   |
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                                                                                                                                                                                                       |
+| physical_plan | SortExec: expr=[number@0 DESC], preserve_partitioning=[false]                                                                                                                                                                          |
+|               |   StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
+|               |                                                                                                                                                                                                                                        |
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 
 explain select * from numbers order by number asc;
 
-+---------------+-------------------------------------------------------------------------+
-| plan_type     | plan                                                                    |
-+---------------+-------------------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                                        |
-| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false] |
-|               |   SinglePartitionScanner: <SendableRecordBatchStream>                   |
-|               |                                                                         |
-+---------------+-------------------------------------------------------------------------+
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                   |
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                                                                                                                                                                                                       |
+| physical_plan | SortExec: expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false]                                                                                                                                                                |
+|               |   StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
+|               |                                                                                                                                                                                                                                        |
++---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 
 explain select * from numbers order by number desc limit 10;
 
-+---------------+---------------------------------------------------------------------------------+
-| plan_type     | plan                                                                            |
-+---------------+---------------------------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                                                |
-| physical_plan | GlobalLimitExec: skip=0, fetch=10                                               |
-|               |   SortExec: TopK(fetch=10), expr=[number@0 DESC], preserve_partitioning=[false] |
-|               |     SinglePartitionScanner: <SendableRecordBatchStream>                         |
-|               |                                                                                 |
-+---------------+---------------------------------------------------------------------------------+
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                     |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                                                                                                                                                                                                         |
+| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                                                                                                        |
+|               |   SortExec: TopK(fetch=10), expr=[number@0 DESC], preserve_partitioning=[false]                                                                                                                                                          |
+|               |     StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
+|               |                                                                                                                                                                                                                                          |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
 
 explain select * from numbers order by number asc limit 10;
 
-+---------------+-------------------------------------------------------------------------------------------+
-| plan_type     | plan                                                                                      |
-+---------------+-------------------------------------------------------------------------------------------+
-| logical_plan  | MergeScan [is_placeholder=false]                                                          |
-| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                         |
-|               |   SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false] |
-|               |     SinglePartitionScanner: <SendableRecordBatchStream>                                   |
-|               |                                                                                           |
-+---------------+-------------------------------------------------------------------------------------------+
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type     | plan                                                                                                                                                                                                                                     |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan  | MergeScan [is_placeholder=false]                                                                                                                                                                                                         |
+| physical_plan | GlobalLimitExec: skip=0, fetch=10                                                                                                                                                                                                        |
+|               |   SortExec: TopK(fetch=10), expr=[number@0 ASC NULLS LAST], preserve_partitioning=[false]                                                                                                                                                |
+|               |     StreamScanAdapter: [<SendableRecordBatchStream>], schema: [Schema { fields: [Field { name: "number", data_type: UInt32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {"greptime:version": "0"} }] |
+|               |                                                                                                                                                                                                                                          |
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+