diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index d9a6fae7461e..fbef5bb8ed75 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -22,6 +22,7 @@ pub mod projection; pub(crate) mod prune; pub(crate) mod range; pub(crate) mod scan_region; +pub(crate) mod scan_util; pub(crate) mod seq_scan; pub(crate) mod unordered_scan; @@ -57,7 +58,6 @@ use crate::error::{ use crate::memtable::BoxedBatchIterator; use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED}; use crate::read::prune::PruneReader; -use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics}; /// Storage internal representation of a batch of rows for a primary key (time series). /// @@ -738,7 +738,7 @@ impl BatchReader for Box { pub(crate) struct ScannerMetrics { /// Duration to prepare the scan task. prepare_scan_cost: Duration, - /// Duration to build parts. + /// Duration to build file ranges. build_parts_cost: Duration, /// Duration to build the (merge) reader. build_reader_cost: Duration, @@ -758,31 +758,17 @@ pub(crate) struct ScannerMetrics { num_mem_ranges: usize, /// Number of file ranges scanned. num_file_ranges: usize, - /// Filter related metrics for readers. - filter_metrics: ReaderFilterMetrics, } impl ScannerMetrics { - /// Sets and observes metrics on initializing parts. - fn observe_init_part(&mut self, build_parts_cost: Duration, reader_metrics: &ReaderMetrics) { - self.build_parts_cost = build_parts_cost; - - // Observes metrics. + /// Observes metrics. + fn observe_metrics(&self) { READ_STAGE_ELAPSED .with_label_values(&["prepare_scan"]) .observe(self.prepare_scan_cost.as_secs_f64()); READ_STAGE_ELAPSED .with_label_values(&["build_parts"]) .observe(self.build_parts_cost.as_secs_f64()); - - // We only call this once so we overwrite it directly. - self.filter_metrics = reader_metrics.filter_metrics; - // Observes filter metrics. - self.filter_metrics.observe(); - } - - /// Observes metrics on scanner finish. - fn observe_metrics_on_finish(&self) { READ_STAGE_ELAPSED .with_label_values(&["build_reader"]) .observe(self.build_reader_cost.as_secs_f64()); @@ -801,6 +787,21 @@ impl ScannerMetrics { READ_ROWS_RETURN.observe(self.num_rows as f64); READ_BATCHES_RETURN.observe(self.num_batches as f64); } + + /// Merges metrics from another [ScannerMetrics]. + fn merge_from(&mut self, other: &ScannerMetrics) { + self.prepare_scan_cost += other.prepare_scan_cost; + self.build_parts_cost += other.build_parts_cost; + self.build_reader_cost += other.build_reader_cost; + self.scan_cost += other.scan_cost; + self.convert_cost += other.convert_cost; + self.yield_cost += other.yield_cost; + self.total_cost += other.total_cost; + self.num_batches += other.num_batches; + self.num_rows += other.num_rows; + self.num_mem_ranges += other.num_mem_ranges; + self.num_file_ranges += other.num_file_ranges; + } } #[cfg(test)] diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index 8408aad7b2d8..50cda4aabb83 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -14,7 +14,9 @@ //! Structs for partition ranges. +use common_time::Timestamp; use smallvec::{smallvec, SmallVec}; +use store_api::region_engine::PartitionRange; use crate::memtable::MemtableRef; use crate::read::scan_region::ScanInput; @@ -48,6 +50,26 @@ pub(crate) struct RangeMeta { } impl RangeMeta { + /// Creates a [PartitionRange] with specific identifier. + /// It converts the inclusive max timestamp to exclusive end timestamp. + pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange { + PartitionRange { + start: self.time_range.0, + end: Timestamp::new( + // The i64::MAX timestamp may be invisible but we don't guarantee to support this + // value now. + self.time_range + .1 + .value() + .checked_add(1) + .unwrap_or(self.time_range.1.value()), + self.time_range.1.unit(), + ), + num_rows: self.num_rows, + identifier, + } + } + /// Creates a list of ranges from the `input` for seq scan. pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec { let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len()); @@ -177,7 +199,7 @@ impl RangeMeta { } fn push_seq_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec) { - // For non append-only mode, each range only contains one memtable. + // For non append-only mode, each range only contains one memtable by default. for (i, memtable) in memtables.iter().enumerate() { let stats = memtable.stats(); let Some(time_range) = stats.time_range() else { @@ -195,6 +217,7 @@ impl RangeMeta { } } + // TODO(yingwen): Support multiple row groups in a range so we can split them later. fn push_seq_file_ranges( num_memtables: usize, files: &[FileHandle], @@ -264,3 +287,83 @@ fn maybe_split_ranges_for_seq_scan(ranges: Vec) -> Vec { new_ranges } + +#[cfg(test)] +mod tests { + use common_time::timestamp::TimeUnit; + use common_time::Timestamp; + + use super::*; + + type Output = (Vec, i64, i64); + + fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) { + let ranges = input + .iter() + .map(|(idx, start, end)| { + let time_range = ( + Timestamp::new(*start, TimeUnit::Second), + Timestamp::new(*end, TimeUnit::Second), + ); + RangeMeta { + time_range, + indices: smallvec![*idx], + row_group_indices: smallvec![RowGroupIndex { + index: *idx, + row_group_index: 0 + }], + num_rows: 1, + } + }) + .collect(); + let output = group_ranges_for_seq_scan(ranges); + let actual: Vec<_> = output + .iter() + .map(|range| { + let indices = range.indices.to_vec(); + let group_indices: Vec<_> = range + .row_group_indices + .iter() + .map(|idx| idx.index) + .collect(); + assert_eq!(indices, group_indices); + let range = range.time_range; + (indices, range.0.value(), range.1.value()) + }) + .collect(); + assert_eq!(expect, actual); + } + + #[test] + fn test_group_ranges() { + // Group 1 part. + run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]); + + // 1, 2, 3, 4 => [3, 1, 4], [2] + run_group_ranges_test( + &[ + (1, 1000, 2000), + (2, 6000, 7000), + (3, 0, 1500), + (4, 1500, 3000), + ], + &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)], + ); + + // 1, 2, 3 => [3], [1], [2], + run_group_ranges_test( + &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)], + &[ + (vec![3], 0, 1000), + (vec![1], 3000, 4000), + (vec![2], 4001, 6000), + ], + ); + + // 1, 2, 3 => [3], [1, 2] + run_group_ranges_test( + &[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)], + &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)], + ); + } +} diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 28deb1186ebd..1c0dd50d0b10 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -17,14 +17,12 @@ use std::collections::{BTreeMap, HashSet}; use std::fmt; use std::sync::{Arc, Mutex as StdMutex}; -use std::time::{Duration, Instant}; +use std::time::Instant; use common_error::ext::BoxedError; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{debug, error, tracing, warn}; use common_time::range::TimestampRange; -use common_time::Timestamp; -use datafusion::physical_plan::DisplayFormatType; use datafusion_expr::utils::expr_to_columns; use parquet::arrow::arrow_reader::RowSelection; use smallvec::SmallVec; @@ -48,7 +46,7 @@ use crate::read::unordered_scan::UnorderedScan; use crate::read::{Batch, Source}; use crate::region::options::MergeMode; use crate::region::version::VersionRef; -use crate::sst::file::{overlaps, FileHandle, FileMeta}; +use crate::sst::file::FileHandle; use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder; use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef; use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder; @@ -700,73 +698,6 @@ impl ScanInput { }) } - /// Prunes file ranges to scan and adds them to the `collector`. - pub(crate) async fn prune_file_ranges( - &self, - collector: &mut impl FileRangeCollector, - ) -> Result { - let mut file_prune_cost = Duration::ZERO; - let mut reader_metrics = ReaderMetrics::default(); - for file in &self.files { - let prune_start = Instant::now(); - let res = self - .access_layer - .read_sst(file.clone()) - .predicate(self.predicate.clone()) - .time_range(self.time_range) - .projection(Some(self.mapper.column_ids().to_vec())) - .cache(self.cache_manager.clone()) - .inverted_index_applier(self.inverted_index_applier.clone()) - .fulltext_index_applier(self.fulltext_index_applier.clone()) - .expected_metadata(Some(self.mapper.metadata().clone())) - .build_reader_input(&mut reader_metrics) - .await; - file_prune_cost += prune_start.elapsed(); - let (mut file_range_ctx, row_groups) = match res { - Ok(x) => x, - Err(e) => { - if e.is_object_not_found() && self.ignore_file_not_found { - error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id()); - continue; - } else { - return Err(e); - } - } - }; - if !compat::has_same_columns( - self.mapper.metadata(), - file_range_ctx.read_format().metadata(), - ) { - // They have different schema. We need to adapt the batch first so the - // mapper can convert it. - let compat = CompatBatch::new( - &self.mapper, - file_range_ctx.read_format().metadata().clone(), - )?; - file_range_ctx.set_compat_batch(Some(compat)); - } - // Build ranges from row groups. - let file_range_ctx = Arc::new(file_range_ctx); - let file_ranges = row_groups - .into_iter() - .map(|(row_group_idx, row_selection)| { - FileRange::new(file_range_ctx.clone(), row_group_idx, row_selection) - }); - collector.append_file_ranges(file.meta_ref(), file_ranges); - } - - READ_SST_COUNT.observe(self.files.len() as f64); - - common_telemetry::debug!( - "Region {} prune {} files, cost is {:?}", - self.mapper.metadata().region_id, - self.files.len(), - file_prune_cost - ); - - Ok(reader_metrics) - } - /// Scans the input source in another task and sends batches to the sender. pub(crate) fn spawn_scan_task( &self, @@ -806,10 +737,7 @@ impl ScanInput { pub(crate) fn predicate(&self) -> Option { self.predicate.clone() } -} -#[cfg(test)] -impl ScanInput { /// Returns number of memtables to scan. pub(crate) fn num_memtables(&self) -> usize { self.memtables.len() @@ -819,166 +747,21 @@ impl ScanInput { pub(crate) fn num_files(&self) -> usize { self.files.len() } +} +#[cfg(test)] +impl ScanInput { /// Returns SST file ids to scan. pub(crate) fn file_ids(&self) -> Vec { self.files.iter().map(|file| file.file_id()).collect() } } -/// Groups of file ranges. Each group in the list contains multiple file -/// ranges to scan. File ranges in the same group may come from different files. -pub(crate) type FileRangesGroup = SmallVec<[Vec; 4]>; - -/// A partition of a scanner to read. -/// It contains memtables and file ranges to scan. -#[derive(Clone, Default)] -pub(crate) struct ScanPart { - /// Memtable ranges to scan. - pub(crate) memtable_ranges: Vec, - /// File ranges to scan. - pub(crate) file_ranges: FileRangesGroup, - /// Optional time range of the part (inclusive). - pub(crate) time_range: Option<(Timestamp, Timestamp)>, -} - -impl fmt::Debug for ScanPart { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!( - f, - "ScanPart({} memtable ranges, {} file ranges", - self.memtable_ranges.len(), - self.file_ranges - .iter() - .map(|ranges| ranges.len()) - .sum::(), - )?; - if let Some(time_range) = &self.time_range { - write!(f, ", time range: {:?})", time_range) - } else { - write!(f, ")") - } - } -} - -impl ScanPart { - /// Returns true if the time range given `part` overlaps with this part. - pub(crate) fn overlaps(&self, part: &ScanPart) -> bool { - let (Some(current_range), Some(part_range)) = (self.time_range, part.time_range) else { - return true; - }; - - overlaps(¤t_range, &part_range) - } - - /// Merges given `part` to this part. - pub(crate) fn merge(&mut self, mut part: ScanPart) { - self.memtable_ranges.append(&mut part.memtable_ranges); - self.file_ranges.append(&mut part.file_ranges); - let Some(part_range) = part.time_range else { - return; - }; - let Some(current_range) = self.time_range else { - self.time_range = part.time_range; - return; - }; - let start = current_range.0.min(part_range.0); - let end = current_range.1.max(part_range.1); - self.time_range = Some((start, end)); - } - - /// Returns true if the we can split the part into multiple parts - /// and preserving order. - pub(crate) fn can_split_preserve_order(&self) -> bool { - self.memtable_ranges.is_empty() - && self.file_ranges.len() == 1 - && self.file_ranges[0].len() > 1 - } -} - -/// A trait to collect file ranges to scan. -pub(crate) trait FileRangeCollector { - /// Appends file ranges from the **same file** to the collector. - fn append_file_ranges( - &mut self, - file_meta: &FileMeta, - file_ranges: impl Iterator, - ); -} - -/// Optional list of [ScanPart]s. -#[derive(Default)] -pub(crate) struct ScanPartList(pub(crate) Option>); - -impl fmt::Debug for ScanPartList { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self.0 { - Some(parts) => write!(f, "{:?}", parts), - None => write!(f, "[]"), - } - } -} - -impl ScanPartList { - /// Returns true if the list is None. - pub(crate) fn is_none(&self) -> bool { - self.0.is_none() - } - - /// Sets parts to the list. - pub(crate) fn set_parts(&mut self, parts: Vec) { - self.0 = Some(parts); - } - - /// Gets the part by index, returns None if the index is out of bound. - /// # Panics - /// Panics if parts are not initialized. - pub(crate) fn get_part(&mut self, index: usize) -> Option<&ScanPart> { - let parts = self.0.as_ref().unwrap(); - parts.get(index) - } - - /// Returns the number of parts. - pub(crate) fn len(&self) -> usize { - self.0.as_ref().map_or(0, |parts| parts.len()) - } - - /// Returns the number of memtable ranges. - pub(crate) fn num_mem_ranges(&self) -> usize { - self.0.as_ref().map_or(0, |parts| { - parts.iter().map(|part| part.memtable_ranges.len()).sum() - }) - } - - /// Returns the number of files. - pub(crate) fn num_files(&self) -> usize { - self.0.as_ref().map_or(0, |parts| { - parts.iter().map(|part| part.file_ranges.len()).sum() - }) - } - - /// Returns the number of file ranges. - pub(crate) fn num_file_ranges(&self) -> usize { - self.0.as_ref().map_or(0, |parts| { - parts - .iter() - .flat_map(|part| part.file_ranges.iter()) - .map(|ranges| ranges.len()) - .sum() - }) - } -} - /// Context shared by different streams from a scanner. -/// It contains the input and distributes input to multiple parts -/// to scan. +/// It contains the input and ranges to scan. pub(crate) struct StreamContext { /// Input memtables and files. pub(crate) input: ScanInput, - /// Parts to scan and the cost to build parts. - /// The scanner builds parts to scan from the input lazily. - /// The mutex is used to ensure the parts are only built once. - pub(crate) parts: Mutex<(ScanPartList, Duration)>, /// Metadata for partition ranges. pub(crate) ranges: Vec, /// Lists of range builders. @@ -994,12 +777,11 @@ impl StreamContext { pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self { let query_start = input.query_start.unwrap_or_else(Instant::now); let ranges = RangeMeta::seq_scan_ranges(&input); - READ_SST_COUNT.observe(input.files.len() as f64); - let range_builders = RangeBuilderList::new(input.memtables.len(), input.files.len()); + READ_SST_COUNT.observe(input.num_files() as f64); + let range_builders = RangeBuilderList::new(input.num_memtables(), input.num_files()); Self { input, - parts: Mutex::new((ScanPartList::default(), Duration::default())), ranges, range_builders, query_start, @@ -1010,12 +792,11 @@ impl StreamContext { pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self { let query_start = input.query_start.unwrap_or_else(Instant::now); let ranges = RangeMeta::unordered_scan_ranges(&input); - READ_SST_COUNT.observe(input.files.len() as f64); - let range_builders = RangeBuilderList::new(input.memtables.len(), input.files.len()); + READ_SST_COUNT.observe(input.num_files() as f64); + let range_builders = RangeBuilderList::new(input.num_memtables(), input.num_files()); Self { input, - parts: Mutex::new((ScanPartList::default(), Duration::default())), ranges, range_builders, query_start, @@ -1024,27 +805,28 @@ impl StreamContext { /// Returns true if the index refers to a memtable. pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool { - self.input.memtables.len() > index.index + self.input.num_memtables() > index.index } /// Creates file ranges to scan. pub(crate) async fn build_file_ranges( &self, index: RowGroupIndex, - ranges: &mut Vec, reader_metrics: &mut ReaderMetrics, - ) -> Result<()> { - ranges.clear(); + ) -> Result> { + let mut ranges = SmallVec::new(); self.range_builders - .build_file_ranges(&self.input, index, ranges, reader_metrics) - .await + .build_file_ranges(&self.input, index, &mut ranges, reader_metrics) + .await?; + Ok(ranges) } /// Creates memtable ranges to scan. - pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex, ranges: &mut Vec) { - ranges.clear(); + pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> { + let mut ranges = SmallVec::new(); self.range_builders - .build_mem_ranges(&self.input, index, ranges) + .build_mem_ranges(&self.input, index, &mut ranges); + ranges } /// Retrieves the partition ranges. @@ -1052,35 +834,30 @@ impl StreamContext { self.ranges .iter() .enumerate() - .map(|(idx, range_meta)| PartitionRange { - start: range_meta.time_range.0, - end: range_meta.time_range.1, - num_rows: range_meta.num_rows, - identifier: idx, - }) + .map(|(idx, range_meta)| range_meta.new_partition_range(idx)) .collect() } /// Format the context for explain. - pub(crate) fn format_for_explain( - &self, - t: DisplayFormatType, - f: &mut fmt::Formatter, - ) -> fmt::Result { - match self.parts.try_lock() { - Ok(inner) => match t { - DisplayFormatType::Default => write!( - f, - "partition_count={} ({} memtable ranges, {} file {} ranges)", - inner.0.len(), - inner.0.num_mem_ranges(), - inner.0.num_files(), - inner.0.num_file_ranges() - )?, - DisplayFormatType::Verbose => write!(f, "{:?}", inner.0)?, - }, - Err(_) => write!(f, "")?, + pub(crate) fn format_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { + let (mut num_mem_ranges, mut num_file_ranges) = (0, 0); + for range_meta in &self.ranges { + for idx in &range_meta.row_group_indices { + if self.is_mem_range_index(*idx) { + num_mem_ranges += 1; + } else { + num_file_ranges += 1; + } + } } + write!( + f, + "partition_count={} ({} memtable ranges, {} file {} ranges)", + self.ranges.len(), + num_mem_ranges, + self.input.num_files(), + num_file_ranges, + )?; if let Some(selector) = &self.input.series_row_selector { write!(f, ", selector={}", selector)?; } @@ -1110,7 +887,7 @@ impl RangeBuilderList { &self, input: &ScanInput, index: RowGroupIndex, - ranges: &mut Vec, + ranges: &mut SmallVec<[FileRange; 2]>, reader_metrics: &mut ReaderMetrics, ) -> Result<()> { let file_index = index.index - self.mem_builders.len(); @@ -1131,7 +908,7 @@ impl RangeBuilderList { &self, input: &ScanInput, index: RowGroupIndex, - ranges: &mut Vec, + ranges: &mut SmallVec<[MemtableRange; 2]>, ) { let mut builder_opt = self.mem_builders[index.index].lock().unwrap(); match &mut *builder_opt { @@ -1159,7 +936,7 @@ struct FileRangeBuilder { impl FileRangeBuilder { /// Builds file ranges to read. /// Negative `row_group_index` indicates all row groups. - fn build_ranges(&self, row_group_index: i64, ranges: &mut Vec) { + fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) { let Some(context) = self.context.clone() else { return; }; @@ -1196,7 +973,7 @@ struct MemRangeBuilder { impl MemRangeBuilder { /// Builds mem ranges to read in the memtable. /// Negative `row_group_index` indicates all row groups. - fn build_ranges(&self, row_group_index: i64, ranges: &mut Vec) { + fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[MemtableRange; 2]>) { if row_group_index >= 0 { let row_group_index = row_group_index as usize; // Scans one row group. diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs new file mode 100644 index 000000000000..74e7af42d627 --- /dev/null +++ b/src/mito2/src/read/scan_util.rs @@ -0,0 +1,182 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities for scanners. + +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use async_stream::try_stream; +use common_telemetry::debug; +use futures::Stream; +use store_api::storage::RegionId; + +use crate::error::Result; +use crate::read::range::RowGroupIndex; +use crate::read::scan_region::StreamContext; +use crate::read::{Batch, ScannerMetrics, Source}; +use crate::sst::parquet::reader::ReaderMetrics; + +struct PartitionMetricsInner { + region_id: RegionId, + /// Index of the partition to scan. + partition: usize, + /// Label to distinguish different scan operation. + scanner_type: &'static str, + /// Query start time. + query_start: Instant, + /// Elapsed time before the first poll operation. + first_poll: Duration, + metrics: ScannerMetrics, + reader_metrics: ReaderMetrics, +} + +impl PartitionMetricsInner { + fn on_finish(&mut self) { + if self.metrics.total_cost.is_zero() { + self.metrics.total_cost = self.query_start.elapsed(); + } + self.metrics.build_parts_cost = self.reader_metrics.build_cost; + } +} + +impl Drop for PartitionMetricsInner { + fn drop(&mut self) { + self.on_finish(); + self.metrics.observe_metrics(); + + debug!( + "{} finished, region_id: {}, partition: {}, first_poll: {:?}, metrics: {:?}, reader_metrics: {:?}", + self.scanner_type, self.region_id, self.partition, self.first_poll, self.metrics, self.reader_metrics + ); + } +} + +/// Metrics while reading a partition. +#[derive(Clone)] +pub(crate) struct PartitionMetrics(Arc>); + +impl PartitionMetrics { + pub(crate) fn new( + region_id: RegionId, + partition: usize, + scanner_type: &'static str, + query_start: Instant, + metrics: ScannerMetrics, + ) -> Self { + let inner = PartitionMetricsInner { + region_id, + partition, + scanner_type, + query_start, + first_poll: Duration::default(), + metrics, + reader_metrics: ReaderMetrics::default(), + }; + Self(Arc::new(Mutex::new(inner))) + } + + pub(crate) fn on_first_poll(&self) { + let mut inner = self.0.lock().unwrap(); + inner.first_poll = inner.query_start.elapsed(); + } + + pub(crate) fn inc_num_mem_ranges(&self, num: usize) { + let mut inner = self.0.lock().unwrap(); + inner.metrics.num_mem_ranges += num; + } + + pub(crate) fn inc_num_file_ranges(&self, num: usize) { + let mut inner = self.0.lock().unwrap(); + inner.metrics.num_file_ranges += num; + } + + pub(crate) fn inc_build_reader_cost(&self, cost: Duration) { + let mut inner = self.0.lock().unwrap(); + inner.metrics.build_reader_cost += cost; + } + + pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) { + let mut inner = self.0.lock().unwrap(); + inner.metrics.merge_from(metrics); + } + + pub(crate) fn merge_reader_metrics(&self, metrics: &ReaderMetrics) { + let mut inner = self.0.lock().unwrap(); + inner.reader_metrics.merge_from(metrics); + } + + pub(crate) fn on_finish(&self) { + let mut inner = self.0.lock().unwrap(); + inner.on_finish(); + } +} + +/// Scans memtable ranges at `index`. +pub(crate) fn scan_mem_ranges( + stream_ctx: Arc, + part_metrics: PartitionMetrics, + index: RowGroupIndex, +) -> impl Stream> { + try_stream! { + let ranges = stream_ctx.build_mem_ranges(index); + part_metrics.inc_num_mem_ranges(ranges.len()); + for range in ranges { + let build_reader_start = Instant::now(); + let iter = range.build_iter()?; + part_metrics.inc_build_reader_cost(build_reader_start.elapsed()); + + let mut source = Source::Iter(iter); + while let Some(batch) = source.next_batch().await? { + yield batch; + } + } + } +} + +/// Scans file ranges at `index`. +pub(crate) fn scan_file_ranges( + stream_ctx: Arc, + part_metrics: PartitionMetrics, + index: RowGroupIndex, + read_type: &'static str, +) -> impl Stream> { + try_stream! { + let mut reader_metrics = ReaderMetrics::default(); + let ranges = stream_ctx + .build_file_ranges(index, &mut reader_metrics) + .await?; + part_metrics.inc_num_file_ranges(ranges.len()); + for range in ranges { + let build_reader_start = Instant::now(); + let reader = range.reader(None).await?; + part_metrics.inc_build_reader_cost(build_reader_start.elapsed()); + let compat_batch = range.compat_batch(); + let mut source = Source::PruneReader(reader); + while let Some(mut batch) = source.next_batch().await? { + if let Some(compact_batch) = compat_batch { + batch = compact_batch.compat_batch(batch)?; + } + yield batch; + } + if let Source::PruneReader(mut reader) = source { + reader_metrics.merge_from(reader.metrics()); + } + } + + // Reports metrics. + reader_metrics.observe_rows(read_type); + part_metrics.merge_reader_metrics(&reader_metrics); + } +} diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 6e45c8ce4a80..fe4054632e97 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -16,36 +16,29 @@ use std::fmt; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Instant; use async_stream::try_stream; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::util::ChainedRecordBatchStream; use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; -use common_telemetry::{debug, tracing}; +use common_telemetry::tracing; use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; -use smallvec::smallvec; use snafu::ResultExt; use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; -use store_api::storage::{ColumnId, TimeSeriesRowSelector}; -use table::predicate::Predicate; +use store_api::storage::TimeSeriesRowSelector; use tokio::sync::Semaphore; use crate::error::{PartitionOutOfRangeSnafu, Result}; -use crate::memtable::MemtableRef; use crate::read::dedup::{DedupReader, LastNonNull, LastRow}; use crate::read::last_row::LastRowReader; use crate::read::merge::MergeReaderBuilder; -use crate::read::scan_region::{ - FileRangeCollector, ScanInput, ScanPart, ScanPartList, StreamContext, -}; +use crate::read::scan_region::{ScanInput, StreamContext}; +use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics}; use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source}; use crate::region::options::MergeMode; -use crate::sst::file::FileMeta; -use crate::sst::parquet::file_range::FileRange; -use crate::sst::parquet::reader::ReaderMetrics; /// Scans a region and returns rows in a sorted sequence. /// @@ -66,6 +59,8 @@ pub struct SeqScan { impl SeqScan { /// Creates a new [SeqScan]. pub(crate) fn new(input: ScanInput) -> Self { + // TODO(yingwen): Set permits according to partition num. But we need to support file + // level parallelism. let parallelism = input.parallelism.parallelism.max(1); let mut properties = ScannerProperties::default() .with_append_mode(input.append_mode) @@ -102,150 +97,49 @@ impl SeqScan { /// Builds a [BoxedBatchReader] from sequential scan for compaction. pub async fn build_reader(&self) -> Result { - let mut metrics = ScannerMetrics { - prepare_scan_cost: self.stream_ctx.query_start.elapsed(), - ..Default::default() - }; - let maybe_reader = Self::build_all_merge_reader( + let part_metrics = PartitionMetrics::new( + self.stream_ctx.input.mapper.metadata().region_id, + 0, + get_scanner_type(self.compaction), + self.stream_ctx.query_start, + ScannerMetrics { + prepare_scan_cost: self.stream_ctx.query_start.elapsed(), + ..Default::default() + }, + ); + debug_assert_eq!(1, self.properties.partitions.len()); + let partition_ranges = &self.properties.partitions[0]; + + let reader = Self::build_all_merge_reader( &self.stream_ctx, + partition_ranges, self.semaphore.clone(), - &mut metrics, self.compaction, - self.properties.num_partitions(), + &part_metrics, ) .await?; - // Safety: `build_merge_reader()` always returns a reader if partition is None. - let reader = maybe_reader.unwrap(); Ok(Box::new(reader)) } - /// Builds sources from a [ScanPart]. - fn build_part_sources( - part: &ScanPart, - sources: &mut Vec, - row_selector: Option, - compaction: bool, - ) -> Result<()> { - sources.reserve(part.memtable_ranges.len() + part.file_ranges.len()); - // Read memtables. - for mem in &part.memtable_ranges { - let iter = mem.build_iter()?; - sources.push(Source::Iter(iter)); - } - let read_type = if compaction { - "compaction" - } else { - "seq_scan_files" - }; - // Read files. - for file in &part.file_ranges { - if file.is_empty() { - continue; - } - - // Creates a stream to read the file. - let ranges = file.clone(); - let stream = try_stream! { - let mut reader_metrics = ReaderMetrics::default(); - // Safety: We checked whether it is empty before. - let file_id = ranges[0].file_handle().file_id(); - let region_id = ranges[0].file_handle().region_id(); - let range_num = ranges.len(); - for range in ranges { - let mut reader = range.reader(row_selector).await?; - let compat_batch = range.compat_batch(); - while let Some(mut batch) = reader.next_batch().await? { - if let Some(compat) = compat_batch { - batch = compat - .compat_batch(batch)?; - } - - yield batch; - } - reader_metrics.merge_from(reader.metrics()); - } - debug!( - "Seq scan region {}, file {}, {} ranges finished, metrics: {:?}, compaction: {}", - region_id, file_id, range_num, reader_metrics, compaction - ); - // Reports metrics. - reader_metrics.observe_rows(read_type); - }; - let stream = Box::pin(stream); - sources.push(Source::Stream(stream)); - } - - Ok(()) - } - /// Builds a merge reader that reads all data. async fn build_all_merge_reader( - stream_ctx: &StreamContext, + stream_ctx: &Arc, + partition_ranges: &[PartitionRange], semaphore: Arc, - metrics: &mut ScannerMetrics, compaction: bool, - parallelism: usize, - ) -> Result> { - // initialize parts list - let mut parts = stream_ctx.parts.lock().await; - Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics, parallelism).await?; - let parts_len = parts.0.len(); - - let mut sources = Vec::with_capacity(parts_len); - for id in 0..parts_len { - let Some(part) = parts.0.get_part(id) else { - return Ok(None); - }; - - Self::build_part_sources(part, &mut sources, None, compaction)?; - } - - Self::build_reader_from_sources(stream_ctx, sources, semaphore).await - } - - /// Builds a merge reader that reads data from one [`PartitionRange`]. - /// - /// If the `range_id` is out of bound, returns None. - async fn build_merge_reader( - stream_ctx: &StreamContext, - range_id: usize, - semaphore: Arc, - metrics: &mut ScannerMetrics, - compaction: bool, - parallelism: usize, - ) -> Result> { + part_metrics: &PartitionMetrics, + ) -> Result { let mut sources = Vec::new(); - let build_start = { - let mut parts = stream_ctx.parts.lock().await; - Self::maybe_init_parts(&stream_ctx.input, &mut parts, metrics, parallelism).await?; - - let Some(part) = parts.0.get_part(range_id) else { - return Ok(None); - }; - - let build_start = Instant::now(); - Self::build_part_sources( - part, - &mut sources, - stream_ctx.input.series_row_selector, + for part_range in partition_ranges { + build_sources( + stream_ctx, + part_range, compaction, - )?; - - build_start - }; - - let maybe_reader = Self::build_reader_from_sources(stream_ctx, sources, semaphore).await; - let build_reader_cost = build_start.elapsed(); - metrics.build_reader_cost += build_reader_cost; - debug!( - "Build reader region: {}, range_id: {}, from sources, build_reader_cost: {:?}, compaction: {}", - stream_ctx.input.mapper.metadata().region_id, - range_id, - build_reader_cost, - compaction, - ); - - maybe_reader + part_metrics, + &mut sources, + ); + } + Self::build_reader_from_sources(stream_ctx, sources, semaphore).await } #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)] @@ -253,7 +147,7 @@ impl SeqScan { stream_ctx: &StreamContext, mut sources: Vec, semaphore: Arc, - ) -> Result> { + ) -> Result { if stream_ctx.input.parallelism.parallelism > 1 { // Read sources in parallel. We always spawn a task so we can control the parallelism // by the semaphore. @@ -286,13 +180,11 @@ impl SeqScan { None => reader, }; - Ok(Some(reader)) + Ok(reader) } /// Scans the given partition when the part list is set properly. /// Otherwise the returned stream might not contains any data. - // TODO: refactor out `uncached_scan_part_impl`. - #[allow(dead_code)] fn scan_partition_impl( &self, partition: usize, @@ -307,28 +199,36 @@ impl SeqScan { )); } - let mut metrics = ScannerMetrics { - prepare_scan_cost: self.stream_ctx.query_start.elapsed(), - ..Default::default() - }; let stream_ctx = self.stream_ctx.clone(); let semaphore = self.semaphore.clone(); let partition_ranges = self.properties.partitions[partition].clone(); let compaction = self.compaction; - let parallelism = self.properties.num_partitions(); + let part_metrics = PartitionMetrics::new( + self.stream_ctx.input.mapper.metadata().region_id, + partition, + get_scanner_type(self.compaction), + stream_ctx.query_start, + ScannerMetrics { + prepare_scan_cost: self.stream_ctx.query_start.elapsed(), + ..Default::default() + }, + ); + let stream = try_stream! { - let first_poll = stream_ctx.query_start.elapsed(); + part_metrics.on_first_poll(); - for partition_range in partition_ranges { - let maybe_reader = - Self::build_merge_reader(&stream_ctx, partition_range.identifier, semaphore.clone(), &mut metrics, compaction, parallelism) + // Scans each part. + for part_range in partition_ranges { + let mut sources = Vec::new(); + build_sources(&stream_ctx, &part_range, compaction, &part_metrics, &mut sources); + + let mut reader = + Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone()) .await .map_err(BoxedError::new) .context(ExternalSnafu)?; - let Some(mut reader) = maybe_reader else { - return; - }; let cache = stream_ctx.input.cache_manager.as_deref(); + let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now(); while let Some(batch) = reader .next_batch() @@ -350,117 +250,10 @@ impl SeqScan { fetch_start = Instant::now(); } metrics.scan_cost += fetch_start.elapsed(); - metrics.total_cost = stream_ctx.query_start.elapsed(); - metrics.observe_metrics_on_finish(); - - debug!( - "Seq scan finished, region_id: {:?}, partition: {}, metrics: {:?}, first_poll: {:?}, compaction: {}", - stream_ctx.input.mapper.metadata().region_id, - partition, - metrics, - first_poll, - compaction, - ); + part_metrics.merge_metrics(&metrics); } - }; - - let stream = Box::pin(RecordBatchStreamWrapper::new( - self.stream_ctx.input.mapper.output_schema(), - Box::pin(stream), - )); - - Ok(stream) - } - - /// Scans the given partition when the part list is not set. - /// This method will do a lazy initialize of part list and - /// ignores the partition settings in `properties`. - fn uncached_scan_part_impl( - &self, - partition: usize, - ) -> Result { - let num_partitions = self.properties.partitions.len(); - if partition >= num_partitions { - return Err(BoxedError::new( - PartitionOutOfRangeSnafu { - given: partition, - all: self.properties.partitions.len(), - } - .build(), - )); - } - let mut metrics = ScannerMetrics { - prepare_scan_cost: self.stream_ctx.query_start.elapsed(), - ..Default::default() - }; - let stream_ctx = self.stream_ctx.clone(); - let semaphore = self.semaphore.clone(); - let compaction = self.compaction; - let parallelism = self.properties.num_partitions(); - - // build stream - let stream = try_stream! { - let first_poll = stream_ctx.query_start.elapsed(); - - // init parts - let parts_len = { - let mut parts = stream_ctx.parts.lock().await; - Self::maybe_init_parts(&stream_ctx.input, &mut parts, &mut metrics, parallelism).await - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - parts.0.len() - }; - - for id in (0..parts_len).skip(partition).step_by(num_partitions) { - let maybe_reader = Self::build_merge_reader( - &stream_ctx, - id, - semaphore.clone(), - &mut metrics, - compaction, - parallelism - ) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - let Some(mut reader) = maybe_reader else { - return; - }; - let cache = stream_ctx.input.cache_manager.as_deref(); - let mut fetch_start = Instant::now(); - while let Some(batch) = reader - .next_batch() - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)? - { - metrics.scan_cost += fetch_start.elapsed(); - metrics.num_batches += 1; - metrics.num_rows += batch.num_rows(); - let convert_start = Instant::now(); - let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?; - metrics.convert_cost += convert_start.elapsed(); - let yield_start = Instant::now(); - yield record_batch; - metrics.yield_cost += yield_start.elapsed(); - - fetch_start = Instant::now(); - } - metrics.scan_cost += fetch_start.elapsed(); - metrics.total_cost = stream_ctx.query_start.elapsed(); - metrics.observe_metrics_on_finish(); - - debug!( - "Seq scan finished, region_id: {}, partition: {}, id: {}, metrics: {:?}, first_poll: {:?}, compaction: {}", - stream_ctx.input.mapper.metadata().region_id, - partition, - id, - metrics, - first_poll, - compaction, - ); - } + part_metrics.on_finish(); }; let stream = Box::pin(RecordBatchStreamWrapper::new( @@ -470,34 +263,6 @@ impl SeqScan { Ok(stream) } - - /// Initializes parts if they are not built yet. - async fn maybe_init_parts( - input: &ScanInput, - part_list: &mut (ScanPartList, Duration), - metrics: &mut ScannerMetrics, - parallelism: usize, - ) -> Result<()> { - if part_list.0.is_none() { - let now = Instant::now(); - let mut distributor = SeqDistributor::default(); - let reader_metrics = input.prune_file_ranges(&mut distributor).await?; - distributor.append_mem_ranges( - &input.memtables, - Some(input.mapper.column_ids()), - input.predicate.clone(), - ); - part_list.0.set_parts(distributor.build_parts(parallelism)); - let build_part_cost = now.elapsed(); - part_list.1 = build_part_cost; - - metrics.observe_init_part(build_part_cost, &reader_metrics); - } else { - // Updates the cost of building parts. - metrics.build_parts_cost = part_list.1; - } - Ok(()) - } } impl RegionScanner for SeqScan { @@ -510,7 +275,7 @@ impl RegionScanner for SeqScan { } fn scan_partition(&self, partition: usize) -> Result { - self.uncached_scan_part_impl(partition) + self.scan_partition_impl(partition) } fn prepare(&mut self, ranges: Vec>) -> Result<(), BoxedError> { @@ -525,292 +290,66 @@ impl RegionScanner for SeqScan { } impl DisplayAs for SeqScan { - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "SeqScan: region={}, ", self.stream_ctx.input.mapper.metadata().region_id )?; - self.stream_ctx.format_for_explain(t, f) + self.stream_ctx.format_for_explain(f) } } impl fmt::Debug for SeqScan { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("SeqScan") - .field("parts", &self.stream_ctx.parts) + .field("num_ranges", &self.stream_ctx.ranges.len()) .finish() } } -#[cfg(test)] -impl SeqScan { - /// Returns the input. - pub(crate) fn input(&self) -> &ScanInput { - &self.stream_ctx.input - } -} - -/// Builds [ScanPart]s that preserves order. -#[derive(Default)] -pub(crate) struct SeqDistributor { - parts: Vec, -} - -impl FileRangeCollector for SeqDistributor { - fn append_file_ranges( - &mut self, - file_meta: &FileMeta, - file_ranges: impl Iterator, - ) { - // Creates a [ScanPart] for each file. - let ranges: Vec<_> = file_ranges.collect(); - if ranges.is_empty() { - // No ranges to read. - return; - } - let part = ScanPart { - memtable_ranges: Vec::new(), - file_ranges: smallvec![ranges], - time_range: Some(file_meta.time_range), - }; - self.parts.push(part); - } -} - -impl SeqDistributor { - /// Appends memtable ranges to the distributor. - fn append_mem_ranges( - &mut self, - memtables: &[MemtableRef], - projection: Option<&[ColumnId]>, - predicate: Option, - ) { - for mem in memtables { - let stats = mem.stats(); - let mem_ranges = mem.ranges(projection, predicate.clone()); - if mem_ranges.is_empty() { - continue; - } - let part = ScanPart { - memtable_ranges: mem_ranges.into_values().collect(), - file_ranges: smallvec![], - time_range: stats.time_range(), - }; - self.parts.push(part); - } - } - - /// Groups file ranges and memtable ranges by time ranges. - /// The output number of parts may be `<= parallelism`. If `parallelism` is 0, it will be set to 1. - /// - /// Output parts have non-overlapping time ranges. - fn build_parts(self, parallelism: usize) -> Vec { - let parallelism = parallelism.max(1); - let parts = group_parts_by_range(self.parts); - let parts = maybe_split_parts(parts, parallelism); - // Ensures it doesn't returns parts more than `parallelism`. - maybe_merge_parts(parts, parallelism) - } -} - -/// Groups parts by time range. It may generate parts more than parallelism. -/// All time ranges are not None. -fn group_parts_by_range(mut parts: Vec) -> Vec { - if parts.is_empty() { - return Vec::new(); - } - - // Sorts parts by time range. - parts.sort_unstable_by(|a, b| { - // Safety: time ranges of parts from [SeqPartBuilder] are not None. - let a = a.time_range.unwrap(); - let b = b.time_range.unwrap(); - a.0.cmp(&b.0).then_with(|| b.1.cmp(&a.1)) - }); - let mut part_in_range = None; - // Parts with exclusive time ranges. - let mut part_groups = Vec::new(); - for part in parts { - let Some(mut prev_part) = part_in_range.take() else { - part_in_range = Some(part); - continue; - }; - - if prev_part.overlaps(&part) { - prev_part.merge(part); - part_in_range = Some(prev_part); +/// Builds sources for the partition range. +fn build_sources( + stream_ctx: &Arc, + part_range: &PartitionRange, + compaction: bool, + part_metrics: &PartitionMetrics, + sources: &mut Vec, +) { + // Gets range meta. + let range_meta = &stream_ctx.ranges[part_range.identifier]; + sources.reserve(range_meta.row_group_indices.len()); + for index in &range_meta.row_group_indices { + let stream = if stream_ctx.is_mem_range_index(*index) { + let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index); + Box::pin(stream) as _ } else { - // A new group. - part_groups.push(prev_part); - part_in_range = Some(part); - } - } - if let Some(part) = part_in_range { - part_groups.push(part); - } - - part_groups -} - -/// Merges parts by parallelism. -/// It merges parts if the number of parts is greater than `parallelism`. -fn maybe_merge_parts(mut parts: Vec, parallelism: usize) -> Vec { - assert!(parallelism > 0); - if parts.len() <= parallelism { - // No need to merge parts. - return parts; - } - - // Sort parts by number of memtables and ranges in reverse order. - parts.sort_unstable_by(|a, b| { - a.memtable_ranges - .len() - .cmp(&b.memtable_ranges.len()) - .then_with(|| { - let a_ranges_len = a - .file_ranges - .iter() - .map(|ranges| ranges.len()) - .sum::(); - let b_ranges_len = b - .file_ranges - .iter() - .map(|ranges| ranges.len()) - .sum::(); - a_ranges_len.cmp(&b_ranges_len) - }) - .reverse() - }); - - let parts_to_reduce = parts.len() - parallelism; - for _ in 0..parts_to_reduce { - // Safety: We ensure `parts.len() > parallelism`. - let part = parts.pop().unwrap(); - parts.last_mut().unwrap().merge(part); - } - - parts -} - -/// Splits parts by parallelism. -/// It splits a part if it only scans one file and doesn't scan any memtable. -fn maybe_split_parts(mut parts: Vec, parallelism: usize) -> Vec { - assert!(parallelism > 0); - if parts.len() >= parallelism { - // No need to split parts. - return parts; - } - - let has_part_to_split = parts.iter().any(|part| part.can_split_preserve_order()); - if !has_part_to_split { - // No proper parts to scan. - return parts; - } - - // Sorts parts by the number of ranges in the first file. - parts.sort_unstable_by(|a, b| { - let a_len = a.file_ranges.first().map(|file| file.len()).unwrap_or(0); - let b_len = b.file_ranges.first().map(|file| file.len()).unwrap_or(0); - a_len.cmp(&b_len).reverse() - }); - let num_parts_to_split = parallelism - parts.len(); - let mut output_parts = Vec::with_capacity(parallelism); - // Split parts up to num_parts_to_split. - for part in parts.iter_mut() { - if !part.can_split_preserve_order() { - continue; - } - // Safety: `can_split_preserve_order()` ensures file_ranges.len() == 1. - // Splits part into `num_parts_to_split + 1` new parts if possible. - let target_part_num = num_parts_to_split + 1; - let ranges_per_part = (part.file_ranges[0].len() + target_part_num - 1) / target_part_num; - // `can_split_preserve_order()` ensures part.file_ranges[0].len() > 1. - assert!(ranges_per_part > 0); - for ranges in part.file_ranges[0].chunks(ranges_per_part) { - let new_part = ScanPart { - memtable_ranges: Vec::new(), - file_ranges: smallvec![ranges.to_vec()], - time_range: part.time_range, + let read_type = if compaction { + "compaction" + } else { + "seq_scan_files" }; - output_parts.push(new_part); - } - // Replace the current part with the last output part as we will put the current part - // into the output parts later. - *part = output_parts.pop().unwrap(); - if output_parts.len() >= num_parts_to_split { - // We already split enough parts. - break; - } + let stream = + scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, read_type); + Box::pin(stream) as _ + }; + sources.push(Source::Stream(stream)); } - // Put the remaining parts into the output parts. - output_parts.append(&mut parts); - - output_parts } #[cfg(test)] -mod tests { - use common_time::timestamp::TimeUnit; - use common_time::Timestamp; - - use super::*; - use crate::memtable::MemtableId; - use crate::test_util::memtable_util::mem_range_for_test; - - type Output = (Vec, i64, i64); - - fn run_group_parts_test(input: &[(MemtableId, i64, i64)], expect: &[Output]) { - let parts = input - .iter() - .map(|(id, start, end)| { - let range = ( - Timestamp::new(*start, TimeUnit::Second), - Timestamp::new(*end, TimeUnit::Second), - ); - ScanPart { - memtable_ranges: vec![mem_range_for_test(*id)], - file_ranges: smallvec![], - time_range: Some(range), - } - }) - .collect(); - let output = group_parts_by_range(parts); - let actual: Vec<_> = output - .iter() - .map(|part| { - let ids: Vec<_> = part.memtable_ranges.iter().map(|mem| mem.id()).collect(); - let range = part.time_range.unwrap(); - (ids, range.0.value(), range.1.value()) - }) - .collect(); - assert_eq!(expect, actual); +impl SeqScan { + /// Returns the input. + pub(crate) fn input(&self) -> &ScanInput { + &self.stream_ctx.input } +} - #[test] - fn test_group_parts() { - // Group 1 part. - run_group_parts_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]); - - // 1, 2, 3, 4 => [3, 1, 4], [2] - run_group_parts_test( - &[ - (1, 1000, 2000), - (2, 6000, 7000), - (3, 0, 1500), - (4, 1500, 3000), - ], - &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)], - ); - - // 1, 2, 3 => [3], [1], [2], - run_group_parts_test( - &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)], - &[ - (vec![3], 0, 1000), - (vec![1], 3000, 4000), - (vec![2], 4001, 6000), - ], - ); +/// Returns the scanner type. +fn get_scanner_type(compaction: bool) -> &'static str { + if compaction { + "SeqScan(compaction)" + } else { + "SeqScan" } } diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 8a019132d784..992cba9d5c8c 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -21,24 +21,17 @@ use std::time::Instant; use async_stream::{stream, try_stream}; use common_error::ext::BoxedError; use common_recordbatch::error::ExternalSnafu; -use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream}; -use common_telemetry::debug; +use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use datatypes::schema::SchemaRef; use futures::{Stream, StreamExt}; use snafu::ResultExt; use store_api::region_engine::{PartitionRange, RegionScanner, ScannerProperties}; -use crate::cache::CacheManager; -use crate::error::Result; -use crate::memtable::MemtableRange; -use crate::read::compat::CompatBatch; -use crate::read::projection::ProjectionMapper; -use crate::read::range::RowGroupIndex; +use crate::error::{PartitionOutOfRangeSnafu, Result}; use crate::read::scan_region::{ScanInput, StreamContext}; -use crate::read::{ScannerMetrics, Source}; -use crate::sst::parquet::file_range::FileRange; -use crate::sst::parquet::reader::ReaderMetrics; +use crate::read::scan_util::{scan_file_ranges, scan_mem_ranges, PartitionMetrics}; +use crate::read::{Batch, ScannerMetrics}; /// Scans a region without providing any output ordering guarantee. /// @@ -85,62 +78,23 @@ impl UnorderedScan { Ok(stream) } - /// Fetch a batch from the source and convert it into a record batch. - async fn fetch_from_source( - source: &mut Source, - mapper: &ProjectionMapper, - cache: Option<&CacheManager>, - compat_batch: Option<&CompatBatch>, - metrics: &mut ScannerMetrics, - ) -> common_recordbatch::error::Result> { - let start = Instant::now(); - - let Some(mut batch) = source - .next_batch() - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)? - else { - metrics.scan_cost += start.elapsed(); - - return Ok(None); - }; - - if let Some(compat) = compat_batch { - batch = compat - .compat_batch(batch) - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - } - metrics.scan_cost += start.elapsed(); - - let convert_start = Instant::now(); - let record_batch = mapper.convert(&batch, cache)?; - metrics.convert_cost += convert_start.elapsed(); - - Ok(Some(record_batch)) - } - - /// Scans a [PartitionRange] and returns a stream. - fn scan_partition_range<'a>( - stream_ctx: &'a StreamContext, - part_range: &'a PartitionRange, - mem_ranges: &'a mut Vec, - file_ranges: &'a mut Vec, - reader_metrics: &'a mut ReaderMetrics, - metrics: &'a mut ScannerMetrics, - ) -> impl Stream> + 'a { + /// Scans a [PartitionRange] by its `identifier` and returns a stream. + fn scan_partition_range( + stream_ctx: Arc, + part_range_id: usize, + part_metrics: PartitionMetrics, + ) -> impl Stream> { stream! { // Gets range meta. - let range_meta = &stream_ctx.ranges[part_range.identifier]; + let range_meta = &stream_ctx.ranges[part_range_id]; for index in &range_meta.row_group_indices { if stream_ctx.is_mem_range_index(*index) { - let stream = Self::scan_mem_ranges(stream_ctx, *index, mem_ranges, metrics); + let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index); for await batch in stream { yield batch; } } else { - let stream = Self::scan_file_ranges(stream_ctx, *index, file_ranges, reader_metrics, metrics); + let stream = scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files"); for await batch in stream { yield batch; } @@ -149,124 +103,68 @@ impl UnorderedScan { } } - /// Scans memtable ranges at `index`. - fn scan_mem_ranges<'a>( - stream_ctx: &'a StreamContext, - index: RowGroupIndex, - ranges: &'a mut Vec, - metrics: &'a mut ScannerMetrics, - ) -> impl Stream> + 'a { - try_stream! { - let mapper = &stream_ctx.input.mapper; - let cache = stream_ctx.input.cache_manager.as_deref(); - stream_ctx.build_mem_ranges(index, ranges); - metrics.num_mem_ranges += ranges.len(); - for range in ranges { - let build_reader_start = Instant::now(); - let iter = range.build_iter().map_err(BoxedError::new).context(ExternalSnafu)?; - metrics.build_reader_cost = build_reader_start.elapsed(); - - let mut source = Source::Iter(iter); - while let Some(batch) = - Self::fetch_from_source(&mut source, mapper, cache, None, metrics).await? - { - metrics.num_batches += 1; - metrics.num_rows += batch.num_rows(); - let yield_start = Instant::now(); - yield batch; - metrics.yield_cost += yield_start.elapsed(); - } - } - } - } - - /// Scans file ranges at `index`. - fn scan_file_ranges<'a>( - stream_ctx: &'a StreamContext, - index: RowGroupIndex, - ranges: &'a mut Vec, - reader_metrics: &'a mut ReaderMetrics, - metrics: &'a mut ScannerMetrics, - ) -> impl Stream> + 'a { - try_stream! { - let mapper = &stream_ctx.input.mapper; - let cache = stream_ctx.input.cache_manager.as_deref(); - stream_ctx - .build_file_ranges(index, ranges, reader_metrics) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - metrics.num_file_ranges += ranges.len(); - for range in ranges { - let build_reader_start = Instant::now(); - let reader = range - .reader(None) - .await - .map_err(BoxedError::new) - .context(ExternalSnafu)?; - metrics.build_reader_cost += build_reader_start.elapsed(); - let compat_batch = range.compat_batch(); - let mut source = Source::PruneReader(reader); - while let Some(batch) = - Self::fetch_from_source(&mut source, mapper, cache, compat_batch, metrics) - .await? - { - metrics.num_batches += 1; - metrics.num_rows += batch.num_rows(); - let yield_start = Instant::now(); - yield batch; - metrics.yield_cost += yield_start.elapsed(); - } - if let Source::PruneReader(mut reader) = source { - reader_metrics.merge_from(reader.metrics()); - } - } - } - } - fn scan_partition_impl( &self, partition: usize, ) -> Result { - let mut metrics = ScannerMetrics { - prepare_scan_cost: self.stream_ctx.query_start.elapsed(), - ..Default::default() - }; + if partition >= self.properties.partitions.len() { + return Err(BoxedError::new( + PartitionOutOfRangeSnafu { + given: partition, + all: self.properties.partitions.len(), + } + .build(), + )); + } + + let part_metrics = PartitionMetrics::new( + self.stream_ctx.input.mapper.metadata().region_id, + partition, + "UnorderedScan", + self.stream_ctx.query_start, + ScannerMetrics { + prepare_scan_cost: self.stream_ctx.query_start.elapsed(), + ..Default::default() + }, + ); let stream_ctx = self.stream_ctx.clone(); - let ranges_opt = self.properties.partitions.get(partition).cloned(); + let part_ranges = self.properties.partitions[partition].clone(); - let stream = stream! { - let first_poll = stream_ctx.query_start.elapsed(); - let Some(part_ranges) = ranges_opt else { - return; - }; + let stream = try_stream! { + part_metrics.on_first_poll(); - let mut mem_ranges = Vec::new(); - let mut file_ranges = Vec::new(); - let mut reader_metrics = ReaderMetrics::default(); + let cache = stream_ctx.input.cache_manager.as_deref(); // Scans each part. for part_range in part_ranges { + let mut metrics = ScannerMetrics::default(); + let mut fetch_start = Instant::now(); + let stream = Self::scan_partition_range( - &stream_ctx, - &part_range, - &mut mem_ranges, - &mut file_ranges, - &mut reader_metrics, - &mut metrics, + stream_ctx.clone(), + part_range.identifier, + part_metrics.clone(), ); for await batch in stream { - yield batch; + let batch = batch.map_err(BoxedError::new).context(ExternalSnafu)?; + metrics.scan_cost += fetch_start.elapsed(); + metrics.num_batches += 1; + metrics.num_rows += batch.num_rows(); + + let convert_start = Instant::now(); + let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?; + metrics.convert_cost += convert_start.elapsed(); + let yield_start = Instant::now(); + yield record_batch; + metrics.yield_cost += yield_start.elapsed(); + + fetch_start = Instant::now(); } + + metrics.scan_cost += fetch_start.elapsed(); + part_metrics.merge_metrics(&metrics); } - reader_metrics.observe_rows("unordered_scan_files"); - metrics.total_cost = stream_ctx.query_start.elapsed(); - metrics.observe_metrics_on_finish(); - let mapper = &stream_ctx.input.mapper; - debug!( - "Unordered scan partition {} finished, region_id: {}, metrics: {:?}, reader_metrics: {:?}, first_poll: {:?}", - partition, mapper.metadata().region_id, metrics, reader_metrics, first_poll, - ); + part_metrics.on_finish(); }; let stream = Box::pin(RecordBatchStreamWrapper::new( self.stream_ctx.input.mapper.output_schema(), @@ -302,20 +200,20 @@ impl RegionScanner for UnorderedScan { } impl DisplayAs for UnorderedScan { - fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { write!( f, "UnorderedScan: region={}, ", self.stream_ctx.input.mapper.metadata().region_id )?; - self.stream_ctx.format_for_explain(t, f) + self.stream_ctx.format_for_explain(f) } } impl fmt::Debug for UnorderedScan { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("UnorderedScan") - .field("parts", &self.stream_ctx.parts) + .field("num_ranges", &self.stream_ctx.ranges.len()) .finish() } } diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 447cd8f5766b..50d7a57cc189 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -90,7 +90,8 @@ impl FromStr for FileId { } } -/// Time range of a SST file. +/// Time range (min and max timestamps) of a SST file. +/// Both min and max are inclusive. pub type FileTimeRange = (Timestamp, Timestamp); /// Checks if two inclusive timestamp ranges overlap with each other. diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index ff7fe77e7a9f..b73026a7a6e3 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -238,9 +238,6 @@ impl ParquetReaderBuilder { cache_manager: self.cache_manager.clone(), }; - // TODO(yingwen): count the cost of the method. - metrics.build_cost = start.elapsed(); - let mut filters = if let Some(predicate) = &self.predicate { predicate .exprs() @@ -270,6 +267,9 @@ impl ParquetReaderBuilder { ); let context = FileRangeContext::new(reader_builder, filters, read_format, codec); + + metrics.build_cost += start.elapsed(); + Ok((context, row_groups)) } diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 70dec3cad446..ed32a3661e7d 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -35,7 +35,7 @@ use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer}; use crate::memtable::{ BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId, - MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats, + MemtableRange, MemtableRef, MemtableStats, }; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; @@ -361,11 +361,3 @@ pub(crate) fn collect_iter_timestamps(iter: BoxedBatchIterator) -> Vec { .map(|v| v.unwrap().0.value()) .collect() } - -/// Builds a memtable range for test. -pub(crate) fn mem_range_for_test(id: MemtableId) -> MemtableRange { - let builder = Box::new(EmptyIterBuilder::default()); - - let context = Arc::new(MemtableRangeContext::new(id, builder)); - MemtableRange::new(context) -} diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 850b9ad3e2d6..04bc6df6bcee 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -180,7 +180,7 @@ impl ScannerPartitioning { pub struct PartitionRange { /// Start time of time index column. Inclusive. pub start: Timestamp, - /// End time of time index column. Inclusive. + /// End time of time index column. Exclusive. pub end: Timestamp, /// Number of rows in this range. Is used to balance ranges between partitions. pub num_rows: usize,