From 65f8b72d348fd3c69dfbd602d5905551f9be602c Mon Sep 17 00:00:00 2001 From: Yingwen Date: Wed, 12 Jun 2024 16:21:30 +0800 Subject: [PATCH] feat: Implement RegionScanner for SeqScan (#4060) * feat: ordered builder wip * feat: impl RegionScanner for SeqScan * feat: implement scan_partition and build_stream * chore: return SeqScan as RegionScanner * fix: group parts * feat: split parts * chore: reader metrics * chore: metrics * chore: remove unused codes * chore: support holding a group of ranges in ScanPart * feat: group ScanParts to ScanParts * feat: impl SeqScanner again * chore: observe build cost in ScannerMetrics * chore: fix compiler warnings * style: fix clippy * docs: update config docs * chore: forward DisplayAs to scanner * test: update sqlness tests * chore: update debug fmt * chore: custom debug for timestamp fix test compiling issue with common-macro when running cargo nextest -p common-time * chore: update debug format * feat: update fmt for scan part * chore: fix warning * fix: sanitize parallelism * feat: split parts * test: fix config api test * feat: update logs * chore: Revert "chore: remove unused codes" This reverts commit b548b30a01eeded59b1a0a8d89f9293ca63afc41. * chore: Revert "docs: update config docs" This reverts commit a7997e78d6ddcf635560574de8c1948c495bdd12. 
* feat: each partition scan files in parallel * test: fix config api test * docs: fix typo * chore: address comments, simplify tests * feat: global semaphore * feat: always spawn task * chore: simplify default explain output format * handle output partiton number is 0 Signed-off-by: Ruihang Xia * fix typo Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia Co-authored-by: Ruihang Xia --- src/common/time/src/timestamp.rs | 30 +- src/mito2/src/compaction/twcs.rs | 11 +- src/mito2/src/engine/append_mode_test.rs | 2 +- src/mito2/src/engine/basic_test.rs | 2 + src/mito2/src/engine/filter_deleted_test.rs | 2 +- src/mito2/src/memtable.rs | 10 +- src/mito2/src/read.rs | 51 ++ src/mito2/src/read/scan_region.rs | 260 +++++--- src/mito2/src/read/seq_scan.rs | 606 ++++++++++++++---- src/mito2/src/read/unordered_scan.rs | 156 ++--- src/mito2/src/sst/file.rs | 9 + src/mito2/src/sst/parquet/file_range.rs | 6 + src/mito2/src/sst/parquet/reader.rs | 9 +- src/mito2/src/test_util/memtable_util.rs | 19 +- src/table/src/table/scan.rs | 13 +- tests-integration/src/grpc.rs | 2 +- .../cases/distributed/explain/analyze.result | 2 +- .../common/alter/alter_table_default.result | 2 + .../common/alter/alter_table_default.sql | 2 + .../alter/alter_table_first_after.result | 2 + .../common/alter/alter_table_first_after.sql | 2 + .../common/alter/change_col_type.result | 2 + .../common/alter/change_col_type.sql | 2 + .../common/alter/drop_col_not_null.result | 1 + .../common/alter/drop_col_not_null.sql | 1 + .../alter/drop_col_not_null_next.result | 1 + .../common/alter/drop_col_not_null_next.sql | 1 + .../cases/standalone/common/range/nest.result | 2 +- .../common/tql-explain-analyze/analyze.result | 8 +- 29 files changed, 876 insertions(+), 340 deletions(-) diff --git a/src/common/time/src/timestamp.rs b/src/common/time/src/timestamp.rs index a20a1925d040..a4aac7fd54ba 100644 --- a/src/common/time/src/timestamp.rs +++ b/src/common/time/src/timestamp.rs @@ -14,7 +14,7 @@ use 
core::default::Default; use std::cmp::Ordering; -use std::fmt::{Display, Formatter, Write}; +use std::fmt::{self, Display, Formatter, Write}; use std::hash::{Hash, Hasher}; use std::time::Duration; @@ -41,7 +41,7 @@ use crate::{error, Interval}; /// # Note: /// For values out of range, you can still store these timestamps, but while performing arithmetic /// or formatting operations, it will return an error or just overflow. -#[derive(Debug, Clone, Default, Copy, Serialize, Deserialize)] +#[derive(Clone, Default, Copy, Serialize, Deserialize)] pub struct Timestamp { value: i64, unit: TimeUnit, @@ -498,6 +498,12 @@ impl From for serde_json::Value { } } +impl fmt::Debug for Timestamp { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{}::{}", self.value, self.unit) + } +} + #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum TimeUnit { Second, @@ -1382,4 +1388,24 @@ mod tests { Timestamp::MAX_SECOND.to_timezone_aware_string(Some(&Timezone::Named(Tz::UTC))) ); } + + #[test] + fn test_debug_timestamp() { + assert_eq!( + "1000::Second", + format!("{:?}", Timestamp::new(1000, TimeUnit::Second)) + ); + assert_eq!( + "1001::Millisecond", + format!("{:?}", Timestamp::new(1001, TimeUnit::Millisecond)) + ); + assert_eq!( + "1002::Microsecond", + format!("{:?}", Timestamp::new(1002, TimeUnit::Microsecond)) + ); + assert_eq!( + "1003::Nanosecond", + format!("{:?}", Timestamp::new(1003, TimeUnit::Nanosecond)) + ); + } } diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index eebbd1f48def..f8b79cab523a 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -25,7 +25,7 @@ use crate::compaction::buckets::infer_time_bucket; use crate::compaction::picker::{CompactionTask, Picker}; use crate::compaction::task::CompactionTaskImpl; use crate::compaction::{get_expired_ssts, CompactionOutput, CompactionRequest}; -use crate::sst::file::{FileHandle, FileId}; +use 
crate::sst::file::{overlaps, FileHandle, FileId}; use crate::sst::version::LevelMeta; /// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction @@ -271,15 +271,6 @@ fn assign_to_windows<'a>( windows.into_iter().map(|w| (w.time_window, w)).collect() } -/// Checks if two inclusive timestamp ranges overlap with each other. -fn overlaps(l: &(Timestamp, Timestamp), r: &(Timestamp, Timestamp)) -> bool { - let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) }; - let (_, l_end) = l; - let (r_start, _) = r; - - r_start <= l_end -} - /// Finds the latest active writing window among all files. /// Returns `None` when there are no files or all files are corrupted. fn find_latest_window_in_seconds<'a>( diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index 77f2e4e67dd8..ed9d64ee2c04 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -82,7 +82,7 @@ async fn test_append_mode_write_query() { .scan_region(region_id, ScanRequest::default()) .unwrap(); let seq_scan = scan.seq_scan().unwrap(); - let stream = seq_scan.build_stream().await.unwrap(); + let stream = seq_scan.build_stream().unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, batches.pretty_print().unwrap()); } diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs index 439b3a2fe0d3..9179d8a07411 100644 --- a/src/mito2/src/engine/basic_test.rs +++ b/src/mito2/src/engine/basic_test.rs @@ -330,6 +330,8 @@ async fn test_different_order_and_type() { #[tokio::test] async fn test_put_delete() { + common_telemetry::init_default_ut_logging(); + let mut env = TestEnv::new(); let engine = env.create_engine(MitoConfig::default()).await; diff --git a/src/mito2/src/engine/filter_deleted_test.rs b/src/mito2/src/engine/filter_deleted_test.rs index 4d123a89b8ba..0a89f7b2ef50 100644 --- 
a/src/mito2/src/engine/filter_deleted_test.rs +++ b/src/mito2/src/engine/filter_deleted_test.rs @@ -87,7 +87,7 @@ async fn test_scan_without_filtering_deleted() { let seq_scan = scan.scan_without_filter_deleted().unwrap(); - let stream = seq_scan.build_stream().await.unwrap(); + let stream = seq_scan.build_stream().unwrap(); let batches = RecordBatches::try_collect(stream).await.unwrap(); let expected = "\ +-------+---------+---------------------+ diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 6df63f0e9973..b82032bbc8d2 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -64,11 +64,19 @@ impl Default for MemtableConfig { pub struct MemtableStats { /// The estimated bytes allocated by this memtable from heap. estimated_bytes: usize, - /// The time range that this memtable contains. + /// The time range that this memtable contains. It is None if + /// and only if the memtable is empty. time_range: Option<(Timestamp, Timestamp)>, } impl MemtableStats { + /// Attaches the time range to the stats. + #[cfg(any(test, feature = "test"))] + pub(crate) fn with_time_range(mut self, time_range: Option<(Timestamp, Timestamp)>) -> Self { + self.time_range = time_range; + self + } + /// Returns the estimated bytes allocated by this memtable. 
pub fn bytes_allocated(&self) -> usize { self.estimated_bytes diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index ff16e72fd76d..042eaf2124f0 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -23,6 +23,7 @@ pub(crate) mod unordered_scan; use std::collections::HashSet; use std::sync::Arc; +use std::time::Duration; use api::v1::OpType; use async_trait::async_trait; @@ -50,6 +51,7 @@ use crate::error::{ ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result, }; use crate::memtable::BoxedBatchIterator; +use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED}; use crate::sst::parquet::reader::RowGroupReader; /// Storage internal representation of a batch of rows for a primary key (time series). @@ -744,6 +746,55 @@ impl BatchReader for Box { } } +/// Metrics for scanners. +#[derive(Debug, Default)] +pub(crate) struct ScannerMetrics { + /// Duration to prepare the scan task. + prepare_scan_cost: Duration, + /// Duration to build parts. + build_parts_cost: Duration, + /// Duration to scan data. + scan_cost: Duration, + /// Duration to convert batches. + convert_cost: Duration, + /// Duration of the scan. + total_cost: Duration, + /// Number of batches returned. + num_batches: usize, + /// Number of rows returned. + num_rows: usize, +} + +impl ScannerMetrics { + /// Sets and observes metrics on initializing parts. + fn observe_init_part(&mut self, build_parts_cost: Duration) { + self.build_parts_cost = build_parts_cost; + + // Observes metrics. + READ_STAGE_ELAPSED + .with_label_values(&["prepare_scan"]) + .observe(self.prepare_scan_cost.as_secs_f64()); + READ_STAGE_ELAPSED + .with_label_values(&["build_parts"]) + .observe(self.build_parts_cost.as_secs_f64()); + } + + /// Observes metrics on scanner finish. 
+ fn observe_metrics_on_finish(&self) { + READ_STAGE_ELAPSED + .with_label_values(&["convert_rb"]) + .observe(self.convert_cost.as_secs_f64()); + READ_STAGE_ELAPSED + .with_label_values(&["scan"]) + .observe(self.scan_cost.as_secs_f64()); + READ_STAGE_ELAPSED + .with_label_values(&["total"]) + .observe(self.total_cost.as_secs_f64()); + READ_ROWS_RETURN.observe(self.num_rows as f64); + READ_BATCHES_RETURN.observe(self.num_batches as f64); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 2e6325e1b7c5..a185584d4358 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -16,16 +16,19 @@ use std::fmt; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{debug, error, warn}; use common_time::range::TimestampRange; -use store_api::region_engine::{RegionScannerRef, SinglePartitionScanner}; +use common_time::Timestamp; +use datafusion::physical_plan::DisplayFormatType; +use smallvec::SmallVec; +use store_api::region_engine::RegionScannerRef; use store_api::storage::ScanRequest; use table::predicate::{build_time_range_predicate, Predicate}; -use tokio::sync::{mpsc, Semaphore}; +use tokio::sync::{mpsc, Mutex, Semaphore}; use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; @@ -34,13 +37,13 @@ use crate::cache::CacheManagerRef; use crate::error::Result; use crate::memtable::MemtableRef; use crate::metrics::READ_SST_COUNT; -use crate::read::compat::{CompatBatch, CompatReader}; +use crate::read::compat::{self, CompatBatch}; use crate::read::projection::ProjectionMapper; use crate::read::seq_scan::SeqScan; use crate::read::unordered_scan::UnorderedScan; -use crate::read::{compat, Batch, Source}; +use crate::read::{Batch, Source}; use crate::region::version::VersionRef; -use 
crate::sst::file::{FileHandle, FileMeta}; +use crate::sst::file::{overlaps, FileHandle, FileMeta}; use crate::sst::index::applier::builder::SstIndexApplierBuilder; use crate::sst::index::applier::SstIndexApplierRef; use crate::sst::parquet::file_range::FileRange; @@ -57,7 +60,7 @@ impl Scanner { /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions. pub(crate) async fn scan(&self) -> Result { match self { - Scanner::Seq(seq_scan) => seq_scan.build_stream().await.map_err(BoxedError::new), + Scanner::Seq(seq_scan) => seq_scan.build_stream(), Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await, } } @@ -65,11 +68,7 @@ impl Scanner { /// Returns a [RegionScanner] to scan the region. pub(crate) async fn region_scanner(self) -> Result { match self { - Scanner::Seq(seq_scan) => { - let stream = seq_scan.build_stream().await?; - let scanner = Arc::new(SinglePartitionScanner::new(stream)); - Ok(scanner) - } + Scanner::Seq(seq_scan) => Ok(Arc::new(seq_scan)), Scanner::Unordered(unordered_scan) => Ok(Arc::new(unordered_scan)), } } @@ -221,9 +220,7 @@ impl ScanRegion { /// Scan sequentially. pub(crate) fn seq_scan(self) -> Result { let input = self.scan_input(true)?; - let seq_scan = SeqScan::new(input); - - Ok(seq_scan) + Ok(SeqScan::new(input)) } /// Unordered scan. @@ -235,8 +232,7 @@ impl ScanRegion { #[cfg(test)] pub(crate) fn scan_without_filter_deleted(self) -> Result { let input = self.scan_input(false)?; - let scan = SeqScan::new(input); - Ok(scan) + Ok(SeqScan::new(input)) } /// Creates a scan input. @@ -263,9 +259,8 @@ impl ScanRegion { return false; } let stats = mem.stats(); - let Some((start, end)) = stats.time_range() else { - return true; - }; + // Safety: the memtable is not empty. + let (start, end) = stats.time_range().unwrap(); // The time range of the memtable is inclusive. 
let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end)); @@ -364,13 +359,6 @@ pub(crate) struct ScanParallism { pub(crate) channel_size: usize, } -impl ScanParallism { - /// Returns true if we allow parallel scan. - pub(crate) fn allow_parallel_scan(&self) -> bool { - self.parallelism > 1 - } -} - /// Returns true if the time range of a SST `file` matches the `predicate`. fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { if predicate == &TimestampRange::min_to_max() { @@ -509,60 +497,15 @@ impl ScanInput { self } - /// Builds and returns sources to read. - pub(crate) async fn build_sources(&self) -> Result> { - let mut sources = Vec::with_capacity(self.memtables.len() + self.files.len()); - for mem in &self.memtables { - let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone())?; - sources.push(Source::Iter(iter)); - } - for file in &self.files { - let maybe_reader = self - .access_layer - .read_sst(file.clone()) - .predicate(self.predicate.clone()) - .time_range(self.time_range) - .projection(Some(self.mapper.column_ids().to_vec())) - .cache(self.cache_manager.clone()) - .index_applier(self.index_applier.clone()) - .expected_metadata(Some(self.mapper.metadata().clone())) - .build() - .await; - let reader = match maybe_reader { - Ok(reader) => reader, - Err(e) => { - if e.is_object_not_found() && self.ignore_file_not_found { - error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id()); - continue; - } else { - return Err(e); - } - } - }; - if compat::has_same_columns(self.mapper.metadata(), reader.metadata()) { - sources.push(Source::Reader(Box::new(reader))); - } else { - // They have different schema. We need to adapt the batch first so the - // mapper can convert it. 
- let compat_reader = - CompatReader::new(&self.mapper, reader.metadata().clone(), reader)?; - sources.push(Source::Reader(Box::new(compat_reader))); - } - } - - READ_SST_COUNT.observe(self.files.len() as f64); - - Ok(sources) - } - /// Scans sources in parallel. /// /// # Panics if the input doesn't allow parallel scan. - pub(crate) async fn build_parallel_sources(&self) -> Result> { - assert!(self.parallelism.allow_parallel_scan()); - // Scall all memtables and SSTs. - let sources = self.build_sources().await?; - let semaphore = Arc::new(Semaphore::new(self.parallelism.parallelism)); + pub(crate) fn create_parallel_sources( + &self, + sources: Vec, + semaphore: Arc, + ) -> Result> { + debug_assert!(self.parallelism.parallelism > 1); // Spawn a task for each source. let sources = sources .into_iter() @@ -576,7 +519,7 @@ impl ScanInput { Ok(sources) } - /// Prunes file ranges to scan and adds them tothe `collector`. + /// Prunes file ranges to scan and adds them to the `collector`. pub(crate) async fn prune_file_ranges( &self, collector: &mut impl FileRangeCollector, @@ -641,7 +584,7 @@ impl ScanInput { common_runtime::spawn_read(async move { loop { // We release the permit before sending result to avoid the task waiting on - // the channel with the permit holded + // the channel with the permit held. let maybe_batch = { // Safety: We never close the semaphore. let _permit = semaphore.acquire().await.unwrap(); @@ -680,6 +623,10 @@ impl ScanInput { } } +/// Groups of file ranges. Each group in the list contains multiple file +/// ranges to scan. File ranges in the same group may come from different files. +pub(crate) type FileRangesGroup = SmallVec<[Vec; 4]>; + /// A partition of a scanner to read. /// It contains memtables and file ranges to scan. #[derive(Default)] @@ -688,17 +635,60 @@ pub(crate) struct ScanPart { /// We scan the whole memtable now. We might scan a range of the memtable in the future. pub(crate) memtables: Vec, /// File ranges to scan. 
- pub(crate) file_ranges: Vec, + pub(crate) file_ranges: FileRangesGroup, + /// Optional time range of the part (inclusive). + pub(crate) time_range: Option<(Timestamp, Timestamp)>, } impl fmt::Debug for ScanPart { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "ScanPart({} memtables, {} file ranges)", + "ScanPart({} memtables, {} file ranges", self.memtables.len(), - self.file_ranges.len() - ) + self.file_ranges + .iter() + .map(|ranges| ranges.len()) + .sum::(), + )?; + if let Some(time_range) = &self.time_range { + write!(f, ", time range: {:?})", time_range) + } else { + write!(f, ")") + } + } +} + +impl ScanPart { + /// Returns true if the time range given `part` overlaps with this part. + pub(crate) fn overlaps(&self, part: &ScanPart) -> bool { + let (Some(current_range), Some(part_range)) = (self.time_range, part.time_range) else { + return true; + }; + + overlaps(¤t_range, &part_range) + } + + /// Merges given `part` to this part. + pub(crate) fn merge(&mut self, mut part: ScanPart) { + self.memtables.append(&mut part.memtables); + self.file_ranges.append(&mut part.file_ranges); + let Some(part_range) = part.time_range else { + return; + }; + let Some(current_range) = self.time_range else { + self.time_range = part.time_range; + return; + }; + let start = current_range.0.min(part_range.0); + let end = current_range.1.max(part_range.1); + self.time_range = Some((start, end)); + } + + /// Returns true if the we can split the part into multiple parts + /// and preserving order. + pub(crate) fn can_split_preserve_order(&self) -> bool { + self.memtables.is_empty() && self.file_ranges.len() == 1 && self.file_ranges[0].len() > 1 } } @@ -711,3 +701,105 @@ pub(crate) trait FileRangeCollector { file_ranges: impl Iterator, ); } + +/// Optional list of [ScanPart]s. 
+#[derive(Default)] +pub(crate) struct ScanPartList(pub(crate) Option>); + +impl fmt::Debug for ScanPartList { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.0 { + Some(parts) => write!(f, "{:?}", parts), + None => write!(f, "[]"), + } + } +} + +impl ScanPartList { + /// Returns true if the list is None. + pub(crate) fn is_none(&self) -> bool { + self.0.is_none() + } + + /// Sets parts to the list. + pub(crate) fn set_parts(&mut self, parts: Vec) { + self.0 = Some(parts); + } + + /// Gets the part by index, returns None if the index is out of bound. + /// # Panics + /// Panics if parts are not initialized. + pub(crate) fn get_part(&mut self, index: usize) -> Option<&ScanPart> { + let parts = self.0.as_ref().unwrap(); + parts.get(index) + } + + /// Returns the number of parts. + pub(crate) fn len(&self) -> usize { + self.0.as_ref().map_or(0, |parts| parts.len()) + } + + /// Returns the number of memtables. + pub(crate) fn num_memtables(&self) -> usize { + self.0.as_ref().map_or(0, |parts| { + parts.iter().map(|part| part.memtables.len()).sum() + }) + } + + /// Returns the number of file ranges. + pub(crate) fn num_file_ranges(&self) -> usize { + self.0.as_ref().map_or(0, |parts| { + parts.iter().map(|part| part.file_ranges.len()).sum() + }) + } +} + +/// Context shared by different streams from a scanner. +/// It contains the input and distributes input to multiple parts +/// to scan. +pub(crate) struct StreamContext { + /// Input memtables and files. + pub(crate) input: ScanInput, + /// Parts to scan. + /// The scanner builds parts to scan from the input lazily. + /// The mutex is used to ensure the parts are only built once. + pub(crate) parts: Mutex, + + // Metrics: + /// The start time of the query. + pub(crate) query_start: Instant, + /// Time elapsed before creating the scanner. + pub(crate) prepare_scan_cost: Duration, +} + +impl StreamContext { + /// Creates a new [StreamContext]. 
+ pub(crate) fn new(input: ScanInput) -> Self { + let query_start = input.query_start.unwrap_or_else(Instant::now); + let prepare_scan_cost = query_start.elapsed(); + + Self { + input, + parts: Mutex::new(ScanPartList::default()), + query_start, + prepare_scan_cost, + } + } + + /// Format parts for explain. + pub(crate) fn format_parts(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + match self.parts.try_lock() { + Ok(inner) => match t { + DisplayFormatType::Default => write!( + f, + "partition_count={} ({} memtables, {} file ranges)", + inner.len(), + inner.num_memtables(), + inner.num_file_ranges() + ), + DisplayFormatType::Verbose => write!(f, "{:?}", &*inner), + }, + Err(_) => write!(f, ""), + } + } +} diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 2277a8df32f9..cc810ba0d8b6 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -14,172 +14,550 @@ //! Sequential scan. -use std::time::{Duration, Instant}; +use std::fmt; +use std::sync::Arc; +use std::time::Instant; use async_stream::try_stream; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; -use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream}; -use common_telemetry::{debug, tracing}; +use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; +use common_telemetry::debug; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; +use datatypes::schema::SchemaRef; +use smallvec::smallvec; use snafu::ResultExt; +use store_api::region_engine::{RegionScanner, ScannerPartitioning, ScannerProperties}; +use store_api::storage::ColumnId; +use table::predicate::Predicate; +use tokio::sync::Semaphore; -use crate::cache::CacheManager; use crate::error::Result; -use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED}; -use crate::read::merge::MergeReaderBuilder; -use crate::read::projection::ProjectionMapper; -use 
crate::read::scan_region::ScanInput; -use crate::read::{BatchReader, BoxedBatchReader}; +use crate::memtable::MemtableRef; +use crate::read::merge::{MergeReader, MergeReaderBuilder}; +use crate::read::scan_region::{ + FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext, +}; +use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source}; +use crate::sst::file::FileMeta; +use crate::sst::parquet::file_range::FileRange; +use crate::sst::parquet::reader::ReaderMetrics; /// Scans a region and returns rows in a sorted sequence. /// -/// The output order is always `order by primary key, time index`. +/// The output order is always `order by primary keys, time index`. pub struct SeqScan { - input: ScanInput, + /// Properties of the scanner. + properties: ScannerProperties, + /// Context of streams. + stream_ctx: Arc, + /// Semaphore to control scan parallelism of files. + /// Streams created by the scanner share the same semaphore. + semaphore: Arc, } impl SeqScan { /// Creates a new [SeqScan]. - #[must_use] - pub(crate) fn new(input: ScanInput) -> SeqScan { - SeqScan { input } + pub(crate) fn new(input: ScanInput) -> Self { + let parallelism = input.parallelism.parallelism.max(1); + let properties = ScannerProperties::new(ScannerPartitioning::Unknown(parallelism)); + let stream_ctx = Arc::new(StreamContext::new(input)); + + Self { + properties, + stream_ctx, + semaphore: Arc::new(Semaphore::new(parallelism)), + } } /// Builds a stream for the query. - pub async fn build_stream(&self) -> Result { - let mut metrics = Metrics::default(); - let build_start = Instant::now(); - let query_start = self.input.query_start.unwrap_or(build_start); - metrics.prepare_scan_cost = query_start.elapsed(); - let use_parallel = self.use_parallel_reader(); - // Scans all memtables and SSTs. Builds a merge reader to merge results. - let mut reader = if use_parallel { - self.build_parallel_reader().await? 
+ pub fn build_stream(&self) -> Result { + self.scan_partition_opt(None) + } + + /// Builds a [BoxedBatchReader] from sequential scan for compaction. + pub async fn build_reader(&self) -> Result { + let mut metrics = ScannerMetrics { + prepare_scan_cost: self.stream_ctx.prepare_scan_cost, + ..Default::default() + }; + let maybe_reader = + Self::build_merge_reader(&self.stream_ctx, None, self.semaphore.clone(), &mut metrics) + .await?; + // Safety: `build_merge_reader()` always returns a reader if partition is None. + let reader = maybe_reader.unwrap(); + Ok(Box::new(reader)) + } + + /// Builds sources from a [ScanPart]. + fn build_part_sources( + part: &ScanPart, + projection: Option<&[ColumnId]>, + predicate: Option<&Predicate>, + sources: &mut Vec, + ) -> Result<()> { + sources.reserve(part.memtables.len() + part.file_ranges.len()); + // Read memtables. + for mem in &part.memtables { + let iter = mem.iter(projection, predicate.cloned())?; + sources.push(Source::Iter(iter)); + } + // Read files. + for file in &part.file_ranges { + if file.is_empty() { + continue; + } + + // Creates a stream to read the file. + let ranges = file.clone(); + let stream = try_stream! { + let mut reader_metrics = ReaderMetrics::default(); + // Safety: We checked whether it is empty before. + let file_id = ranges[0].file_handle().file_id(); + let region_id = ranges[0].file_handle().region_id(); + let range_num = ranges.len(); + for range in ranges { + let mut reader = range.reader().await?; + let compat_batch = range.compat_batch(); + while let Some(mut batch) = reader.next_batch().await? 
{ + if let Some(compat) = compat_batch { + batch = compat + .compat_batch(batch)?; + } + + yield batch; + } + reader_metrics.merge_from(reader.metrics()); + } + debug!( + "Seq scan region {}, file {}, {} ranges finished, metrics: {:?}", + region_id, file_id, range_num, reader_metrics + ); + }; + let stream = Box::pin(stream); + sources.push(Source::Stream(stream)); + } + + Ok(()) + } + + /// Builds a merge reader. + /// If `partition` is None, reads all partitions. + /// If the `partition` is out of bound, returns None. + async fn build_merge_reader( + stream_ctx: &StreamContext, + partition: Option, + semaphore: Arc, + metrics: &mut ScannerMetrics, + ) -> Result> { + let mut parts = stream_ctx.parts.lock().await; + maybe_init_parts(&stream_ctx.input, &mut parts, metrics).await?; + + let input = &stream_ctx.input; + let mut sources = Vec::new(); + if let Some(index) = partition { + let Some(part) = parts.get_part(index) else { + return Ok(None); + }; + + Self::build_part_sources( + part, + Some(input.mapper.column_ids()), + input.predicate.as_ref(), + &mut sources, + )?; } else { - self.build_reader().await? + // Safety: We initialized parts before. + for part in parts.0.as_ref().unwrap() { + Self::build_part_sources( + part, + Some(input.mapper.column_ids()), + input.predicate.as_ref(), + &mut sources, + )?; + } + } + + if stream_ctx.input.parallelism.parallelism > 1 { + // Read sources in parallel. We always spawn a task so we can control the parallelism + // by the semaphore. + sources = stream_ctx + .input + .create_parallel_sources(sources, semaphore.clone())?; + } + + let dedup = !stream_ctx.input.append_mode; + let mut builder = + MergeReaderBuilder::from_sources(sources, dedup, stream_ctx.input.filter_deleted); + builder.build().await.map(Some) + } + + /// Scans one partition or all partitions. 
+ fn scan_partition_opt( + &self, + partition: Option, + ) -> Result { + let mut metrics = ScannerMetrics { + prepare_scan_cost: self.stream_ctx.prepare_scan_cost, + ..Default::default() }; - metrics.build_reader_cost = build_start.elapsed(); - READ_STAGE_ELAPSED - .with_label_values(&["prepare_scan"]) - .observe(metrics.prepare_scan_cost.as_secs_f64()); - READ_STAGE_ELAPSED - .with_label_values(&["build_reader"]) - .observe(metrics.build_reader_cost.as_secs_f64()); - - // Creates a stream to poll the batch reader and convert batch into record batch. - let mapper = self.input.mapper.clone(); - let cache_manager = self.input.cache_manager.clone(); - let parallelism = self.input.parallelism.parallelism; + let stream_ctx = self.stream_ctx.clone(); + let semaphore = self.semaphore.clone(); let stream = try_stream! { - let cache = cache_manager.as_ref().map(|cache| cache.as_ref()); - while let Some(batch) = - Self::fetch_record_batch(&mut reader, &mapper, cache, &mut metrics).await? + let maybe_reader = Self::build_merge_reader(&stream_ctx, partition, semaphore, &mut metrics) + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + let Some(mut reader) = maybe_reader else { + return; + }; + let cache = stream_ctx.input.cache_manager.as_deref(); + let mut fetch_start = Instant::now(); + while let Some(batch) = reader + .next_batch() + .await + .map_err(BoxedError::new) + .context(ExternalSnafu)? { + metrics.scan_cost += fetch_start.elapsed(); metrics.num_batches += 1; metrics.num_rows += batch.num_rows(); - yield batch; + + let convert_start = Instant::now(); + let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?; + metrics.convert_cost += convert_start.elapsed(); + yield record_batch; + + fetch_start = Instant::now(); } + metrics.scan_cost += fetch_start.elapsed(); + metrics.total_cost = stream_ctx.query_start.elapsed(); + metrics.observe_metrics_on_finish(); - // Update metrics. 
- metrics.total_cost = query_start.elapsed(); - READ_STAGE_ELAPSED.with_label_values(&["convert_rb"]).observe(metrics.convert_cost.as_secs_f64()); - READ_STAGE_ELAPSED.with_label_values(&["scan"]).observe(metrics.scan_cost.as_secs_f64()); - READ_STAGE_ELAPSED.with_label_values(&["total"]).observe(metrics.total_cost.as_secs_f64()); - READ_ROWS_RETURN.observe(metrics.num_rows as f64); - READ_BATCHES_RETURN.observe(metrics.num_batches as f64); debug!( - "Seq scan finished, region_id: {:?}, metrics: {:?}, use_parallel: {}, parallelism: {}", - mapper.metadata().region_id, metrics, use_parallel, parallelism, + "Seq scan finished, region_id: {:?}, partition: {:?}, metrics: {:?}", + stream_ctx.input.mapper.metadata().region_id, partition, metrics, ); }; + let stream = Box::pin(RecordBatchStreamWrapper::new( - self.input.mapper.output_schema(), + self.stream_ctx.input.mapper.output_schema(), Box::pin(stream), )); Ok(stream) } +} - /// Builds a [BoxedBatchReader] from sequential scan. - pub async fn build_reader(&self) -> Result { - // Scans all memtables and SSTs. Builds a merge reader to merge results. - let sources = self.input.build_sources().await?; - let dedup = !self.input.append_mode; - let mut builder = - MergeReaderBuilder::from_sources(sources, dedup, self.input.filter_deleted); - let reader = builder.build().await?; - Ok(Box::new(reader)) +impl RegionScanner for SeqScan { + fn properties(&self) -> &ScannerProperties { + &self.properties } - /// Builds a [BoxedBatchReader] that can scan memtables and SSTs in parallel. - async fn build_parallel_reader(&self) -> Result { - let sources = self.input.build_parallel_sources().await?; - let dedup = !self.input.append_mode; - let mut builder = - MergeReaderBuilder::from_sources(sources, dedup, self.input.filter_deleted); - let reader = builder.build().await?; - Ok(Box::new(reader)) + fn schema(&self) -> SchemaRef { + self.stream_ctx.input.mapper.output_schema() } - /// Returns whether to use a parallel reader. 
- fn use_parallel_reader(&self) -> bool { - self.input.parallelism.allow_parallel_scan() - && (self.input.files.len() + self.input.memtables.len()) > 1 + fn scan_partition(&self, partition: usize) -> Result { + self.scan_partition_opt(Some(partition)) } +} - /// Fetch a batch from the reader and convert it into a record batch. - #[tracing::instrument(skip_all, level = "trace")] - async fn fetch_record_batch( - reader: &mut dyn BatchReader, - mapper: &ProjectionMapper, - cache: Option<&CacheManager>, - metrics: &mut Metrics, - ) -> common_recordbatch::error::Result> { - let start = Instant::now(); +impl DisplayAs for SeqScan { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "SeqScan: ")?; + self.stream_ctx.format_parts(t, f) + } +} - let Some(batch) = reader - .next_batch() - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)? - else { - metrics.scan_cost += start.elapsed(); +impl fmt::Debug for SeqScan { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SeqScan") + .field("parts", &self.stream_ctx.parts) + .field("prepare_scan_cost", &self.stream_ctx.prepare_scan_cost) + .finish() + } +} + +#[cfg(test)] +impl SeqScan { + /// Returns the input. + pub(crate) fn input(&self) -> &ScanInput { + &self.stream_ctx.input + } +} - return Ok(None); +/// Initializes parts if they are not built yet. +async fn maybe_init_parts( + input: &ScanInput, + part_list: &mut ScanPartList, + metrics: &mut ScannerMetrics, +) -> Result<()> { + if part_list.is_none() { + let now = Instant::now(); + let mut distributor = SeqDistributor::default(); + input.prune_file_ranges(&mut distributor).await?; + part_list + .set_parts(distributor.build_parts(&input.memtables, input.parallelism.parallelism)); + + metrics.observe_init_part(now.elapsed()); + } + Ok(()) +} + +/// Builds [ScanPart]s that preserves order. 
+#[derive(Default)] +pub(crate) struct SeqDistributor { + parts: Vec, +} + +impl FileRangeCollector for SeqDistributor { + fn append_file_ranges( + &mut self, + file_meta: &FileMeta, + file_ranges: impl Iterator, + ) { + // Creates a [ScanPart] for each file. + let ranges: Vec<_> = file_ranges.collect(); + if ranges.is_empty() { + // No ranges to read. + return; + } + let part = ScanPart { + memtables: Vec::new(), + file_ranges: smallvec![ranges], + time_range: Some(file_meta.time_range), }; + self.parts.push(part); + } +} + +impl SeqDistributor { + /// Groups file ranges and memtables by time ranges. + /// The output number of parts may be `<= parallelism`. If `parallelism` is 0, it will be set to 1. + /// + /// Output parts have non-overlapping time ranges. + fn build_parts(mut self, memtables: &[MemtableRef], parallelism: usize) -> Vec { + // Creates a part for each memtable. + for mem in memtables { + let stats = mem.stats(); + let part = ScanPart { + memtables: vec![mem.clone()], + file_ranges: smallvec![], + time_range: stats.time_range(), + }; + self.parts.push(part); + } + + let parallelism = parallelism.max(1); + let parts = group_parts_by_range(self.parts); + let parts = maybe_split_parts(parts, parallelism); + // Ensures it doesn't return more parts than `parallelism`. + maybe_merge_parts(parts, parallelism) + } +} + +/// Groups parts by time range. It may generate more parts than parallelism. +/// All time ranges are not None. +fn group_parts_by_range(mut parts: Vec) -> Vec { + if parts.is_empty() { + return Vec::new(); + } + + // Sorts parts by time range. + parts.sort_unstable_by(|a, b| { + // Safety: time ranges of parts from [SeqDistributor] are not None. + let a = a.time_range.unwrap(); + let b = b.time_range.unwrap(); + a.0.cmp(&b.0).then_with(|| b.1.cmp(&a.1)) + }); + let mut part_in_range = None; + // Parts with exclusive time ranges. 
+ let mut part_groups = Vec::new(); + for part in parts { + let Some(mut prev_part) = part_in_range.take() else { + part_in_range = Some(part); + continue; + }; + + if prev_part.overlaps(&part) { + prev_part.merge(part); + part_in_range = Some(prev_part); + } else { + // A new group. + part_groups.push(prev_part); + part_in_range = Some(part); + } + } + if let Some(part) = part_in_range { + part_groups.push(part); + } + + part_groups +} + +/// Merges parts by parallelism. +/// It merges parts if the number of parts is greater than `parallelism`. +fn maybe_merge_parts(mut parts: Vec, parallelism: usize) -> Vec { + assert!(parallelism > 0); + if parts.len() <= parallelism { + // No need to merge parts. + return parts; + } - let convert_start = Instant::now(); - let record_batch = mapper.convert(&batch, cache)?; - metrics.convert_cost += convert_start.elapsed(); - metrics.scan_cost += start.elapsed(); + // Sort parts by number of memtables and ranges in reverse order. + parts.sort_unstable_by(|a, b| { + a.memtables + .len() + .cmp(&b.memtables.len()) + .then_with(|| { + let a_ranges_len = a + .file_ranges + .iter() + .map(|ranges| ranges.len()) + .sum::(); + let b_ranges_len = b + .file_ranges + .iter() + .map(|ranges| ranges.len()) + .sum::(); + a_ranges_len.cmp(&b_ranges_len) + }) + .reverse() + }); - Ok(Some(record_batch)) + let parts_to_reduce = parts.len() - parallelism; + for _ in 0..parts_to_reduce { + // Safety: We ensure `parts.len() > parallelism`. + let part = parts.pop().unwrap(); + parts.last_mut().unwrap().merge(part); } + + parts } -/// Metrics for [SeqScan]. -#[derive(Debug, Default)] -struct Metrics { - /// Duration to prepare the scan task. - prepare_scan_cost: Duration, - /// Duration to build the reader. - build_reader_cost: Duration, - /// Duration to scan data. - scan_cost: Duration, - /// Duration to convert batches. - convert_cost: Duration, - /// Duration of the scan. - total_cost: Duration, - /// Number of batches returned. 
- num_batches: usize, - /// Number of rows returned. - num_rows: usize, +/// Splits parts by parallelism. +/// It splits a part if it only scans one file and doesn't scan any memtable. +fn maybe_split_parts(mut parts: Vec, parallelism: usize) -> Vec { + assert!(parallelism > 0); + if parts.len() >= parallelism { + // No need to split parts. + return parts; + } + + let has_part_to_split = parts.iter().any(|part| part.can_split_preserve_order()); + if !has_part_to_split { + // No part can be split; return them as-is. + return parts; + } + + // Sorts parts by the number of ranges in the first file. + parts.sort_unstable_by(|a, b| { + let a_len = a.file_ranges.first().map(|file| file.len()).unwrap_or(0); + let b_len = b.file_ranges.first().map(|file| file.len()).unwrap_or(0); + a_len.cmp(&b_len).reverse() + }); + let num_parts_to_split = parallelism - parts.len(); + let mut output_parts = Vec::with_capacity(parallelism); + // Split parts up to num_parts_to_split. + for part in parts.iter_mut() { + if !part.can_split_preserve_order() { + continue; + } + // Safety: `can_split_preserve_order()` ensures file_ranges.len() == 1. + // Splits part into `num_parts_to_split + 1` new parts if possible. + let target_part_num = num_parts_to_split + 1; + let ranges_per_part = (part.file_ranges[0].len() + target_part_num - 1) / target_part_num; + // `can_split_preserve_order()` ensures part.file_ranges[0].len() > 1. + assert!(ranges_per_part > 0); + for ranges in part.file_ranges[0].chunks(ranges_per_part) { + let new_part = ScanPart { + memtables: Vec::new(), + file_ranges: smallvec![ranges.to_vec()], + time_range: part.time_range, + }; + output_parts.push(new_part); + } + // Replace the current part with the last output part as we will put the current part + // into the output parts later. + *part = output_parts.pop().unwrap(); + if output_parts.len() >= num_parts_to_split { + // We already split enough parts. + break; + } + } + // Put the remaining parts into the output parts. 
+ output_parts.append(&mut parts); + + output_parts } #[cfg(test)] -impl SeqScan { - /// Returns the input. - pub(crate) fn input(&self) -> &ScanInput { - &self.input +mod tests { + use std::sync::Arc; + + use common_time::timestamp::TimeUnit; + use common_time::Timestamp; + + use super::*; + use crate::memtable::MemtableId; + use crate::test_util::memtable_util::EmptyMemtable; + + type Output = (Vec, i64, i64); + + fn run_group_parts_test(input: &[(MemtableId, i64, i64)], expect: &[Output]) { + let parts = input + .iter() + .map(|(id, start, end)| { + let range = ( + Timestamp::new(*start, TimeUnit::Second), + Timestamp::new(*end, TimeUnit::Second), + ); + ScanPart { + memtables: vec![Arc::new( + EmptyMemtable::new(*id).with_time_range(Some(range)), + )], + file_ranges: smallvec![], + time_range: Some(range), + } + }) + .collect(); + let output = group_parts_by_range(parts); + let actual: Vec<_> = output + .iter() + .map(|part| { + let ids: Vec<_> = part.memtables.iter().map(|mem| mem.id()).collect(); + let range = part.time_range.unwrap(); + (ids, range.0.value(), range.1.value()) + }) + .collect(); + assert_eq!(expect, actual); + } + + #[test] + fn test_group_parts() { + // Group 1 part. 
+ run_group_parts_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]); + + // 1, 2, 3, 4 => [3, 1, 4], [2] + run_group_parts_test( + &[ + (1, 1000, 2000), + (2, 6000, 7000), + (3, 0, 1500), + (4, 1500, 3000), + ], + &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)], + ); + + // 1, 2, 3 => [3], [1], [2], + run_group_parts_test( + &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)], + &[ + (vec![3], 0, 1000), + (vec![1], 3000, 4000), + (vec![2], 4001, 6000), + ], + ); } } diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 48f682969d32..eccd8ec88c79 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -16,7 +16,7 @@ use std::fmt; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use async_stream::{stream, try_stream}; use common_error::ext::BoxedError; @@ -26,18 +26,19 @@ use common_telemetry::debug; use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; use futures::StreamExt; +use smallvec::smallvec; use snafu::ResultExt; use store_api::region_engine::{RegionScanner, ScannerPartitioning, ScannerProperties}; -use tokio::sync::Mutex; use crate::cache::CacheManager; use crate::error::Result; use crate::memtable::MemtableRef; -use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED}; use crate::read::compat::CompatBatch; use crate::read::projection::ProjectionMapper; -use crate::read::scan_region::{FileRangeCollector, ScanInput, ScanPart}; -use crate::read::Source; +use crate::read::scan_region::{ + FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext, +}; +use crate::read::{ScannerMetrics, Source}; use crate::sst::file::FileMeta; use crate::sst::parquet::file_range::FileRange; use crate::sst::parquet::reader::ReaderMetrics; @@ -55,22 +56,10 @@ pub struct UnorderedScan { impl UnorderedScan { /// Creates a new [UnorderedScan]. 
pub(crate) fn new(input: ScanInput) -> Self { - let query_start = input.query_start.unwrap_or_else(Instant::now); - let prepare_scan_cost = query_start.elapsed(); - let properties = - ScannerProperties::new(ScannerPartitioning::Unknown(input.parallelism.parallelism)); - - // Observes metrics. - READ_STAGE_ELAPSED - .with_label_values(&["prepare_scan"]) - .observe(prepare_scan_cost.as_secs_f64()); - - let stream_ctx = Arc::new(StreamContext { - input, - parts: Mutex::new(ScanPartList::default()), - query_start, - prepare_scan_cost, - }); + let properties = ScannerProperties::new(ScannerPartitioning::Unknown( + input.parallelism.parallelism.max(1), + )); + let stream_ctx = Arc::new(StreamContext::new(input)); Self { properties, @@ -104,7 +93,7 @@ impl UnorderedScan { mapper: &ProjectionMapper, cache: Option<&CacheManager>, compat_batch: Option<&CompatBatch>, - metrics: &mut Metrics, + metrics: &mut ScannerMetrics, ) -> common_recordbatch::error::Result> { let start = Instant::now(); @@ -133,20 +122,6 @@ impl UnorderedScan { Ok(Some(record_batch)) } - - fn observe_metrics_on_finish(metrics: &Metrics) { - READ_STAGE_ELAPSED - .with_label_values(&["convert_rb"]) - .observe(metrics.convert_cost.as_secs_f64()); - READ_STAGE_ELAPSED - .with_label_values(&["scan"]) - .observe(metrics.scan_cost.as_secs_f64()); - READ_STAGE_ELAPSED - .with_label_values(&["total"]) - .observe(metrics.total_cost.as_secs_f64()); - READ_ROWS_RETURN.observe(metrics.num_rows as f64); - READ_BATCHES_RETURN.observe(metrics.num_batches as f64); - } } impl RegionScanner for UnorderedScan { @@ -159,15 +134,14 @@ impl RegionScanner for UnorderedScan { } fn scan_partition(&self, partition: usize) -> Result { - let mut metrics = Metrics { + let mut metrics = ScannerMetrics { prepare_scan_cost: self.stream_ctx.prepare_scan_cost, ..Default::default() }; let stream_ctx = self.stream_ctx.clone(); let stream = try_stream! 
{ let mut parts = stream_ctx.parts.lock().await; - parts - .maybe_init_parts(&stream_ctx.input, &mut metrics) + maybe_init_parts(&mut parts, &stream_ctx.input, &mut metrics) .await .map_err(BoxedError::new) .context(ExternalSnafu)?; @@ -201,7 +175,8 @@ impl RegionScanner for UnorderedScan { } // Then scans file ranges. let mut reader_metrics = ReaderMetrics::default(); - for file_range in &part.file_ranges { + // Safety: UnorderedDistributor::build_parts() ensures this. + for file_range in &part.file_ranges[0] { let reader = file_range.reader().await.map_err(BoxedError::new).context(ExternalSnafu)?; let compat_batch = file_range.compat_batch(); let mut source = Source::RowGroupReader(reader); @@ -216,7 +191,7 @@ impl RegionScanner for UnorderedScan { } metrics.total_cost = query_start.elapsed(); - Self::observe_metrics_on_finish(&metrics); + metrics.observe_metrics_on_finish(); debug!( "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}", partition, mapper.metadata().region_id, metrics, reader_metrics @@ -232,8 +207,9 @@ impl RegionScanner for UnorderedScan { } impl DisplayAs for UnorderedScan { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "UnorderedScan: [{:?}]", self.stream_ctx.parts) + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "UnorderedScan: ")?; + self.stream_ctx.format_parts(t, f) } } @@ -254,73 +230,22 @@ impl UnorderedScan { } } -/// List of [ScanPart]s. -#[derive(Debug, Default)] -struct ScanPartList(Option>); - -impl ScanPartList { - /// Initializes parts if they are not built yet. 
- async fn maybe_init_parts(&mut self, input: &ScanInput, metrics: &mut Metrics) -> Result<()> { - if self.0.is_none() { - let now = Instant::now(); - let mut distributor = UnorderedDistributor::default(); - input.prune_file_ranges(&mut distributor).await?; - self.0 = Some(distributor.build_parts(&input.memtables, input.parallelism.parallelism)); - - metrics.build_parts_cost = now.elapsed(); - READ_STAGE_ELAPSED - .with_label_values(&["build_parts"]) - .observe(metrics.build_parts_cost.as_secs_f64()); - } - Ok(()) +/// Initializes parts if they are not built yet. +async fn maybe_init_parts( + part_list: &mut ScanPartList, + input: &ScanInput, + metrics: &mut ScannerMetrics, +) -> Result<()> { + if part_list.is_none() { + let now = Instant::now(); + let mut distributor = UnorderedDistributor::default(); + input.prune_file_ranges(&mut distributor).await?; + part_list + .set_parts(distributor.build_parts(&input.memtables, input.parallelism.parallelism)); + + metrics.observe_init_part(now.elapsed()); } - - /// Gets the part by index, returns None if the index is out of bound. - /// # Panics - /// Panics if parts are not initialized. - fn get_part(&mut self, index: usize) -> Option<&ScanPart> { - let parts = self.0.as_ref().unwrap(); - parts.get(index) - } -} - -/// Context shared by different streams. -/// It contains the input and distributes input to multiple parts -/// to scan. -struct StreamContext { - /// Input memtables and files. - input: ScanInput, - /// Parts to scan. - /// The scanner builds parts to scan from the input lazily. - /// The mutex is used to ensure the parts are only built once. - parts: Mutex, - - // Metrics: - /// The start time of the query. - query_start: Instant, - /// Time elapsed before creating the scanner. - prepare_scan_cost: Duration, -} - -/// Metrics for [UnorderedScan]. -// We print all fields in logs so we disable the dead_code lint. 
-#[allow(dead_code)] -#[derive(Debug, Default)] -struct Metrics { - /// Duration to prepare the scan task. - prepare_scan_cost: Duration, - /// Duration to build parts. - build_parts_cost: Duration, - /// Duration to scan data. - scan_cost: Duration, - /// Duration to convert batches. - convert_cost: Duration, - /// Duration of the scan. - total_cost: Duration, - /// Number of batches returned. - num_batches: usize, - /// Number of rows returned. - num_rows: usize, + Ok(()) } /// Builds [ScanPart]s without preserving order. It distributes file ranges and memtables @@ -344,12 +269,15 @@ impl FileRangeCollector for UnorderedDistributor { impl UnorderedDistributor { /// Distributes file ranges and memtables across partitions according to the `parallelism`. /// The output number of parts may be `<= parallelism`. + /// + /// [ScanPart] created by this distributor only contains one group of file ranges. fn build_parts(self, memtables: &[MemtableRef], parallelism: usize) -> Vec { if parallelism <= 1 { // Returns a single part. let part = ScanPart { memtables: memtables.to_vec(), - file_ranges: self.file_ranges, + file_ranges: smallvec![self.file_ranges], + time_range: None, }; return vec![part]; } @@ -368,17 +296,19 @@ impl UnorderedDistributor { .chunks(mems_per_part) .map(|mems| ScanPart { memtables: mems.to_vec(), - file_ranges: Vec::new(), + file_ranges: smallvec![Vec::new()], // Ensures there is always one group. 
+ time_range: None, }) .collect::>(); for (i, ranges) in self.file_ranges.chunks(ranges_per_part).enumerate() { if i == scan_parts.len() { scan_parts.push(ScanPart { memtables: Vec::new(), - file_ranges: ranges.to_vec(), + file_ranges: smallvec![ranges.to_vec()], + time_range: None, }); } else { - scan_parts[i].file_ranges = ranges.to_vec(); + scan_parts[i].file_ranges = smallvec![ranges.to_vec()]; } } diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index f3587dd24a64..f095859831e2 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -82,6 +82,15 @@ impl FromStr for FileId { /// Time range of a SST file. pub type FileTimeRange = (Timestamp, Timestamp); +/// Checks if two inclusive timestamp ranges overlap with each other. +pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool { + let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) }; + let (_, l_end) = l; + let (r_start, _) = r; + + r_start <= l_end +} + /// Metadata of a SST file. #[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)] #[serde(default)] diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 5a3b074dede8..7723a996c062 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -28,6 +28,7 @@ use crate::error::{FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result}; use crate::read::compat::CompatBatch; use crate::read::Batch; use crate::row_converter::{McmpRowCodec, RowCodec}; +use crate::sst::file::FileHandle; use crate::sst::parquet::format::ReadFormat; use crate::sst::parquet::reader::{RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext}; @@ -72,6 +73,11 @@ impl FileRange { pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> { self.context.compat_batch() } + + /// Returns the file handle of the file range. 
+ pub(crate) fn file_handle(&self) -> &FileHandle { + self.context.reader_builder.file_handle() + } } /// Context shared by ranges of the same parquet SST. diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index 59f68bf1b279..db2eb5b9cf8d 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -62,7 +62,7 @@ use crate::sst::parquet::stats::RowGroupPruningStats; use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, PARQUET_METADATA_KEY}; /// Parquet SST reader builder. -pub(crate) struct ParquetReaderBuilder { +pub struct ParquetReaderBuilder { /// SST directory. file_dir: String, file_handle: FileHandle, @@ -138,7 +138,7 @@ impl ParquetReaderBuilder { /// Attaches the index applier to the builder. #[must_use] - pub fn index_applier(mut self, index_applier: Option) -> Self { + pub(crate) fn index_applier(mut self, index_applier: Option) -> Self { self.index_applier = index_applier; self } @@ -570,6 +570,11 @@ impl RowGroupReaderBuilder { &self.file_path } + /// Handle of the file to read. + pub(crate) fn file_handle(&self) -> &FileHandle { + &self.file_handle + } + /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`. pub(crate) async fn build( &self, diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index b2764ba88957..b3d1898c5bc6 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::value::ValueData; use api::v1::{Row, Rows, SemanticType}; +use common_time::Timestamp; use datatypes::arrow::array::UInt64Array; use datatypes::data_type::ConcreteDataType; use datatypes::scalars::ScalarVector; @@ -42,12 +43,26 @@ use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; pub(crate) struct EmptyMemtable { /// Id of this memtable. id: MemtableId, + /// Time range to return. 
+ time_range: Option<(Timestamp, Timestamp)>, } impl EmptyMemtable { /// Returns a new memtable with specific `id`. pub(crate) fn new(id: MemtableId) -> EmptyMemtable { - EmptyMemtable { id } + EmptyMemtable { + id, + time_range: None, + } + } + + /// Attaches the time range to the memtable. + pub(crate) fn with_time_range( + mut self, + time_range: Option<(Timestamp, Timestamp)>, + ) -> EmptyMemtable { + self.time_range = time_range; + self } } @@ -81,7 +96,7 @@ impl Memtable for EmptyMemtable { } fn stats(&self) -> MemtableStats { - MemtableStats::default() + MemtableStats::default().with_time_range(self.time_range) } fn fork(&self, id: MemtableId, _metadata: &RegionMetadataRef) -> MemtableRef { diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index a68e0221ce69..df71efeaeab6 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -50,9 +50,16 @@ impl RegionScanExec { pub fn new(scanner: RegionScannerRef) -> Self { let arrow_schema = scanner.schema().arrow_schema().clone(); let scanner_props = scanner.properties(); + let mut num_output_partition = scanner_props.partitioning().num_partitions(); + // The meaning of word "partition" is different in different context. For datafusion + // it's about "parallelism" and for storage it's about "data range". Thus here we add + // a special case to handle the situation where the number of storage partition is 0. 
+ if num_output_partition == 0 { + num_output_partition = 1; + } let properties = PlanProperties::new( EquivalenceProperties::new(arrow_schema.clone()), - Partitioning::UnknownPartitioning(scanner_props.partitioning().num_partitions()), + Partitioning::UnknownPartitioning(num_output_partition), ExecutionMode::Bounded, ); Self { @@ -122,9 +129,9 @@ impl ExecutionPlan for RegionScanExec { } impl DisplayAs for RegionScanExec { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { + fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { // The scanner contains all information needed to display the plan. - write!(f, "{:?}", self.scanner) + self.scanner.fmt_as(t, f) } } diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index b3e26b9bb5cb..5975138431ef 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -652,7 +652,7 @@ CREATE TABLE {table_name} ( let request = Request::Query(QueryRequest { query: Some(Query::Sql( - "SELECT ts, a, b FROM auto_created_table".to_string(), + "SELECT ts, a, b FROM auto_created_table order by ts".to_string(), )), }); let output = query(instance, request.clone()).await; diff --git a/tests/cases/distributed/explain/analyze.result b/tests/cases/distributed/explain/analyze.result index 9adcf9eb212c..26f55d9a2470 100644 --- a/tests/cases/distributed/explain/analyze.result +++ b/tests/cases/distributed/explain/analyze.result @@ -35,7 +35,7 @@ explain analyze SELECT count(*) FROM system_metrics; |_|_|_CoalescePartitionsExec REDACTED |_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(system_REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SinglePartitionScanner: REDACTED +|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED |_|_|_| |_|_| Total rows: 1_| +-+-+-+ diff --git a/tests/cases/standalone/common/alter/alter_table_default.result 
b/tests/cases/standalone/common/alter/alter_table_default.result index 95d0358c8755..40801bd2e7d7 100644 --- a/tests/cases/standalone/common/alter/alter_table_default.result +++ b/tests/cases/standalone/common/alter/alter_table_default.result @@ -25,6 +25,7 @@ INSERT INTO test1 values (3, 3, DEFAULT), (4, 4, '2024-01-31 00:01:01'); Affected Rows: 2 +-- SQLNESS SORT_RESULT 3 1 SELECT i, ts1 FROM test1; +---+---------------------+ @@ -49,6 +50,7 @@ INSERT INTO test1 values (5, 5, DEFAULT, DEFAULT), (6, 6, DEFAULT, '2024-01-31 0 Affected Rows: 2 +-- SQLNESS SORT_RESULT 3 1 SELECT i, ts1, ts2 FROM test1; +---+---------------------+---------------------+ diff --git a/tests/cases/standalone/common/alter/alter_table_default.sql b/tests/cases/standalone/common/alter/alter_table_default.sql index 3169963b7a44..8789b2da571a 100644 --- a/tests/cases/standalone/common/alter/alter_table_default.sql +++ b/tests/cases/standalone/common/alter/alter_table_default.sql @@ -11,6 +11,7 @@ ALTER TABLE test1 ADD COLUMN ts1 TIMESTAMP DEFAULT '2024-01-30 00:01:01' PRIMARY INSERT INTO test1 values (3, 3, DEFAULT), (4, 4, '2024-01-31 00:01:01'); +-- SQLNESS SORT_RESULT 3 1 SELECT i, ts1 FROM test1; SET time_zone = 'Asia/Shanghai'; @@ -20,6 +21,7 @@ ALTER TABLE test1 ADD COLUMN ts2 TIMESTAMP DEFAULT '2024-01-30 00:01:01' PRIMARY INSERT INTO test1 values (5, 5, DEFAULT, DEFAULT), (6, 6, DEFAULT, '2024-01-31 00:01:01'); +-- SQLNESS SORT_RESULT 3 1 SELECT i, ts1, ts2 FROM test1; SET time_zone = 'UTC'; diff --git a/tests/cases/standalone/common/alter/alter_table_first_after.result b/tests/cases/standalone/common/alter/alter_table_first_after.result index ba7e0ade6c4c..ff13ec388d1c 100644 --- a/tests/cases/standalone/common/alter/alter_table_first_after.result +++ b/tests/cases/standalone/common/alter/alter_table_first_after.result @@ -108,6 +108,7 @@ DESC TABLE t; | m | Int32 | | YES | | FIELD | +--------+----------------------+-----+------+---------+---------------+ +-- SQLNESS SORT_RESULT 3 1 
SELECT * FROM t; +---+---+-------------------------+---+---+---+ @@ -155,6 +156,7 @@ DESC TABLE t; | m | Int32 | | YES | | FIELD | +--------+----------------------+-----+------+---------+---------------+ +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM t; +---+---+---+-------------------------+---+---+---+---+ diff --git a/tests/cases/standalone/common/alter/alter_table_first_after.sql b/tests/cases/standalone/common/alter/alter_table_first_after.sql index a2106230baca..0713d660bdb5 100644 --- a/tests/cases/standalone/common/alter/alter_table_first_after.sql +++ b/tests/cases/standalone/common/alter/alter_table_first_after.sql @@ -29,6 +29,7 @@ ALTER TABLE t ADD COLUMN y INTEGER AFTER j; DESC TABLE t; +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM t; -- SQLNESS ARG restart=true @@ -40,6 +41,7 @@ ALTER TABLE t ADD COLUMN b INTEGER AFTER j; DESC TABLE t; +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM t; ALTER TABLE t ADD COLUMN x int xxx; diff --git a/tests/cases/standalone/common/alter/change_col_type.result b/tests/cases/standalone/common/alter/change_col_type.result index 3d9500105a6d..f0a641d28bc2 100644 --- a/tests/cases/standalone/common/alter/change_col_type.result +++ b/tests/cases/standalone/common/alter/change_col_type.result @@ -39,6 +39,7 @@ INSERT INTO test VALUES (3, "greptime", 3, true); Affected Rows: 1 +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM test; +----+----------+-------------------------+-------+ @@ -64,6 +65,7 @@ ALTER TABLE test MODIFY I INTEGER; Affected Rows: 0 +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM test; +----+---+-------------------------+-------+ diff --git a/tests/cases/standalone/common/alter/change_col_type.sql b/tests/cases/standalone/common/alter/change_col_type.sql index 1eb95c719cdc..0fe8c28e9b90 100644 --- a/tests/cases/standalone/common/alter/change_col_type.sql +++ b/tests/cases/standalone/common/alter/change_col_type.sql @@ -16,12 +16,14 @@ SELECT * FROM test; INSERT INTO test VALUES (3, "greptime", 3, true); +-- SQLNESS SORT_RESULT 3 1 
SELECT * FROM test; DESCRIBE test; ALTER TABLE test MODIFY I INTEGER; +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM test; DESCRIBE test; diff --git a/tests/cases/standalone/common/alter/drop_col_not_null.result b/tests/cases/standalone/common/alter/drop_col_not_null.result index 86a6f3150eeb..783220c79e45 100644 --- a/tests/cases/standalone/common/alter/drop_col_not_null.result +++ b/tests/cases/standalone/common/alter/drop_col_not_null.result @@ -23,6 +23,7 @@ INSERT INTO test VALUES (3); Affected Rows: 1 +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM test; +-------------------------+ diff --git a/tests/cases/standalone/common/alter/drop_col_not_null.sql b/tests/cases/standalone/common/alter/drop_col_not_null.sql index ff98f350d3fe..b7700d830920 100644 --- a/tests/cases/standalone/common/alter/drop_col_not_null.sql +++ b/tests/cases/standalone/common/alter/drop_col_not_null.sql @@ -8,6 +8,7 @@ ALTER TABLE test DROP COLUMN j; INSERT INTO test VALUES (3); +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM test; DROP TABLE test; diff --git a/tests/cases/standalone/common/alter/drop_col_not_null_next.result b/tests/cases/standalone/common/alter/drop_col_not_null_next.result index 049aa8d0df5b..74952f49b35b 100644 --- a/tests/cases/standalone/common/alter/drop_col_not_null_next.result +++ b/tests/cases/standalone/common/alter/drop_col_not_null_next.result @@ -28,6 +28,7 @@ INSERT INTO test VALUES (3, 13); Affected Rows: 1 +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM test; +-------------------------+----+ diff --git a/tests/cases/standalone/common/alter/drop_col_not_null_next.sql b/tests/cases/standalone/common/alter/drop_col_not_null_next.sql index 84850539e74c..bdd8274d9f16 100644 --- a/tests/cases/standalone/common/alter/drop_col_not_null_next.sql +++ b/tests/cases/standalone/common/alter/drop_col_not_null_next.sql @@ -11,6 +11,7 @@ INSERT INTO test VALUES (3, NULL); INSERT INTO test VALUES (3, 13); +-- SQLNESS SORT_RESULT 3 1 SELECT * FROM test; DROP TABLE test; diff --git 
a/tests/cases/standalone/common/range/nest.result b/tests/cases/standalone/common/range/nest.result index 5ccc155b05fe..59b12671aea8 100644 --- a/tests/cases/standalone/common/range/nest.result +++ b/tests/cases/standalone/common/range/nest.result @@ -74,7 +74,7 @@ EXPLAIN ANALYZE SELECT ts, host, min(val) RANGE '5s' FROM host ALIGN '5s'; | 0_| 0_|_RangeSelectExec: range_expr=[MIN(host.val) RANGE 5s], align=5000ms, align_to=0ms, align_by=[host@1], time_index=ts REDACTED |_|_|_MergeScanExec: REDACTED |_|_|_| -| 1_| 0_|_SinglePartitionScanner: REDACTED +| 1_| 0_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED |_|_|_| |_|_| Total rows: 10_| +-+-+-+ diff --git a/tests/cases/standalone/common/tql-explain-analyze/analyze.result b/tests/cases/standalone/common/tql-explain-analyze/analyze.result index e3bbc84e4257..8fb7eb2144f0 100644 --- a/tests/cases/standalone/common/tql-explain-analyze/analyze.result +++ b/tests/cases/standalone/common/tql-explain-analyze/analyze.result @@ -30,7 +30,7 @@ TQL ANALYZE (0, 10, '5s') test; |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SinglePartitionScanner: REDACTED +|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+ @@ -59,7 +59,7 @@ TQL ANALYZE (0, 10, '1s', '2s') test; |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: j@1 >= -2000 AND j@1 <= 12000 REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SinglePartitionScanner: REDACTED +|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+ @@ -87,7 +87,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED |_|_|_RepartitionExec: partitioning=REDACTED 
-|_|_|_SinglePartitionScanner: REDACTED +|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+ @@ -117,7 +117,7 @@ TQL ANALYZE VERBOSE (0, 10, '5s') test; |_|_|_CoalesceBatchesExec: target_batch_size=8192 REDACTED |_|_|_FilterExec: j@1 >= -300000 AND j@1 <= 310000 REDACTED |_|_|_RepartitionExec: partitioning=REDACTED -|_|_|_SinglePartitionScanner: REDACTED +|_|_|_SeqScan: partition_count=1 (1 memtables, 0 file ranges) REDACTED |_|_|_| |_|_| Total rows: 4_| +-+-+-+