Skip to content

Commit

Permalink
feat: Implements row group level parallel unordered scanner (#3992)
Browse files Browse the repository at this point in the history
* feat: unordered scanner

* feat: support compat

* chore: update debug print

fix: missing ranges in scan parts

* fix: ensure chunk size > 0

* fix: parallel is disabled if there is only one file and memtable

* chore: reader metrics

* chore: remove todo

* refactor: add ScanPartBuilder trait

* chore: pass file meta to the part builder

* chore: make part builder private

* docs: update comment

* chore: remove meta()

* refactor: only prune file ranges in ScanInput

replaces ScanPartBuilder with FileRangeCollector, which only collects file
ranges

* chore: address typo

* fix: panic when no partition

* feat: Postpone part distribution

* chore: handle empty partition in mito

* style: fix clippy
  • Loading branch information
evenyag authored May 29, 2024
1 parent f0effd2 commit 848bd7e
Show file tree
Hide file tree
Showing 14 changed files with 491 additions and 172 deletions.
2 changes: 1 addition & 1 deletion src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ mod tests {
let file_metas: Vec<_> = data.version.ssts.levels()[0]
.files
.values()
.map(|file| file.meta())
.map(|file| file.meta_ref().clone())
.collect();

// 5 files for next compaction and removes old files.
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/compaction/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl CompactionTaskImpl {
Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum());

for output in self.outputs.drain(..) {
compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));
compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));

info!(
"Compaction region {} output [{}]-> {}",
Expand Down Expand Up @@ -229,7 +229,7 @@ impl CompactionTaskImpl {
return Err(e);
}
};
deleted.extend(self.expired_ssts.iter().map(FileHandle::meta));
deleted.extend(self.expired_ssts.iter().map(|f| f.meta_ref().clone()));
let merge_time = merge_timer.stop_and_record();
info!(
"Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s",
Expand Down
3 changes: 1 addition & 2 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,11 @@ impl MitoEngine {
&self,
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
) -> Result<SendableRecordBatchStream, BoxedError> {
self.scanner(region_id, request)
.map_err(BoxedError::new)?
.scan()
.await
.map_err(BoxedError::new)
}

/// Returns a scanner to scan for `request`.
Expand Down
4 changes: 4 additions & 0 deletions src/mito2/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::error::{
ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;
use crate::sst::parquet::reader::RowGroupReader;

/// Storage internal representation of a batch of rows for a primary key (time series).
///
Expand Down Expand Up @@ -699,6 +700,8 @@ pub enum Source {
Iter(BoxedBatchIterator),
/// Source from a [BoxedBatchStream].
Stream(BoxedBatchStream),
/// Source from a [RowGroupReader].
RowGroupReader(RowGroupReader),
}

impl Source {
Expand All @@ -708,6 +711,7 @@ impl Source {
Source::Reader(reader) => reader.next_batch().await,
Source::Iter(iter) => iter.next().transpose(),
Source::Stream(stream) => stream.try_next().await,
Source::RowGroupReader(reader) => reader.next_batch().await,
}
}
}
Expand Down
44 changes: 34 additions & 10 deletions src/mito2/src/read/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
pub struct CompatReader<R> {
/// Underlying reader.
reader: R,
/// Optional primary key adapter.
compat_pk: Option<CompatPrimaryKey>,
/// Optional fields adapter.
compat_fields: Option<CompatFields>,
/// Helper to compat batches.
compat: CompatBatch,
}

impl<R> CompatReader<R> {
Expand All @@ -48,13 +46,9 @@ impl<R> CompatReader<R> {
reader_meta: RegionMetadataRef,
reader: R,
) -> Result<CompatReader<R>> {
let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
let compat_fields = may_compat_fields(mapper, &reader_meta)?;

Ok(CompatReader {
reader,
compat_pk,
compat_fields,
compat: CompatBatch::new(mapper, reader_meta)?,
})
}
}
Expand All @@ -66,14 +60,44 @@ impl<R: BatchReader> BatchReader for CompatReader<R> {
return Ok(None);
};

batch = self.compat.compat_batch(batch)?;

Ok(Some(batch))
}
}

/// A helper struct to adapt schema of the batch to an expected schema.
pub(crate) struct CompatBatch {
/// Optional primary key adapter.
compat_pk: Option<CompatPrimaryKey>,
/// Optional fields adapter.
compat_fields: Option<CompatFields>,
}

impl CompatBatch {
/// Creates a new [CompatBatch].
/// - `mapper` is built from the metadata users expect to see.
/// - `reader_meta` is the metadata of the input reader.
pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
let compat_fields = may_compat_fields(mapper, &reader_meta)?;

Ok(Self {
compat_pk,
compat_fields,
})
}

/// Adapts the `batch` to the expected schema.
pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
if let Some(compat_pk) = &self.compat_pk {
batch = compat_pk.compat(batch)?;
}
if let Some(compat_fields) = &self.compat_fields {
batch = compat_fields.compat(batch);
}

Ok(Some(batch))
Ok(batch)
}
}

Expand Down
121 changes: 106 additions & 15 deletions src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

//! Scans a region according to the scan request.
use std::fmt;
use std::sync::Arc;
use std::time::Instant;

