Skip to content

Commit

Permalink
Minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
OussamaSaoudi-db committed Sep 24, 2024
1 parent 059b48c commit 3a360ab
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
8 changes: 4 additions & 4 deletions kernel/src/engine/arrow_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ pub fn expression_to_row_filter(
Ok(RowFilter::new(vec![Box::new(arrow_predicate)]))
}

pub fn get_columns_from_expression(expr: &Expression) -> Vec<String> {
fn get_columns_from_expression_impl(expr: &Expression, out: &mut Vec<String>) {
pub fn get_columns_from_expression(expr: &Expression) -> Vec<&str> {
fn get_columns_from_expression_impl<'a>(expr: &'a Expression, out: &mut Vec<&'a str>) {
match expr {
Expression::Column(col_name) => {
let root_name = col_name.split('.').next().unwrap_or(col_name).to_string();
let root_name = col_name.split('.').next().unwrap_or(col_name);
out.push(root_name)
}
Expression::Struct(fields) => fields
Expand All @@ -82,7 +82,7 @@ pub fn get_columns_from_expression(expr: &Expression) -> Vec<String> {
Expression::VariadicOperation { op: _, exprs } => exprs
.iter()
.for_each(|expr| get_columns_from_expression_impl(expr, out)),
_ => (),
Expression::Literal(_) => (),
}
}
let mut out = vec![];
Expand Down
11 changes: 5 additions & 6 deletions kernel/src/engine/default/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ impl FileOpener for PresignedUrlOpener {
}

if let Some(predicate) = predicate {
let parquet_schema = metadata.schema();
let parquet_physical_schema = metadata.parquet_schema();
let row_filter = expression_to_row_filter(
predicate,
Expand Down Expand Up @@ -274,7 +273,7 @@ mod tests {
.map(Into::into)
}

async fn get_record_batch(path: &str, predicate: Option<Expression>) -> Vec<RecordBatch> {
async fn get_record_batches(path: &str, predicate: Option<Expression>) -> Vec<RecordBatch> {
let store = Arc::new(LocalFileSystem::new());

let path = std::fs::canonicalize(PathBuf::from(path)).unwrap();
Expand Down Expand Up @@ -312,7 +311,7 @@ mod tests {
async fn test_read_parquet_files_with_empty_output() {
let predicate = Some(Expr::lt(Expr::column("value"), Expr::literal(0)));
let path = "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
let data = get_record_batch(path, predicate).await;
let data = get_record_batches(path, predicate).await;
assert_eq!(data.len(), 0);
}

Expand All @@ -328,7 +327,7 @@ mod tests {
];
let path = "./tests/data/table-with-dv-small/part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
for (num_rows, predicate) in cases {
let data = get_record_batch(path, predicate).await;
let data = get_record_batches(path, predicate).await;
assert_eq!(data.len(), 1);
assert_eq!(data[0].num_rows(), num_rows);
}
Expand All @@ -343,8 +342,8 @@ mod tests {

let path =
"./tests/data/app-txn-checkpoint/_delta_log/00000000000000000001.checkpoint.parquet";
let data_filtered = get_record_batch(path, predicate).await;
let data = get_record_batch(path, None).await;
let data_filtered = get_record_batches(path, predicate).await;
let data = get_record_batches(path, None).await;

let mut metadata_filtered: Vec<Metadata> = vec![];
let mut metadata_expected: Vec<Metadata> = vec![];
Expand Down

0 comments on commit 3a360ab

Please sign in to comment.