perf: count(*) for append-only tables (#4545)
* feat: support fast count(*) for append-only tables

* fix: total_rows stats in time series memtable

* fix: sqlness result changes for SinglePartitionScanner -> StreamScanAdapter

* fix: some cr comments
v0y4g3r authored Aug 13, 2024
1 parent 4466fee commit 216bce6
Showing 17 changed files with 384 additions and 109 deletions.
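The heart of this change: an append-only table never deletes or deduplicates rows, so the row counts already recorded in SST file metadata and memtable stats sum to the exact answer for count(*), with no data read at all. Below is a minimal sketch of how a query layer could exploit the scanner metadata this commit introduces; the struct and helper are illustrative stand-ins rather than the actual GreptimeDB API, while append_mode and total_rows mirror the properties added in the diffs that follow.

// A minimal sketch, assuming simplified stand-ins for the fields this
// commit adds to ScannerProperties (the real type and accessors may differ).
struct ScannerProps {
    append_mode: bool,
    total_rows: usize,
}

fn fast_count(props: &ScannerProps) -> Option<usize> {
    if props.append_mode {
        // Append-only: every written row is live, so the recorded sum is exact.
        Some(props.total_rows)
    } else {
        // Deletes or dedup can hide rows; fall back to a real scan.
        None
    }
}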
1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions src/common/query/Cargo.toml
@@ -31,4 +31,5 @@ store-api.workspace = true

[dev-dependencies]
common-base.workspace = true
futures-util.workspace = true
tokio.workspace = true
2 changes: 2 additions & 0 deletions src/common/query/src/lib.rs
@@ -19,8 +19,10 @@ pub mod logical_plan;
pub mod prelude;
pub mod request;
mod signature;
pub mod stream;
#[cfg(any(test, feature = "testing"))]
pub mod test_util;

use std::fmt::{Debug, Display, Formatter};
use std::sync::Arc;

175 changes: 175 additions & 0 deletions src/common/query/src/stream.rs
@@ -0,0 +1,175 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex};

use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::execution::context::TaskContext;
use datafusion::execution::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning, PhysicalSortExpr};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
};
use datafusion_common::DataFusionError;
use datatypes::arrow::datatypes::SchemaRef as ArrowSchemaRef;
use datatypes::schema::SchemaRef;

/// Adapts greptime's [SendableRecordBatchStream] to DataFusion's [ExecutionPlan].
pub struct StreamScanAdapter {
stream: Mutex<Option<SendableRecordBatchStream>>,
schema: SchemaRef,
arrow_schema: ArrowSchemaRef,
properties: PlanProperties,
output_ordering: Option<Vec<PhysicalSortExpr>>,
}

impl Debug for StreamScanAdapter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StreamScanAdapter")
.field("stream", &"<SendableRecordBatchStream>")
.field("schema", &self.schema)
.finish()
}
}

impl StreamScanAdapter {
pub fn new(stream: SendableRecordBatchStream) -> Self {
let schema = stream.schema();
let arrow_schema = schema.arrow_schema().clone();
let properties = PlanProperties::new(
EquivalenceProperties::new(arrow_schema.clone()),
Partitioning::UnknownPartitioning(1),
ExecutionMode::Bounded,
);

Self {
stream: Mutex::new(Some(stream)),
schema,
arrow_schema,
properties,
output_ordering: None,
}
}

pub fn with_output_ordering(mut self, output_ordering: Option<Vec<PhysicalSortExpr>>) -> Self {
self.output_ordering = output_ordering;
self
}
}

impl DisplayAs for StreamScanAdapter {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"StreamScanAdapter: [<SendableRecordBatchStream>], schema: ["
)?;
write!(f, "{:?}", &self.arrow_schema)?;
write!(f, "]")
}
}

impl ExecutionPlan for StreamScanAdapter {
fn as_any(&self) -> &dyn Any {
self
}

fn schema(&self) -> ArrowSchemaRef {
self.arrow_schema.clone()
}

fn properties(&self) -> &PlanProperties {
&self.properties
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}

// DataFusion will swap children unconditionally.
// But since this node is a leaf node, it's safe to just return self.
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
Ok(self.clone())
}

fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> datafusion_common::Result<DfSendableRecordBatchStream> {
let mut stream = self.stream.lock().unwrap();
let stream = stream
.take()
.ok_or_else(|| DataFusionError::Execution("Stream already exhausted".to_string()))?;
Ok(Box::pin(DfRecordBatchStreamAdapter::new(stream)))
}
}

#[cfg(test)]
mod test {
use common_recordbatch::{RecordBatch, RecordBatches};
use datafusion::prelude::SessionContext;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use futures_util::TryStreamExt;

use super::*;

#[tokio::test]
async fn test_simple_table_scan() {
let ctx = SessionContext::new();
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"a",
ConcreteDataType::int32_datatype(),
false,
)]));

let batch1 = RecordBatch::new(
schema.clone(),
vec![Arc::new(Int32Vector::from_slice([1, 2])) as _],
)
.unwrap();
let batch2 = RecordBatch::new(
schema.clone(),
vec![Arc::new(Int32Vector::from_slice([3, 4, 5])) as _],
)
.unwrap();

let recordbatches =
RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap();
let stream = recordbatches.as_stream();

let scan = StreamScanAdapter::new(stream);

assert_eq!(scan.schema(), schema.arrow_schema().clone());

let stream = scan.execute(0, ctx.task_ctx()).unwrap();
let recordbatches = stream.try_collect::<Vec<_>>().await.unwrap();
assert_eq!(recordbatches[0], batch1.into_df_record_batch());
assert_eq!(recordbatches[1], batch2.into_df_record_batch());

let result = scan.execute(0, ctx.task_ctx());
assert!(result.is_err());
match result {
Err(e) => assert!(e.to_string().contains("Stream already exhausted")),
_ => unreachable!(),
}
}
}
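
A design note on the adapter above: the wrapped stream lives in a Mutex<Option<_>> and is taken on the first execute call, so the plan is single-shot. A second execute returns the "Stream already exhausted" error, exactly as the test verifies, and Partitioning::UnknownPartitioning(1) is consistent with that: one partition, backed by one stream.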
3 changes: 2 additions & 1 deletion src/file-engine/src/engine.rs
@@ -90,7 +90,8 @@ impl RegionEngine for FileRegionEngine {
request: ScanRequest,
) -> Result<RegionScannerRef, BoxedError> {
let stream = self.handle_query(region_id, request).await?;
let scanner = Box::new(SinglePartitionScanner::new(stream));
// We don't support enabling append mode for the file engine.
let scanner = Box::new(SinglePartitionScanner::new(stream, false));
Ok(scanner)
}

7 changes: 7 additions & 0 deletions src/mito2/src/memtable.rs
@@ -69,6 +69,8 @@ pub struct MemtableStats {
/// The time range that this memtable contains. It is None if
/// and only if the memtable is empty.
time_range: Option<(Timestamp, Timestamp)>,
/// Total rows in the memtable.
num_rows: usize,
}

impl MemtableStats {
@@ -88,6 +90,11 @@ impl MemtableStats {
pub fn time_range(&self) -> Option<(Timestamp, Timestamp)> {
self.time_range
}

/// Returns the total number of rows in the memtable.
pub fn num_rows(&self) -> usize {
self.num_rows
}
}

pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
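Note that num_rows deliberately counts every row ever written, including rows later deleted or shadowed by duplicates. That is precisely why the fast count(*) path is restricted to append-only tables, where the written count and the visible count coincide.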
9 changes: 8 additions & 1 deletion src/mito2/src/memtable/partition_tree.rs
@@ -25,7 +25,7 @@ mod shard_builder;
mod tree;

use std::fmt;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::Arc;

use common_base::readable_size::ReadableSize;
@@ -114,6 +114,8 @@ pub struct PartitionTreeMemtable {
alloc_tracker: AllocTracker,
max_timestamp: AtomicI64,
min_timestamp: AtomicI64,
/// Total rows written to the memtable, including deleted and duplicated rows.
num_rows: AtomicUsize,
}

impl fmt::Debug for PartitionTreeMemtable {
@@ -139,6 +141,7 @@ impl Memtable for PartitionTreeMemtable {

self.update_stats(&metrics);

self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
res
}

@@ -150,6 +153,7 @@ impl Memtable for PartitionTreeMemtable {

self.update_stats(&metrics);

self.num_rows.fetch_add(1, Ordering::Relaxed);
res
}

@@ -202,6 +206,7 @@ impl Memtable for PartitionTreeMemtable {
return MemtableStats {
estimated_bytes,
time_range: None,
num_rows: 0,
};
}

@@ -219,6 +224,7 @@
MemtableStats {
estimated_bytes,
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
}
}

@@ -256,6 +262,7 @@ impl PartitionTreeMemtable {
alloc_tracker,
max_timestamp: AtomicI64::new(i64::MIN),
min_timestamp: AtomicI64::new(i64::MAX),
num_rows: AtomicUsize::new(0),
}
}

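The counter uses Ordering::Relaxed on both the fetch_add and load sides: it is a monotonic statistic that never guards other memory, so no stronger ordering is needed and the write-path overhead stays at a single atomic increment.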
10 changes: 9 additions & 1 deletion src/mito2/src/memtable/time_series.rs
@@ -15,7 +15,7 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, Bound, HashSet};
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -101,6 +101,8 @@ pub struct TimeSeriesMemtable {
min_timestamp: AtomicI64,
dedup: bool,
merge_mode: MergeMode,
/// Total rows written to the memtable, including deleted and duplicated rows.
num_rows: AtomicUsize,
}

impl TimeSeriesMemtable {
@@ -133,6 +135,7 @@ impl TimeSeriesMemtable {
min_timestamp: AtomicI64::new(i64::MAX),
dedup,
merge_mode,
num_rows: Default::default(),
}
}

@@ -232,6 +235,8 @@ impl Memtable for TimeSeriesMemtable {
// We may lift the primary key length check out of Memtable::write
// so that we can ensure writing to memtable will succeed.
self.update_stats(local_stats);

self.num_rows.fetch_add(kvs.num_rows(), Ordering::Relaxed);
Ok(())
}

@@ -241,6 +246,7 @@ impl Memtable for TimeSeriesMemtable {
local_stats.allocated += std::mem::size_of::<Timestamp>() + std::mem::size_of::<OpType>();

self.update_stats(local_stats);
self.num_rows.fetch_add(1, Ordering::Relaxed);
res
}

@@ -320,6 +326,7 @@ impl Memtable for TimeSeriesMemtable {
return MemtableStats {
estimated_bytes,
time_range: None,
num_rows: 0,
};
}
let ts_type = self
@@ -335,6 +342,7 @@
MemtableStats {
estimated_bytes,
time_range: Some((min_timestamp, max_timestamp)),
num_rows: self.num_rows.load(Ordering::Relaxed),
}
}

6 changes: 6 additions & 0 deletions src/mito2/src/read/scan_region.rs
@@ -697,6 +697,12 @@ impl ScanInput {
}
});
}

pub(crate) fn total_rows(&self) -> usize {
let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
rows_in_files + rows_in_memtables
}
}

#[cfg(test)]
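total_rows is a plain sum over the region's inputs. As an illustrative example: two SST files recording 100 and 250 rows plus a memtable whose stats report 42 rows yield total_rows() == 392, which for an append-only region is exactly what count(*) must return.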
5 changes: 4 additions & 1 deletion src/mito2/src/read/seq_scan.rs
@@ -65,7 +65,10 @@ impl SeqScan {
/// Creates a new [SeqScan].
pub(crate) fn new(input: ScanInput) -> Self {
let parallelism = input.parallelism.parallelism.max(1);
let properties = ScannerProperties::new_with_partitions(parallelism);
let properties = ScannerProperties::default()
.with_parallelism(parallelism)
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
let stream_ctx = Arc::new(StreamContext::new(input));

Self {
9 changes: 6 additions & 3 deletions src/mito2/src/read/unordered_scan.rs
@@ -58,8 +58,11 @@ pub struct UnorderedScan {
impl UnorderedScan {
/// Creates a new [UnorderedScan].
pub(crate) fn new(input: ScanInput) -> Self {
let properties =
ScannerProperties::new_with_partitions(input.parallelism.parallelism.max(1));
let parallelism = input.parallelism.parallelism.max(1);
let properties = ScannerProperties::default()
.with_parallelism(parallelism)
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
let stream_ctx = Arc::new(StreamContext::new(input));

Self {
@@ -135,7 +138,7 @@ impl RegionScanner for UnorderedScan {
}

fn prepare(&mut self, ranges: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
self.properties = ScannerProperties::new(ranges);
self.properties.partitions = ranges;
Ok(())
}

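The prepare change is subtle but important: rebuilding the properties with ScannerProperties::new(ranges), as the old body did, would have discarded the append_mode and total_rows recorded in UnorderedScan::new. Assigning to self.properties.partitions updates the partition ranges while leaving the count(*) metadata intact.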