use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, warn};
use common_time::range::TimestampRange;
Expand All @@ -32,15 +34,16 @@ use crate::cache::CacheManagerRef;
use crate::error::Result;
use crate::memtable::MemtableRef;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::CompatReader;
use crate::read::compat::{CompatBatch, CompatReader};
use crate::read::projection::ProjectionMapper;
use crate::read::seq_scan::SeqScan;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{compat, Batch, Source};
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
use crate::sst::file::{FileHandle, FileMeta};
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::parquet::file_range::FileRange;

/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
Expand All @@ -51,20 +54,24 @@ pub(crate) enum Scanner {
}

impl Scanner {
/// Returns a [SendableRecordBatchStream] to retrieve scan results.
pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream> {
/// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
match self {
Scanner::Seq(seq_scan) => seq_scan.build_stream().await,
Scanner::Seq(seq_scan) => seq_scan.build_stream().await.map_err(BoxedError::new),
Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
}
}

/// Returns a [RegionScanner] to scan the region.
pub(crate) async fn region_scanner(&self) -> Result<RegionScannerRef> {
let stream = self.scan().await?;
let scanner = SinglePartitionScanner::new(stream);

Ok(Arc::new(scanner))
pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
match self {
Scanner::Seq(seq_scan) => {
let stream = seq_scan.build_stream().await?;
let scanner = Arc::new(SinglePartitionScanner::new(stream));
Ok(scanner)
}
Scanner::Unordered(unordered_scan) => Ok(Arc::new(unordered_scan)),
}
}
}

Expand Down Expand Up @@ -222,9 +229,7 @@ impl ScanRegion {
/// Unordered scan.
pub(crate) fn unordered_scan(self) -> Result<UnorderedScan> {
let input = self.scan_input(true)?;
let scan = UnorderedScan::new(input);

Ok(scan)
Ok(UnorderedScan::new(input))
}

#[cfg(test)]
Expand Down Expand Up @@ -386,7 +391,7 @@ pub(crate) struct ScanInput {
/// Time range filter for time index.
time_range: Option<TimestampRange>,
/// Predicate to push down.
predicate: Option<Predicate>,
pub(crate) predicate: Option<Predicate>,
/// Memtables to scan.
pub(crate) memtables: Vec<MemtableRef>,
/// Handles to SST files to scan.
Expand Down Expand Up @@ -498,7 +503,6 @@ impl ScanInput {
}

/// Sets whether to remove deletion markers during scan.
#[allow(unused)]
#[must_use]
pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
self.filter_deleted = filter_deleted;
Expand Down Expand Up @@ -572,6 +576,61 @@ impl ScanInput {
Ok(sources)
}

/// Prunes file ranges to scan and adds them to the `collector`.
pub(crate) async fn prune_file_ranges(
&self,
collector: &mut impl FileRangeCollector,
) -> Result<()> {
for file in &self.files {
let res = self
.access_layer
.read_sst(file.clone())
.predicate(self.predicate.clone())
.time_range(self.time_range)
.projection(Some(self.mapper.column_ids().to_vec()))
.cache(self.cache_manager.clone())
.index_applier(self.index_applier.clone())
.expected_metadata(Some(self.mapper.metadata().clone()))
.build_reader_input()
.await;
let (mut file_range_ctx, row_groups) = match res {
Ok(x) => x,
Err(e) => {
if e.is_object_not_found() && self.ignore_file_not_found {
error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
continue;
} else {
return Err(e);
}
}
};
if !compat::has_same_columns(
self.mapper.metadata(),
file_range_ctx.read_format().metadata(),
) {
// They have different schema. We need to adapt the batch first so the
// mapper can convert it.
let compat = CompatBatch::new(
&self.mapper,
file_range_ctx.read_format().metadata().clone(),
)?;
file_range_ctx.set_compat_batch(Some(compat));
}
// Build ranges from row groups.
let file_range_ctx = Arc::new(file_range_ctx);
let file_ranges = row_groups
.into_iter()
.map(|(row_group_idx, row_selection)| {
FileRange::new(file_range_ctx.clone(), row_group_idx, row_selection)
});
collector.append_file_ranges(file.meta_ref(), file_ranges);
}

READ_SST_COUNT.observe(self.files.len() as f64);

Ok(())
}

/// Scans the input source in another task and sends batches to the sender.
pub(crate) fn spawn_scan_task(
&self,
Expand Down Expand Up @@ -620,3 +679,35 @@ impl ScanInput {
self.files.iter().map(|file| file.file_id()).collect()
}
}

/// A partition of a scanner to read.
/// It contains memtables and file ranges to scan.
#[derive(Default)]
pub(crate) struct ScanPart {
/// Memtables to scan.
/// We scan the whole memtable now. We might scan a range of the memtable in the future.
pub(crate) memtables: Vec<MemtableRef>,
/// File ranges to scan.
pub(crate) file_ranges: Vec<FileRange>,
}

impl fmt::Debug for ScanPart {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"ScanPart({} memtables, {} file ranges)",
self.memtables.len(),
self.file_ranges.len()
)
}
}

/// A trait to collect file ranges to scan.
pub(crate) trait FileRangeCollector {
/// Appends file ranges from the **same file** to the collector.
fn append_file_ranges(
&mut self,
file_meta: &FileMeta,
file_ranges: impl Iterator<Item = FileRange>,
);
}
Loading

0 comments on commit 848bd7e

Please sign in to comment.