Skip to content

Commit

Permalink
fix: memtable prune (#2698)
Browse files Browse the repository at this point in the history
* fix: memtable prune

* test: add memtable prune test case with complex filter

* fix: test format
  • Loading branch information
v0y4g3r authored Nov 7, 2023
1 parent 7e0dcfc commit c79bb5a
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 12 deletions.
53 changes: 53 additions & 0 deletions src/mito2/src/engine/prune_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,3 +213,56 @@ async fn test_prune_memtable() {
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_prune_memtable_complex_expr() {
    // Bring up a test engine and create a fresh region to scan against.
    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;
    let region_id = RegionId::new(1, 1);
    let create_req = CreateRequestBuilder::new().build();
    let schema = rows_schema(&create_req);
    engine
        .handle_request(region_id, RegionRequest::Create(create_req))
        .await
        .unwrap();

    // Write rows 0..10 so all data stays in the memtable (nothing flushed).
    let rows = Rows {
        schema: schema.clone(),
        rows: build_rows(0, 10),
    };
    put_rows(&engine, region_id, rows).await;

    // ts filter will be ignored when pruning time series in memtable.
    let filters = vec![time_range_expr(4, 7), Expr::from(col("tag_0").lt(lit("6")))];
    let scan = ScanRequest {
        filters,
        ..Default::default()
    };
    let stream = engine.handle_query(region_id, scan).await.unwrap();

    // Collect the scan output and compare against the expected table.
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
+-------+---------+---------------------+";
    assert_eq!(expected, batches.pretty_print().unwrap());
}
20 changes: 13 additions & 7 deletions src/mito2/src/memtable/time_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::{Arc, RwLock};

use api::v1::OpType;
use common_telemetry::debug;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datafusion::physical_plan::PhysicalExpr;
use datafusion_common::ScalarValue;
Expand Down Expand Up @@ -400,7 +400,7 @@ fn prune_primary_key(
codec: &Arc<McmpRowCodec>,
pk: &[u8],
series: &mut Series,
builders: &mut Vec<Box<dyn MutableVector>>,
builders: &mut [Box<dyn MutableVector>],
pk_schema: arrow::datatypes::SchemaRef,
predicate: &[Arc<dyn PhysicalExpr>],
) -> bool {
Expand All @@ -411,11 +411,18 @@ fn prune_primary_key(

if let Some(rb) = series.pk_cache.as_ref() {
let res = prune_inner(predicate, rb).unwrap_or(true);
debug!("Prune primary key: {:?}, res: {:?}", rb, res);
debug!(
"Prune primary key: {:?}, predicate: {:?}, res: {:?}",
rb, predicate, res
);
res
} else {
let Ok(rb) = pk_to_record_batch(codec, pk, builders, pk_schema) else {
return true;
let rb = match pk_to_record_batch(codec, pk, builders, pk_schema) {
Ok(rb) => rb,
Err(e) => {
error!(e; "Failed to build record batch from primary keys");
return true;
}
};
let res = prune_inner(predicate, &rb).unwrap_or(true);
debug!("Prune primary key: {:?}, res: {:?}", rb, res);
Expand Down Expand Up @@ -459,11 +466,10 @@ fn prune_inner(predicates: &[Arc<dyn PhysicalExpr>], primary_key: &RecordBatch)
fn pk_to_record_batch(
codec: &Arc<McmpRowCodec>,
bytes: &[u8],
builders: &mut Vec<Box<dyn MutableVector>>,
builders: &mut [Box<dyn MutableVector>],
pk_schema: arrow::datatypes::SchemaRef,
) -> Result<RecordBatch> {
let pk_values = codec.decode(bytes).unwrap();
assert_eq!(builders.len(), pk_values.len());

let arrays = builders
.iter_mut()
Expand Down
28 changes: 23 additions & 5 deletions src/table/src/predicate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::predicate::stats::RowGroupPruningStatistics;

mod stats;

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct Predicate {
/// logical exprs
exprs: Vec<Expr>,
Expand Down Expand Up @@ -67,13 +67,14 @@ impl Predicate {
// registering variables.
let execution_props = &ExecutionProps::new();

self.exprs
Ok(self
.exprs
.iter()
.map(|expr| {
.filter_map(|expr| {
create_physical_expr(expr.df_expr(), df_schema.as_ref(), schema, execution_props)
.ok()
})
.collect::<Result<_, _>>()
.context(error::DatafusionSnafu)
.collect::<Vec<_>>())
}

/// Builds an empty predicate from given schema.
Expand Down Expand Up @@ -749,4 +750,21 @@ mod tests {
.or(datafusion_expr::Expr::Column(Column::from_name("cnt")).lt(20.lit()));
assert_prune(40, vec![e.into()], vec![true, true, false, true]).await;
}

#[tokio::test]
async fn test_to_physical_expr() {
    // Two logical filters: one on `host` (present in the schema below) and
    // one on `ts` (absent from it).
    let exprs = vec![
        Expr::from(col("host").eq(lit("host_a"))),
        Expr::from(col("ts").gt(lit(ScalarValue::TimestampMicrosecond(Some(123), None)))),
    ];
    let predicate = Predicate::new(exprs);

    // The schema only declares the `host` column, so the `ts` expression is
    // presumably dropped during conversion rather than failing the whole call.
    let host_field = Field::new("host", arrow::datatypes::DataType::Utf8, false);
    let schema = Arc::new(arrow::datatypes::Schema::new(vec![host_field]));

    // Conversion must succeed and keep at least the convertible expression.
    let physical = predicate.to_physical_exprs(&schema).unwrap();
    assert!(!physical.is_empty());
}
}

0 comments on commit c79bb5a

Please sign in to comment.