Merge remote-tracking branch 'apache/main' into infer_between_predicates
alamb committed Oct 1, 2023
2 parents b2014ff + 14cdf72 commit ca163a9
Showing 19 changed files with 100 additions and 95 deletions.
76 changes: 14 additions & 62 deletions .github/workflows/rust.yml
@@ -47,18 +47,24 @@ jobs:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Cache Cargo
uses: actions/cache@v3
with:
# these represent dependencies downloaded by cargo
# and thus do not depend on the OS, arch nor rust version.
path: /github/home/.cargo
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
rust-version: stable

- name: Cache Cargo
uses: actions/cache@v3
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
./target/
./datafusion-cli/target/
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-benchmark-${{ hashFiles('datafusion/**/Cargo.toml', 'benchmarks/Cargo.toml', 'datafusion-cli/Cargo.toml') }}

- name: Check workspace without default features
run: cargo check --no-default-features -p datafusion

@@ -84,12 +90,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -109,12 +109,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -211,12 +205,6 @@ jobs:
image: amd64/rust
steps:
- uses: actions/checkout@v4
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -238,12 +226,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -259,7 +241,7 @@ jobs:
- name: Verify that benchmark queries return expected results
run: |
export TPCH_DATA=`realpath datafusion/sqllogictest/test_files/tpch/data`
cargo test serde_q --profile release-nonlto --features=ci -- --test-threads=1
cargo test plan_q --package datafusion-benchmarks --profile release-nonlto --features=ci -- --test-threads=1
INCLUDE_TPCH=true cargo test --test sqllogictests
- name: Verify Working Directory Clean
run: git diff --exit-code
@@ -377,12 +359,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- uses: actions/setup-python@v4
with:
python-version: "3.8"
@@ -480,12 +456,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -506,12 +476,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -531,12 +495,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
@@ -563,12 +521,6 @@ jobs:
- uses: actions/checkout@v4
with:
submodules: true
- name: Cache Cargo
uses: actions/cache@v3
with:
path: /github/home/.cargo
# this key equals the ones on `linux-build-lib` for re-use
key: cargo-cache-
- name: Setup Rust toolchain
uses: ./.github/actions/setup-builder
with:
4 changes: 4 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions datafusion/common/Cargo.toml
@@ -39,10 +39,14 @@ default = ["parquet"]
pyarrow = ["pyo3", "arrow/pyarrow"]

[dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
apache-avro = { version = "0.16", default-features = false, features = ["snappy"], optional = true }
arrow = { workspace = true }
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-schema = { workspace = true }
chrono = { workspace = true }
half = { version = "2.1", default-features = false }
num_cpus = "1.13.0"
object_store = { version = "0.7.0", default-features = false, optional = true }
parquet = { workspace = true, optional = true }
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
@@ -307,7 +307,7 @@ config_namespace! {
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
pub compression: Option<String>, default = None
pub compression: Option<String>, default = Some("zstd(3)".into())

/// Sets if dictionary encoding is enabled. If NULL, uses
/// default parquet writer setting
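Note: with this change, files written by DataFusion's Parquet writer default to zstd compression at level 3 unless the option is overridden. A minimal sketch of overriding it, assuming the string-based `set` API on `ConfigOptions`:

```rust
use datafusion_common::config::ConfigOptions;

fn main() -> datafusion_common::Result<()> {
    let mut options = ConfigOptions::new();
    // Override the new `Some("zstd(3)")` default; per the doc comment in the
    // diff above, values such as "snappy", "gzip(level)", or "uncompressed"
    // are also accepted and are not case sensitive.
    options.set("datafusion.execution.parquet.compression", "snappy")?;
    assert_eq!(
        options.execution.parquet.compression.as_deref(),
        Some("snappy")
    );
    Ok(())
}
```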
@@ -17,19 +17,19 @@

//! Functionality used both on logical and physical plans
use std::sync::Arc;

use ahash::RandomState;
use arrow::array::*;
use arrow::datatypes::*;
use arrow::row::Rows;
use arrow::{downcast_dictionary_array, downcast_primitive_array};
use arrow_buffer::i256;
use datafusion_common::{
cast::{
as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array,
},
internal_err, DataFusionError, Result,

use crate::cast::{
as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array,
};
use std::sync::Arc;
use crate::error::{DataFusionError, Result, _internal_err};

// Combines two hashes into one hash
#[inline]
@@ -51,7 +51,7 @@ fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col:
}
}

pub(crate) trait HashValue {
pub trait HashValue {
fn hash_one(&self, state: &RandomState) -> u64;
}

@@ -337,7 +337,7 @@ pub fn create_hashes<'a>(
}
_ => {
// This is internal because we should have caught this before.
return internal_err!(
return _internal_err!(
"Unsupported data type in hasher: {}",
col.data_type()
);
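This move puts the hashing helpers in `datafusion_common`, so crates that only need row hashing no longer have to depend on `datafusion-physical-expr`. A minimal sketch of calling `create_hashes` from its new location; the buffer-filling signature is assumed from this version of the crate:

```rust
use std::sync::Arc;

use ahash::RandomState;
use arrow_array::{ArrayRef, Int32Array};
use datafusion_common::hash_utils::create_hashes;

fn main() -> datafusion_common::Result<()> {
    let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    // One hash slot per row; create_hashes folds each column into the buffer.
    let mut hashes = vec![0u64; col.len()];
    let random_state = RandomState::with_seeds(0, 0, 0, 0);
    create_hashes(&[col], &random_state, &mut hashes)?;
    assert_eq!(hashes.len(), 3);
    Ok(())
}
```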
1 change: 1 addition & 0 deletions datafusion/common/src/lib.rs
@@ -25,6 +25,7 @@ mod error;
pub mod file_options;
pub mod format;
mod functional_dependencies;
pub mod hash_utils;
mod join_type;
pub mod parsers;
#[cfg(feature = "pyarrow")]
37 changes: 33 additions & 4 deletions datafusion/core/tests/user_defined/user_defined_aggregates.rs
@@ -169,6 +169,37 @@ async fn test_udaf_returning_struct_subquery() {
assert_batches_eq!(expected, &execute(&ctx, sql).await.unwrap());
}

#[tokio::test]
async fn test_udaf_shadows_builtin_fn() {
let TestContext {
mut ctx,
test_state,
} = TestContext::new();
let sql = "SELECT sum(arrow_cast(time, 'Int64')) from t";

// compute with builtin `sum` aggregator
let expected = [
"+-------------+",
"| SUM(t.time) |",
"+-------------+",
"| 19000 |",
"+-------------+",
];
assert_batches_eq!(expected, &execute(&ctx, sql).await.unwrap());

// Register `TimeSum` with name `sum`. This will shadow the builtin one
let sql = "SELECT sum(time) from t";
TimeSum::register(&mut ctx, test_state.clone(), "sum");
let expected = [
"+----------------------------+",
"| sum(t.time) |",
"+----------------------------+",
"| 1970-01-01T00:00:00.000019 |",
"+----------------------------+",
];
assert_batches_eq!(expected, &execute(&ctx, sql).await.unwrap());
}

async fn execute(ctx: &SessionContext, sql: &str) -> Result<Vec<RecordBatch>> {
ctx.sql(sql).await?.collect().await
}
@@ -214,7 +245,7 @@ impl TestContext {
// Tell DataFusion about the "first" function
FirstSelector::register(&mut ctx);
// Tell DataFusion about the "time_sum" function
TimeSum::register(&mut ctx, Arc::clone(&test_state));
TimeSum::register(&mut ctx, Arc::clone(&test_state), "time_sum");

Self { ctx, test_state }
}
@@ -281,7 +312,7 @@ impl TimeSum {
Self { sum: 0, test_state }
}

fn register(ctx: &mut SessionContext, test_state: Arc<TestState>) {
fn register(ctx: &mut SessionContext, test_state: Arc<TestState>, name: &str) {
let timestamp_type = DataType::Timestamp(TimeUnit::Nanosecond, None);

// Returns the same type as its input
@@ -301,8 +332,6 @@ impl TimeSum {
let accumulator: AccumulatorFactoryFunction =
Arc::new(move |_| Ok(Box::new(Self::new(Arc::clone(&captured_state)))));

let name = "time_sum";

let time_sum =
AggregateUDF::new(name, &signature, &return_type, &accumulator, &state_type);

2 changes: 1 addition & 1 deletion datafusion/physical-expr/src/expressions/in_list.rs
@@ -24,7 +24,6 @@ use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use crate::hash_utils::HashValue;
use crate::physical_expr::down_cast_any_ref;
use crate::utils::expr_list_eq_any_order;
use crate::PhysicalExpr;
@@ -37,6 +36,7 @@ use arrow::datatypes::*;
use arrow::record_batch::RecordBatch;
use arrow::util::bit_iterator::BitIndexIterator;
use arrow::{downcast_dictionary_array, downcast_primitive_array};
use datafusion_common::hash_utils::HashValue;
use datafusion_common::{
cast::{as_boolean_array, as_generic_binary_array, as_string_array},
internal_err, not_impl_err, DataFusionError, Result, ScalarValue,
4 changes: 3 additions & 1 deletion datafusion/physical-expr/src/lib.rs
@@ -28,7 +28,6 @@ pub mod equivalence;
pub mod execution_props;
pub mod expressions;
pub mod functions;
pub mod hash_utils;
pub mod intervals;
pub mod math_expressions;
mod partitioning;
@@ -49,6 +48,9 @@ pub mod utils;
pub mod var_provider;
pub mod window;

// For backwards compatibility
pub use datafusion_common::hash_utils;

pub use aggregate::groups_accumulator::{
EmitTo, GroupsAccumulator, GroupsAccumulatorAdapter,
};
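The re-export keeps the old import path compiling, so downstream code can migrate at its own pace. A compile-only sketch of the two equivalent paths after this commit:

```rust
// Legacy path, kept alive by the `pub use` above:
#[allow(unused_imports)]
use datafusion_physical_expr::hash_utils::create_hashes;
// Canonical path after the move (renamed locally to avoid the name clash):
#[allow(unused_imports)]
use datafusion_common::hash_utils::create_hashes as create_hashes_moved;

fn main() {}
```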
@@ -22,9 +22,9 @@ use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, Rows, SortField};
use arrow_array::{Array, ArrayRef};
use arrow_schema::{DataType, SchemaRef};
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::memory_pool::proxy::{RawTableAllocExt, VecAllocExt};
use datafusion_physical_expr::hash_utils::create_hashes;
use datafusion_physical_expr::EmitTo;
use hashbrown::raw::RawTable;

3 changes: 2 additions & 1 deletion datafusion/physical-plan/src/lib.rs
@@ -375,10 +375,11 @@ pub mod windows;

use crate::repartition::RepartitionExec;
use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
pub use datafusion_common::hash_utils;
pub use datafusion_common::utils::project_schema;
use datafusion_execution::TaskContext;
pub use datafusion_physical_expr::{
expressions, functions, hash_utils, ordering_equivalence_properties_helper, udf,
expressions, functions, ordering_equivalence_properties_helper, udf,
};

#[cfg(test)]
2 changes: 1 addition & 1 deletion datafusion/physical-plan/src/limit.rs
@@ -442,7 +442,7 @@ impl LimitStream {

match &poll {
Poll::Ready(Some(Ok(batch))) => {
if batch.num_rows() > 0 && self.skip == 0 {
if batch.num_rows() > 0 {
break poll;
} else {
// continue to poll input stream
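The dropped `self.skip == 0` guard lets a batch whose rows survive the skip phase flow through immediately, even when the offset was consumed partway through that batch. For intuition, a standalone sketch of the intended OFFSET/LIMIT bookkeeping (simplified types, not DataFusion's actual stream code):

```rust
/// Simplified OFFSET/LIMIT bookkeeping over batch row counts.
/// Returns how many rows each emitted batch contributes.
fn apply_skip_and_fetch(batch_sizes: &[usize], mut skip: usize, mut fetch: usize) -> Vec<usize> {
    let mut emitted = Vec::new();
    for &rows in batch_sizes {
        // Rows remaining in this batch once the offset is applied.
        let survivors = rows.saturating_sub(skip);
        skip = skip.saturating_sub(rows);
        if survivors == 0 {
            continue; // whole batch swallowed by OFFSET; keep polling
        }
        let take = survivors.min(fetch);
        fetch -= take;
        emitted.push(take);
        if fetch == 0 {
            break; // LIMIT satisfied
        }
    }
    emitted
}

fn main() {
    // OFFSET 5, LIMIT 4 over batches of 3 rows each: batch 1 is fully
    // skipped, batch 2 yields 1 surviving row, batch 3 yields 3 rows.
    assert_eq!(apply_skip_and_fetch(&[3, 3, 3, 3], 5, 4), vec![1, 3]);
}
```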
@@ -43,6 +43,8 @@ use arrow::{
datatypes::{Schema, SchemaBuilder, SchemaRef},
record_batch::RecordBatch,
};

use datafusion_common::hash_utils::create_hashes;
use datafusion_common::utils::{
evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices,
get_record_batch_at_indices, get_row_at_idx,
@@ -51,7 +53,6 @@ use datafusion_common::{exec_err, plan_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::hash_utils::create_hashes;
use datafusion_physical_expr::window::{
PartitionBatches, PartitionKey, PartitionWindowAggStates, WindowState,
};