diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index b2c7add861ef..c99cfe061c9f 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -35,7 +35,7 @@ use crate::read::Batch; /// Should be unique under the same region. pub type MemtableId = u32; -pub type BoxedBatchIterator = Box>>; +pub type BoxedBatchIterator = Box> + Send + Sync>; /// In memory write buffer. pub trait Memtable: Send + Sync + fmt::Debug { diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index ce34898876b7..1a6a81b6f804 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -14,6 +14,8 @@ //! Common structs and utilities for reading data. +pub mod merge; + use std::sync::Arc; use api::v1::OpType; @@ -29,12 +31,12 @@ use datatypes::vectors::{ BooleanVector, Helper, UInt32Vector, UInt64Vector, UInt8Vector, Vector, VectorRef, }; use snafu::{ensure, OptionExt, ResultExt}; -use store_api::metadata::RegionMetadataRef; use store_api::storage::{ColumnId, SequenceNumber}; use crate::error::{ ComputeArrowSnafu, ComputeVectorSnafu, ConvertVectorSnafu, InvalidBatchSnafu, Result, }; +use crate::memtable::BoxedBatchIterator; /// Storage internal representation of a batch of rows /// for a primary key (time series). @@ -109,7 +111,7 @@ impl Batch { self.num_rows() == 0 } - /// Returns the first timestamp in the batch. + /// Returns the first timestamp in the batch or `None` if the batch is empty. pub fn first_timestamp(&self) -> Option { if self.timestamps.is_empty() { return None; @@ -118,7 +120,7 @@ impl Batch { Some(self.get_timestamp(0)) } - /// Returns the last timestamp in the batch. + /// Returns the last timestamp in the batch or `None` if the batch is empty. pub fn last_timestamp(&self) -> Option { if self.timestamps.is_empty() { return None; @@ -554,20 +556,23 @@ pub struct SourceStats { /// Async [Batch] reader and iterator wrapper. /// /// This is the data source for SST writers or internal readers. -pub enum Source {} +pub enum Source { + /// Source from a [BoxedBatchReader]. + Reader(BoxedBatchReader), + /// Source from a [BoxedBatchIterator]. + Iter(BoxedBatchIterator), +} impl Source { /// Returns next [Batch] from this data source. pub(crate) async fn next_batch(&mut self) -> Result> { - unimplemented!() - } - - /// Returns the metadata of the source region. - pub(crate) fn metadata(&self) -> RegionMetadataRef { - unimplemented!() + match self { + Source::Reader(reader) => reader.next_batch().await, + Source::Iter(iter) => iter.next().transpose(), + } } - // TODO(yingwen): Maybe remove this method. + // TODO(yingwen): Remove this method once we support collecting stats in the writer. /// Returns statisics of fetched batches. pub(crate) fn stats(&self) -> SourceStats { unimplemented!() @@ -603,38 +608,9 @@ impl BatchReader for Box { #[cfg(test)] mod tests { - use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array}; - use super::*; use crate::error::Error; - - fn new_batch_builder( - timestamps: &[i64], - sequences: &[u64], - op_types: &[OpType], - field: &[u64], - ) -> BatchBuilder { - let mut builder = BatchBuilder::new(b"test".to_vec()); - builder - .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values( - timestamps.iter().copied(), - ))) - .unwrap() - .sequences_array(Arc::new(UInt64Array::from_iter_values( - sequences.iter().copied(), - ))) - .unwrap() - .op_types_array(Arc::new(UInt8Array::from_iter_values( - op_types.iter().map(|v| *v as u8), - ))) - .unwrap() - .push_field_array( - 1, - Arc::new(UInt64Array::from_iter_values(field.iter().copied())), - ) - .unwrap(); - builder - } + use crate::test_util::new_batch_builder; fn new_batch( timestamps: &[i64], @@ -642,7 +618,7 @@ mod tests { op_types: &[OpType], field: &[u64], ) -> Batch { - new_batch_builder(timestamps, sequences, op_types, field) + new_batch_builder(b"test", timestamps, sequences, op_types, field) .build() .unwrap() } diff --git a/src/mito2/src/read/merge.rs b/src/mito2/src/read/merge.rs new file mode 100644 index 000000000000..1ccbf61e0428 --- /dev/null +++ b/src/mito2/src/read/merge.rs @@ -0,0 +1,492 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Merge reader implementation. + +use std::cmp::Ordering; +use std::collections::BinaryHeap; +use std::mem; + +use async_trait::async_trait; + +use crate::error::Result; +use crate::memtable::BoxedBatchIterator; +use crate::read::{Batch, BatchReader, BoxedBatchReader, Source}; + +/// Reader to merge sorted batches. +/// +/// The merge reader merges [Batch]es from multiple sources that yield sorted batches. +/// 1. Batch is ordered by primary key, time index, sequence desc, op type desc (we can +/// ignore op type as sequence is already unique). +/// 2. Batch doesn't have duplicate elements (elements with the same primary key and time index). +pub struct MergeReader { + /// Holds a min-heap for all [Node]s. Each node yields batches from a `source`. + /// + /// `Node` in this heap **must** not be EOF. + nodes: BinaryHeap, + /// Batches for the next primary key. + batch_merger: BatchMerger, +} + +#[async_trait] +impl BatchReader for MergeReader { + async fn next_batch(&mut self) -> Result> { + // Collect batches from sources for the same primary key and return + // the collected batch. + while !self.nodes.is_empty() { + // Peek current key. + let Some(current_key) = self.batch_merger.primary_key() else { + // The merger is empty, we could push it directly. + self.take_batch_from_heap().await?; + // Try next node. + continue; + }; + // If next node has a different key, we have finish collecting current key. + // Safety: node is not empty. + if self.nodes.peek().unwrap().primary_key() != current_key { + break; + } + // They have the same primary key, we could take it and try next node. + self.take_batch_from_heap().await?; + } + + // Merge collected batches. + self.batch_merger.merge_batches() + } +} + +impl MergeReader { + /// Creates a new [MergeReader]. + pub async fn new(sources: Vec) -> Result { + let mut nodes = BinaryHeap::with_capacity(sources.len()); + for source in sources { + let node = Node::new(source).await?; + if !node.is_eof() { + // Ensure `nodes` don't have eof node. + nodes.push(node); + } + } + + Ok(MergeReader { + nodes, + batch_merger: BatchMerger::new(), + }) + } + + /// Takes batch from heap top and reheap. + async fn take_batch_from_heap(&mut self) -> Result<()> { + let mut next_node = self.nodes.pop().unwrap(); + let batch = next_node.fetch_batch().await?; + self.batch_merger.push(batch); + + // Insert the node back to the heap. + // If the node reaches EOF, ignores it. This ensures nodes in the heap is always not EOF. + if next_node.is_eof() { + return Ok(()); + } + self.nodes.push(next_node); + + Ok(()) + } +} + +/// Builder to build and initialize a [MergeReader]. +#[derive(Default)] +pub struct MergeReaderBuilder { + /// Input sources. + /// + /// All source must yield batches with the same schema. + sources: Vec, +} + +impl MergeReaderBuilder { + /// Returns an empty builder. + pub fn new() -> MergeReaderBuilder { + MergeReaderBuilder::default() + } + + /// Pushs a batch reader to sources. + pub fn push_batch_reader(&mut self, reader: BoxedBatchReader) -> &mut Self { + self.sources.push(Source::Reader(reader)); + self + } + + /// Push a batch iterator to sources. + pub fn push_batch_iter(&mut self, iter: BoxedBatchIterator) -> &mut Self { + self.sources.push(Source::Iter(iter)); + self + } + + /// Builds and initializes the reader, then resets the builder. + pub async fn build(&mut self) -> Result { + let sources = mem::take(&mut self.sources); + MergeReader::new(sources).await + } +} + +/// Helper to merge batches for same primary key. +struct BatchMerger { + /// Buffered non-empty batches to merge. + batches: Vec, + /// Whether the batch buffer is still sorted. + is_sorted: bool, +} + +impl BatchMerger { + /// Returns a empty merger. + fn new() -> BatchMerger { + BatchMerger { + batches: Vec::new(), + is_sorted: true, // An empty merger is always sorted. + } + } + + /// Returns the primary key of current merger and `None` if the merger is empty. + fn primary_key(&self) -> Option<&[u8]> { + self.batches.first().map(|batch| batch.primary_key()) + } + + /// Push a `batch` into the merger. + /// + /// Ignore the `batch` if it is empty. + /// + /// # Panics + /// Panics if the `batch` has another primary key. + fn push(&mut self, batch: Batch) { + if batch.is_empty() { + return; + } + + if self.batches.is_empty() || !self.is_sorted { + // Merger is empty or is not sorted, we can push the batch directly. + self.batches.push(batch); + return; + } + + // Merger is sorted, checks whether we can still preserve sorted state. + let last_batch = self.batches.last().unwrap(); + assert_eq!(last_batch.primary_key(), batch.primary_key()); + match last_batch.last_timestamp().cmp(&batch.first_timestamp()) { + Ordering::Less => { + // Still sorted. + self.batches.push(batch); + return; + } + Ordering::Equal => { + // Check sequence. + if last_batch.last_sequence() > batch.first_sequence() { + // Still sorted. + self.batches.push(batch); + return; + } + } + Ordering::Greater => (), + } + + // Merger is no longer sorted. + self.batches.push(batch); + self.is_sorted = false; + } + + /// Merge all buffered batches and returns the merged batch. Then + /// reset the buffer. + fn merge_batches(&mut self) -> Result> { + if self.batches.is_empty() { + return Ok(None); + } + + let batches = mem::take(&mut self.batches); + // Concat all batches. + let mut batch = Batch::concat(batches)?; + + // TODO(yingwen): metrics for sorted and unsorted batches. + if !self.is_sorted { + // Slow path. We need to merge overlapping batches. For simplicity, we + // just sort the all batches and remove duplications. + batch.sort_and_dedup()?; + // We don't need to remove duplications if timestamps of batches + // are not overlapping. + } + + // Filter rows by op type. Currently, the reader only removes deleted rows but doesn't filter + // rows by sequence for simplicity and performance reason. + batch.filter_deleted()?; + + Ok(Some(batch)) + } +} + +/// A `Node` represent an individual input data source to be merged. +struct Node { + /// Data source of this `Node`. + source: Source, + /// Current batch to be read. + /// + /// `None` means the `source` has reached EOF. + current_batch: Option, +} + +impl Node { + /// Initialize a node. + /// + /// It tries to fetch one batch from the `source`. + async fn new(mut source: Source) -> Result { + let current_batch = source.next_batch().await?.map(CompareFirst); + Ok(Node { + source, + current_batch, + }) + } + + /// Returns whether the node still has batch to read. + fn is_eof(&self) -> bool { + self.current_batch.is_none() + } + + /// Returns the primary key of current batch. + /// + /// # Panics + /// Panics if the node has reached EOF. + fn primary_key(&self) -> &[u8] { + self.current_batch().primary_key() + } + + /// Returns current batch. + /// + /// # Panics + /// Panics if the node has reached EOF. + fn current_batch(&self) -> &Batch { + &self.current_batch.as_ref().unwrap().0 + } + + /// Returns current batch and fetches next batch + /// from the source. + /// + /// # Panics + /// Panics if the node has reached EOF. + async fn fetch_batch(&mut self) -> Result { + let current = self.current_batch.take().unwrap(); + self.current_batch = self.source.next_batch().await?.map(CompareFirst); + Ok(current.0) + } +} + +impl PartialEq for Node { + fn eq(&self, other: &Node) -> bool { + self.current_batch == other.current_batch + } +} + +impl Eq for Node {} + +impl PartialOrd for Node { + fn partial_cmp(&self, other: &Node) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for Node { + fn cmp(&self, other: &Node) -> Ordering { + // The std binary heap is a max heap, but we want the nodes are ordered in + // ascend order, so we compare the nodes in reverse order. + other.current_batch.cmp(&self.current_batch) + } +} + +/// Type to compare [Batch] by first row. +/// +/// It ignores op type as sequence is enough to distinguish different rows. +struct CompareFirst(Batch); + +impl PartialEq for CompareFirst { + fn eq(&self, other: &Self) -> bool { + self.0.primary_key() == other.0.primary_key() + && self.0.first_timestamp() == other.0.first_timestamp() + && self.0.first_sequence() == other.0.first_sequence() + } +} + +impl Eq for CompareFirst {} + +impl PartialOrd for CompareFirst { + fn partial_cmp(&self, other: &CompareFirst) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for CompareFirst { + /// Compares by primary key, time index, sequence desc. + fn cmp(&self, other: &CompareFirst) -> Ordering { + self.0 + .primary_key() + .cmp(other.0.primary_key()) + .then_with(|| self.0.first_timestamp().cmp(&other.0.first_timestamp())) + .then_with(|| other.0.first_sequence().cmp(&self.0.first_sequence())) + } +} + +#[cfg(test)] +mod tests { + use api::v1::OpType; + + use super::*; + use crate::test_util::{new_batch, VecBatchReader}; + + #[tokio::test] + async fn test_merge_reader_empty() { + let mut reader = MergeReaderBuilder::new().build().await.unwrap(); + assert!(reader.next_batch().await.unwrap().is_none()); + assert!(reader.next_batch().await.unwrap().is_none()); + } + + async fn check_merge_result(reader: &mut MergeReader, expect: &[Batch]) { + let mut result = Vec::new(); + while let Some(batch) = reader.next_batch().await.unwrap() { + result.push(batch); + } + + assert_eq!(expect, result); + } + + #[tokio::test] + async fn test_merge_non_overlapping() { + let reader1 = VecBatchReader::new(&[ + new_batch( + b"k1", + &[1, 2], + &[11, 12], + &[OpType::Put, OpType::Put], + &[21, 22], + ), + new_batch( + b"k1", + &[7, 8], + &[17, 18], + &[OpType::Put, OpType::Delete], + &[27, 28], + ), + new_batch( + b"k2", + &[2, 3], + &[12, 13], + &[OpType::Delete, OpType::Put], + &[22, 23], + ), + ]); + let reader2 = VecBatchReader::new(&[new_batch( + b"k1", + &[4, 5], + &[14, 15], + &[OpType::Put, OpType::Put], + &[24, 25], + )]); + let mut reader = MergeReaderBuilder::new() + .push_batch_reader(Box::new(reader1)) + .push_batch_iter(Box::new(reader2)) + .build() + .await + .unwrap(); + check_merge_result( + &mut reader, + &[ + new_batch( + b"k1", + &[1, 2, 4, 5, 7], + &[11, 12, 14, 15, 17], + &[ + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + OpType::Put, + ], + &[21, 22, 24, 25, 27], + ), + new_batch(b"k2", &[3], &[13], &[OpType::Put], &[23]), + ], + ) + .await; + } + + #[tokio::test] + async fn test_merge_overlapping() { + let reader1 = VecBatchReader::new(&[ + new_batch( + b"k1", + &[1, 2], + &[11, 12], + &[OpType::Put, OpType::Put], + &[21, 22], + ), + new_batch( + b"k1", + &[4, 5], + &[14, 15], + // This override 4 and deletes 5. + &[OpType::Put, OpType::Delete], + &[24, 25], + ), + new_batch( + b"k2", + &[2, 3], + &[12, 13], + // This delete 2. + &[OpType::Delete, OpType::Put], + &[22, 23], + ), + ]); + let reader2 = VecBatchReader::new(&[ + new_batch( + b"k1", + &[3, 4, 5], + &[10, 10, 10], + &[OpType::Put, OpType::Put, OpType::Put], + &[33, 34, 35], + ), + new_batch( + b"k2", + &[1, 10], + &[11, 20], + &[OpType::Put, OpType::Put], + &[21, 30], + ), + ]); + let mut reader = MergeReaderBuilder::new() + .push_batch_reader(Box::new(reader1)) + .push_batch_iter(Box::new(reader2)) + .build() + .await + .unwrap(); + check_merge_result( + &mut reader, + &[ + new_batch( + b"k1", + &[1, 2, 3, 4], + &[11, 12, 10, 14], + &[OpType::Put, OpType::Put, OpType::Put, OpType::Put], + &[21, 22, 33, 24], + ), + new_batch( + b"k2", + &[1, 3, 10], + &[11, 13, 20], + &[OpType::Put, OpType::Put, OpType::Put], + &[21, 23, 30], + ), + ], + ) + .await; + } +} diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index bd247d3f5c83..a286f336f27b 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -21,6 +21,7 @@ use parquet::file::metadata::KeyValue; use parquet::file::properties::WriterProperties; use parquet::schema::types::ColumnPath; use snafu::ResultExt; +use store_api::metadata::RegionMetadataRef; use store_api::storage::consts::SEQUENCE_COLUMN_NAME; use crate::error::{InvalidMetadataSnafu, Result}; @@ -35,15 +36,23 @@ pub struct ParquetWriter<'a> { file_path: &'a str, /// Input data source. source: Source, + /// Region metadata of the source and the target SST. + metadata: RegionMetadataRef, object_store: ObjectStore, } impl<'a> ParquetWriter<'a> { /// Creates a new parquet SST writer. - pub fn new(file_path: &'a str, source: Source, object_store: ObjectStore) -> ParquetWriter { + pub fn new( + file_path: &'a str, + metadata: RegionMetadataRef, + source: Source, + object_store: ObjectStore, + ) -> ParquetWriter { ParquetWriter { file_path, source, + metadata, object_store, } } @@ -52,11 +61,9 @@ impl<'a> ParquetWriter<'a> { /// /// Returns the [SstInfo] if the SST is written. pub async fn write_all(&mut self, opts: &WriteOptions) -> Result> { - let metadata = self.source.metadata(); - - let json = metadata.to_json().context(InvalidMetadataSnafu)?; + let json = self.metadata.to_json().context(InvalidMetadataSnafu)?; let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json); - let ts_column = metadata.time_index_column(); + let ts_column = self.metadata.time_index_column(); // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid. let props_builder = WriterProperties::builder() @@ -78,7 +85,7 @@ impl<'a> ParquetWriter<'a> { ); let writer_props = props_builder.build(); - let write_format = WriteFormat::new(metadata); + let write_format = WriteFormat::new(self.metadata.clone()); let mut buffered_writer = BufferedWriter::try_new( self.file_path.to_string(), self.object_store.clone(), diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 8b791c0acbd6..f3b1a177e7d3 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -19,9 +19,10 @@ use std::sync::Arc; use api::greptime_proto::v1; use api::v1::value::ValueData; -use api::v1::SemanticType; +use api::v1::{OpType, SemanticType}; use common_datasource::compression::CompressionType; use common_test_util::temp_dir::{create_temp_dir, TempDir}; +use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use log_store::raft_engine::log_store::RaftEngineLogStore; @@ -35,6 +36,7 @@ use crate::config::MitoConfig; use crate::engine::MitoEngine; use crate::error::Result; use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions}; +use crate::read::{Batch, BatchBuilder, BatchReader}; use crate::worker::WorkerGroup; /// Env to test mito engine. @@ -242,3 +244,72 @@ pub(crate) fn ts_ms_value(data: i64) -> v1::Value { value_data: Some(ValueData::TsMillisecondValue(data)), } } + +/// A reader for test that pop [Batch] from a vector. +pub struct VecBatchReader { + batches: Vec, +} + +impl VecBatchReader { + pub fn new(batches: &[Batch]) -> VecBatchReader { + let batches = batches.iter().rev().cloned().collect(); + + VecBatchReader { batches } + } +} + +#[async_trait::async_trait] +impl BatchReader for VecBatchReader { + async fn next_batch(&mut self) -> Result> { + Ok(self.batches.pop()) + } +} + +impl Iterator for VecBatchReader { + type Item = Result; + + fn next(&mut self) -> Option> { + self.batches.pop().map(Ok) + } +} + +pub fn new_batch_builder( + primary_key: &[u8], + timestamps: &[i64], + sequences: &[u64], + op_types: &[OpType], + field: &[u64], +) -> BatchBuilder { + let mut builder = BatchBuilder::new(primary_key.to_vec()); + builder + .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values( + timestamps.iter().copied(), + ))) + .unwrap() + .sequences_array(Arc::new(UInt64Array::from_iter_values( + sequences.iter().copied(), + ))) + .unwrap() + .op_types_array(Arc::new(UInt8Array::from_iter_values( + op_types.iter().map(|v| *v as u8), + ))) + .unwrap() + .push_field_array( + 1, + Arc::new(UInt64Array::from_iter_values(field.iter().copied())), + ) + .unwrap(); + builder +} + +pub fn new_batch( + primary_key: &[u8], + timestamps: &[i64], + sequences: &[u64], + op_types: &[OpType], + field: &[u64], +) -> Batch { + new_batch_builder(primary_key, timestamps, sequences, op_types, field) + .build() + .unwrap() +}