Commit

address comments

zhuqi-lucas committed Dec 31, 2024
1 parent 9b5995f commit 079a59e
Showing 10 changed files with 358 additions and 25 deletions.
8 changes: 8 additions & 0 deletions datafusion-examples/examples/custom_file_format.rs
@@ -105,6 +105,14 @@ impl FileFormat for TSVFileFormat {
.await
}

async fn infer_file_ordering(
    &self,
    _store: &Arc<dyn ObjectStore>,
    _object: &ObjectMeta,
) -> Option<String> {
    // TSV files carry no ordering metadata
    None
}

async fn create_physical_plan(
&self,
state: &SessionState,
189 changes: 183 additions & 6 deletions datafusion/core/src/dataframe/mod.rs
@@ -1977,7 +1977,14 @@ mod tests {

use crate::prelude::{CsvReadOptions, NdJsonReadOptions, ParquetReadOptions};
use arrow::array::Int32Array;
use datafusion_common::{assert_batches_eq, Constraint, Constraints, ScalarValue};
use arrow::util::pretty::pretty_format_batches;
use arrow_array::TimestampNanosecondArray;
use arrow_schema::TimeUnit;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::{
assert_batches_eq, assert_contains, assert_not_contains, Constraint, Constraints,
ScalarValue,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::{
@@ -1989,6 +1996,7 @@
use datafusion_functions_window::nth_value::first_value_udwf;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
use rand::Rng;
use sqlparser::ast::NullTreatment;
use tempfile::TempDir;

@@ -4136,11 +4144,6 @@
let df = ctx.sql("SELECT * FROM data").await?;
let results = df.collect().await?;

let df_explain = ctx.sql("explain SELECT a FROM data").await?;
let explain_result = df_explain.collect().await?;

println!("explain_result {:?}", explain_result);

assert_batches_eq!(
&[
"+---+---+",
@@ -4327,4 +4330,178 @@
);
Ok(())
}

#[tokio::test]
async fn write_parquet_with_order_metadata() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
]));

let tmp_dir = TempDir::new()?;

// Inferring the ordering only works when collect_statistics is enabled
let ctx = SessionContext::new_with_config(
SessionConfig::default()
.set_bool("datafusion.execution.collect_statistics", true),
);

// write randomly generated data to parquet
let num_rows = 1000;
let mut rng = rand::thread_rng();
let ids: Vec<i64> = (0..num_rows).collect();
let timestamps: Vec<i64> = (0..num_rows)
.map(|_| rng.gen_range(1_700_000_000_000..1_800_000_000_000))
.collect();

let id_array = Arc::new(Int64Array::from(ids));
let timestamp_array = Arc::new(TimestampNanosecondArray::from(timestamps));

let batch =
RecordBatch::try_new(schema.clone(), vec![id_array, timestamp_array])?;

let file = tmp_dir.path().join("testSorted.parquet");
let write_df = ctx.read_batch(batch)?;

write_df
.clone()
.write_parquet(
file.to_str().unwrap(),
DataFrameWriteOptions::new()
.with_sort_by(vec![col("timestamp").sort(true, false)]),
Some(TableParquetOptions::new()),
)
.await?;

// Create the external table without a WITH ORDER clause
let sql_str =
    "create external table sortData(id INT, timestamp TIMESTAMP) stored as parquet location '"
        .to_owned()
        + file.to_str().unwrap()
        + "'";

ctx.sql(sql_str.as_str()).await?.collect().await?;

let sql_result = ctx
.sql("SELECT * FROM sortData order by timestamp")
.await?
.explain(false, false)?
.collect()
.await?;

let formatted = pretty_format_batches(&sql_result).unwrap().to_string();
// Assert we have the output_ordering in the explain plan
assert_contains!(
formatted.as_str(),
"output_ordering=[timestamp@1 ASC NULLS LAST]"
);

// Assert the plan contains no SortExec: the optimizer can use the
// inferred ordering to remove the sort
assert_not_contains!(formatted.as_str(), "SortExec");

// test the multi-column sort case
write_df
.clone()
.write_parquet(
file.to_str().unwrap(),
DataFrameWriteOptions::new().with_sort_by(vec![
col("timestamp").sort(true, false),
col("id").sort(true, false),
]),
Some(TableParquetOptions::new()),
)
.await?;

let sql_result = ctx
.sql("SELECT * FROM sortData")
.await?
.explain(false, false)?
.collect()
.await?;

let formatted = pretty_format_batches(&sql_result).unwrap().to_string();
// Assert we have the output_ordering in the explain plan
assert_contains!(
formatted.as_str(),
"output_ordering=[timestamp@1 ASC NULLS LAST, id@0 ASC NULLS LAST]"
);

// Assert the plan contains no SortExec: the optimizer can use the
// inferred ordering to remove the sort
assert_not_contains!(formatted.as_str(), "SortExec");
Ok(())
}

#[tokio::test]
async fn write_parquet_without_order_metadata() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
]));

let tmp_dir = TempDir::new()?;

let ctx = SessionContext::new();

// write randomly generated data to parquet
let num_rows = 1000;
let mut rng = rand::thread_rng();
let ids: Vec<i64> = (0..num_rows).collect();
let timestamps: Vec<i64> = (0..num_rows)
.map(|_| rng.gen_range(1_700_000_000_000..1_800_000_000_000))
.collect();

let id_array = Arc::new(Int64Array::from(ids));
let timestamp_array = Arc::new(TimestampNanosecondArray::from(timestamps));

let batch =
RecordBatch::try_new(schema.clone(), vec![id_array, timestamp_array])?;

let file = tmp_dir.path().join("testSorted.parquet");
let write_df = ctx.read_batch(batch)?;

write_df
.clone()
.write_parquet(
file.to_str().unwrap(),
DataFrameWriteOptions::new(),
Some(TableParquetOptions::new()),
)
.await?;

// Create the external table without a WITH ORDER clause
let sql_str =
    "create external table sortData(id INT, timestamp TIMESTAMP) stored as parquet location '"
        .to_owned()
        + file.to_str().unwrap()
        + "'";

ctx.sql(sql_str.as_str()).await?.collect().await?;

let sql_result = ctx
.sql("SELECT * FROM sortData order by timestamp")
.await?
.explain(false, false)?
.collect()
.await?;

let formatted = pretty_format_batches(&sql_result).unwrap().to_string();
// Assert the explain plan has no output_ordering: no sort metadata was
// written and collect_statistics is not enabled
assert_not_contains!(
formatted.as_str(),
"output_ordering=[timestamp@1 ASC NULLS LAST]"
);

// Assert the plan contains a SortExec: without ordering metadata
// the optimizer cannot remove the sort
assert_contains!(formatted.as_str(), "SortExec");
Ok(())
}
}
26 changes: 19 additions & 7 deletions datafusion/core/src/dataframe/parquet.rs
@@ -17,9 +17,7 @@

use std::sync::Arc;

use crate::datasource::file_format::{
format_as_file_type, parquet::ParquetFormatFactory,
};
use crate::datasource::file_format::{
    format_as_file_type, parquet::ParquetFormatFactory, DefaultFileType,
};

use super::{
DataFrame, DataFrameWriteOptions, DataFusionError, LogicalPlanBuilder, RecordBatch,
@@ -66,10 +64,24 @@ impl DataFrame {
);
}

let format = if let Some(parquet_opts) = writer_options {
Arc::new(ParquetFormatFactory::new_with_options(parquet_opts))
} else {
Arc::new(ParquetFormatFactory::new())
let format = match writer_options {
Some(mut parquet_opts) => {
if !options.sort_by.is_empty() {
parquet_opts.key_value_metadata.insert(
"DATAFUSION_ORDER_BY".to_string(),
Some(
options
.sort_by
.iter()
.map(|sort| sort.to_string())
.collect::<Vec<String>>()
.join(", "),
),
);
}
Arc::new(ParquetFormatFactory::new_with_options(parquet_opts))
}
None => Arc::new(ParquetFormatFactory::new()),
};

let file_type = format_as_file_type(format);
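The sort expressions are rendered with to_string and joined with ", " before being stored under the DATAFUSION_ORDER_BY key in the Parquet footer. As a minimal sketch (not part of this commit; the path is a placeholder), the footer entry can be inspected with the parquet crate:

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn print_order_metadata() -> Result<(), Box<dyn std::error::Error>> {
    // Placeholder path: a file written via write_parquet with with_sort_by
    let file = File::open("/tmp/testSorted.parquet")?;
    let reader = SerializedFileReader::new(file)?;

    // For with_sort_by(vec![col("timestamp").sort(true, false)]) this should
    // print something like: DATAFUSION_ORDER_BY = Some("timestamp ASC NULLS LAST")
    for kv in reader
        .metadata()
        .file_metadata()
        .key_value_metadata()
        .into_iter()
        .flatten()
    {
        println!("{} = {:?}", kv.key, kv.value);
    }
    Ok(())
}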
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -165,6 +165,15 @@ impl FileFormat for ArrowFormat {
Ok(Statistics::new_unknown(&table_schema))
}

async fn infer_file_ordering(
    &self,
    _store: &Arc<dyn ObjectStore>,
    _object: &ObjectMeta,
) -> Option<String> {
    // TODO: inferring the ordering of Arrow files is not yet supported
    None
}

async fn create_physical_plan(
&self,
_state: &SessionState,
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/avro.rs
@@ -134,6 +134,15 @@ impl FileFormat for AvroFormat {
Ok(Arc::new(merged_schema))
}

async fn infer_file_ordering(
    &self,
    _store: &Arc<dyn ObjectStore>,
    _object: &ObjectMeta,
) -> Option<String> {
    // TODO: inferring the ordering of Avro files is not supported
    None
}

async fn infer_stats(
&self,
_state: &SessionState,
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -406,6 +406,15 @@ impl FileFormat for CsvFormat {
Ok(Statistics::new_unknown(&table_schema))
}

async fn infer_file_ordering(
    &self,
    _store: &Arc<dyn ObjectStore>,
    _object: &ObjectMeta,
) -> Option<String> {
    // Inferring the ordering of CSV files is not supported
    None
}

async fn create_physical_plan(
&self,
state: &SessionState,
9 changes: 9 additions & 0 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -242,6 +242,15 @@ impl FileFormat for JsonFormat {
Ok(Statistics::new_unknown(&table_schema))
}

async fn infer_file_ordering(
    &self,
    _store: &Arc<dyn ObjectStore>,
    _object: &ObjectMeta,
) -> Option<String> {
    // Inferring the ordering of JSON files is not supported
    None
}

async fn create_physical_plan(
&self,
_state: &SessionState,
19 changes: 13 additions & 6 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -31,17 +31,17 @@ pub mod options;
pub mod parquet;
pub mod write;

use std::any::Any;
use std::collections::{HashMap, VecDeque};
use std::fmt::{self, Debug, Display};
use std::sync::Arc;
use std::task::Poll;

use crate::arrow::datatypes::SchemaRef;
use crate::datasource::physical_plan::{FileScanConfig, FileSinkConfig};
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::physical_plan::{ExecutionPlan, Statistics};
use ::parquet::arrow::parquet_to_arrow_schema;
use std::any::Any;
use std::collections::{HashMap, VecDeque};
use std::fmt::{self, Debug, Display};
use std::sync::Arc;
use std::task::Poll;

use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema};
@@ -50,6 +50,7 @@ use datafusion_common::{internal_err, not_impl_err, GetExt};
use datafusion_expr::Expr;
use datafusion_physical_expr::PhysicalExpr;

use crate::datasource::file_format::parquet::fetch_parquet_metadata;
use async_trait::async_trait;
use bytes::{Buf, Bytes};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
@@ -123,6 +124,12 @@ pub trait FileFormat: Send + Sync + Debug {
object: &ObjectMeta,
) -> Result<Statistics>;

async fn infer_file_ordering(
&self,
store: &Arc<dyn ObjectStore>,
object: &ObjectMeta,
) -> Option<String>;

/// Take a list of files and convert it to the appropriate executor
/// according to this file format.
async fn create_physical_plan(
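The Parquet-side implementation of infer_file_ordering is in a file not expanded on this page; below is a minimal sketch of how it could read back the DATAFUSION_ORDER_BY key written on the write path, assuming the fetch_parquet_metadata helper imported above (an illustration, not the commit's exact code):

async fn infer_file_ordering(
    &self,
    store: &Arc<dyn ObjectStore>,
    object: &ObjectMeta,
) -> Option<String> {
    // Read the footer metadata and look up the key written by write_parquet;
    // this body is an assumption, not the commit's exact implementation
    let metadata = fetch_parquet_metadata(store.as_ref(), object, None)
        .await
        .ok()?;
    metadata
        .file_metadata()
        .key_value_metadata()?
        .iter()
        .find(|kv| kv.key == "DATAFUSION_ORDER_BY")
        .and_then(|kv| kv.value.clone())
}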