diff --git a/src/mito2/src/engine/prune_test.rs b/src/mito2/src/engine/prune_test.rs index 687172417d14..b7f9056e5857 100644 --- a/src/mito2/src/engine/prune_test.rs +++ b/src/mito2/src/engine/prune_test.rs @@ -259,3 +259,56 @@ async fn test_prune_memtable_complex_expr() { +-------+---------+---------------------+"; assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_mem_range_prune() { + let mut env = TestEnv::new(); + let engine = env.create_engine(MitoConfig::default()).await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + let column_schemas = rows_schema(&request); + + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + put_rows( + &engine, + region_id, + Rows { + schema: column_schemas.clone(), + rows: build_rows(5, 8), + }, + ) + .await; + + // Starts scan and gets the memtable time range. + let stream = engine + .scan_to_stream(region_id, ScanRequest::default()) + .await + .unwrap(); + + put_rows( + &engine, + region_id, + Rows { + schema: column_schemas.clone(), + rows: build_rows(10, 12), + }, + ) + .await; + + let batches = RecordBatches::try_collect(stream).await.unwrap(); + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 5 | 5.0 | 1970-01-01T00:00:05 | +| 6 | 6.0 | 1970-01-01T00:00:06 | +| 7 | 7.0 | 1970-01-01T00:00:07 | ++-------+---------+---------------------+"; + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 668eb51023f0..a6963ece42ec 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -34,8 +34,10 @@ pub use crate::memtable::key_values::KeyValues; use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder}; use crate::memtable::time_series::TimeSeriesMemtableBuilder; use crate::metrics::WRITE_BUFFER_BYTES; +use crate::read::prune::PruneTimeIterator; use crate::read::Batch; use crate::region::options::{MemtableOptions, MergeMode}; +use crate::sst::file::FileTimeRange; pub mod bulk; pub mod key_values; @@ -355,8 +357,10 @@ impl MemtableRange { } /// Builds an iterator to read the range. - pub fn build_iter(&self) -> Result { - self.context.builder.build() + /// Filters the result by the specific time range. + pub fn build_iter(&self, time_range: FileTimeRange) -> Result { + let iter = self.context.builder.build()?; + Ok(Box::new(PruneTimeIterator::new(iter, time_range))) } } diff --git a/src/mito2/src/read/prune.rs b/src/mito2/src/read/prune.rs index 0ab2f9e2b5d5..cb0066e73472 100644 --- a/src/mito2/src/read/prune.rs +++ b/src/mito2/src/read/prune.rs @@ -12,9 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_time::Timestamp; +use datatypes::scalars::ScalarVectorBuilder; +use datatypes::vectors::BooleanVectorBuilder; + use crate::error::Result; +use crate::memtable::BoxedBatchIterator; use crate::read::last_row::RowGroupLastRowCachedReader; use crate::read::{Batch, BatchReader}; +use crate::sst::file::FileTimeRange; use crate::sst::parquet::file_range::FileRangeContextRef; use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader}; @@ -112,3 +118,214 @@ impl PruneReader { } } } + +/// An iterator that prunes batches by time range. +pub(crate) struct PruneTimeIterator { + iter: BoxedBatchIterator, + time_range: FileTimeRange, +} + +impl PruneTimeIterator { + /// Creates a new `PruneTimeIterator` with the given iterator and time range. + pub(crate) fn new(iter: BoxedBatchIterator, time_range: FileTimeRange) -> Self { + Self { iter, time_range } + } + + /// Prune batch by time range. + fn prune(&self, mut batch: Batch) -> Result { + if batch.is_empty() { + return Ok(batch); + } + + // fast path, the batch is within the time range. + // Note that the time range is inclusive. + if self.time_range.0 <= batch.first_timestamp().unwrap() + && batch.last_timestamp().unwrap() <= self.time_range.1 + { + return Ok(batch); + } + + // slow path, prune the batch by time range. + // Note that the timestamp precision may be different from the time range. + // Safety: We know this is the timestamp type. + let unit = batch + .timestamps() + .data_type() + .as_timestamp() + .unwrap() + .unit(); + let mut filter_builder = BooleanVectorBuilder::with_capacity(batch.timestamps().len()); + let timestamps = batch.timestamps_native().unwrap(); + for ts in timestamps { + let ts = Timestamp::new(*ts, unit); + if self.time_range.0 <= ts && ts <= self.time_range.1 { + filter_builder.push(Some(true)); + } else { + filter_builder.push(Some(false)); + } + } + let filter = filter_builder.finish(); + + batch.filter(&filter)?; + Ok(batch) + } + + // Prune and return the next non-empty batch. + fn next_non_empty_batch(&mut self) -> Result> { + while let Some(batch) = self.iter.next() { + let batch = batch?; + let pruned_batch = self.prune(batch)?; + if !pruned_batch.is_empty() { + return Ok(Some(pruned_batch)); + } + } + Ok(None) + } +} + +impl Iterator for PruneTimeIterator { + type Item = Result; + + fn next(&mut self) -> Option { + self.next_non_empty_batch().transpose() + } +} + +#[cfg(test)] +mod tests { + use api::v1::OpType; + + use super::*; + use crate::test_util::new_batch; + + #[test] + fn test_prune_time_iter_empty() { + let input = []; + let iter = input.into_iter().map(Ok); + let iter = PruneTimeIterator::new( + Box::new(iter), + ( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(1000), + ), + ); + let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); + assert!(actual.is_empty()); + } + + #[test] + fn test_prune_time_iter_filter() { + let input = [ + new_batch( + b"k1", + &[10, 11], + &[20, 20], + &[OpType::Put, OpType::Put], + &[110, 111], + ), + new_batch( + b"k1", + &[15, 16], + &[20, 20], + &[OpType::Put, OpType::Put], + &[115, 116], + ), + new_batch( + b"k1", + &[17, 18], + &[20, 20], + &[OpType::Put, OpType::Put], + &[117, 118], + ), + ]; + + let iter = input.clone().into_iter().map(Ok); + let iter = PruneTimeIterator::new( + Box::new(iter), + ( + Timestamp::new_millisecond(10), + Timestamp::new_millisecond(15), + ), + ); + let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); + assert_eq!( + actual, + [ + new_batch( + b"k1", + &[10, 11], + &[20, 20], + &[OpType::Put, OpType::Put], + &[110, 111], + ), + new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],), + ] + ); + + let iter = input.clone().into_iter().map(Ok); + let iter = PruneTimeIterator::new( + Box::new(iter), + ( + Timestamp::new_millisecond(11), + Timestamp::new_millisecond(20), + ), + ); + let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); + assert_eq!( + actual, + [ + new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111],), + new_batch( + b"k1", + &[15, 16], + &[20, 20], + &[OpType::Put, OpType::Put], + &[115, 116], + ), + new_batch( + b"k1", + &[17, 18], + &[20, 20], + &[OpType::Put, OpType::Put], + &[117, 118], + ), + ] + ); + + let iter = input.into_iter().map(Ok); + let iter = PruneTimeIterator::new( + Box::new(iter), + ( + Timestamp::new_millisecond(10), + Timestamp::new_millisecond(18), + ), + ); + let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect(); + assert_eq!( + actual, + [ + new_batch( + b"k1", + &[10, 11], + &[20, 20], + &[OpType::Put, OpType::Put], + &[110, 111], + ), + new_batch( + b"k1", + &[15, 16], + &[20, 20], + &[OpType::Put, OpType::Put], + &[115, 116], + ), + new_batch( + b"k1", + &[17, 18], + &[20, 20], + &[OpType::Put, OpType::Put], + &[117, 118], + ), + ] + ); + } +} diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 74e7af42d627..d27468521a7d 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -26,6 +26,7 @@ use crate::error::Result; use crate::read::range::RowGroupIndex; use crate::read::scan_region::StreamContext; use crate::read::{Batch, ScannerMetrics, Source}; +use crate::sst::file::FileTimeRange; use crate::sst::parquet::reader::ReaderMetrics; struct PartitionMetricsInner { @@ -128,13 +129,14 @@ pub(crate) fn scan_mem_ranges( stream_ctx: Arc, part_metrics: PartitionMetrics, index: RowGroupIndex, + time_range: FileTimeRange, ) -> impl Stream> { try_stream! { let ranges = stream_ctx.build_mem_ranges(index); part_metrics.inc_num_mem_ranges(ranges.len()); for range in ranges { let build_reader_start = Instant::now(); - let iter = range.build_iter()?; + let iter = range.build_iter(time_range)?; part_metrics.inc_build_reader_cost(build_reader_start.elapsed()); let mut source = Source::Iter(iter); diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 5049ea8a470b..1a789c8d5f21 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -355,7 +355,12 @@ fn build_sources( sources.reserve(range_meta.row_group_indices.len()); for index in &range_meta.row_group_indices { let stream = if stream_ctx.is_mem_range_index(*index) { - let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index); + let stream = scan_mem_ranges( + stream_ctx.clone(), + part_metrics.clone(), + *index, + range_meta.time_range, + ); Box::pin(stream) as _ } else { let read_type = if compaction { diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 72bf4240200c..2401504f7915 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -89,7 +89,7 @@ impl UnorderedScan { let range_meta = &stream_ctx.ranges[part_range_id]; for index in &range_meta.row_group_indices { if stream_ctx.is_mem_range_index(*index) { - let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index); + let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range); for await batch in stream { yield batch; } diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index ed32a3661e7d..55b42d61dd41 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -124,16 +124,6 @@ impl MemtableBuilder for EmptyMemtableBuilder { } } -/// Empty iterator builder. -#[derive(Default)] -pub(crate) struct EmptyIterBuilder {} - -impl IterBuilder for EmptyIterBuilder { - fn build(&self) -> Result { - Ok(Box::new(std::iter::empty())) - } -} - /// Creates a region metadata to test memtable with default pk. /// /// The schema is `k0, k1, ts, v0, v1` and pk is `k0, k1`.