Skip to content

Commit

Permalink
Handle one-element array return value in ScalarFunctionExpr
Browse files Browse the repository at this point in the history
This was done in apache#12922 only for math functions.
We now generalize this fallback to all scalar UDFs.
  • Loading branch information
joroKr21 committed Oct 16, 2024
1 parent 747001a commit f58bd6e
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 19 deletions.
11 changes: 0 additions & 11 deletions datafusion/expr-common/src/columnar_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,17 +217,6 @@ impl ColumnarValue {
}
}
}

/// Converts an [`ArrayRef`] to a [`ColumnarValue`] based on the supplied arguments.
/// This is useful for scalar UDF implementations to fulfil their contract:
/// if all arguments are scalar values, the result should also be a scalar value.
pub fn from_args_and_result(args: &[Self], result: ArrayRef) -> Result<Self> {
if result.len() == 1 && args.iter().all(|arg| matches!(arg, Self::Scalar(_))) {
Ok(Self::Scalar(ScalarValue::try_from_array(&result, 0)?))
} else {
Ok(Self::Array(result))
}
}
}

#[cfg(test)]
Expand Down
12 changes: 6 additions & 6 deletions datafusion/functions/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ macro_rules! make_math_unary_udf {
$EVALUATE_BOUNDS(inputs)
}

fn invoke(&self, col_args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(col_args)?;
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let arr: ArrayRef = match args[0].data_type() {
DataType::Float64 => {
Arc::new(make_function_scalar_inputs_return_type!(
Expand Down Expand Up @@ -257,7 +257,7 @@ macro_rules! make_math_unary_udf {
}
};

ColumnarValue::from_args_and_result(col_args, arr)
Ok(ColumnarValue::Array(arr))
}

fn documentation(&self) -> Option<&Documentation> {
Expand Down Expand Up @@ -344,8 +344,8 @@ macro_rules! make_math_binary_udf {
$OUTPUT_ORDERING(input)
}

fn invoke(&self, col_args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(col_args)?;
fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let args = ColumnarValue::values_to_arrays(args)?;
let arr: ArrayRef = match args[0].data_type() {
DataType::Float64 => Arc::new(make_function_inputs2!(
&args[0],
Expand All @@ -372,7 +372,7 @@ macro_rules! make_math_binary_udf {
}
};

ColumnarValue::from_args_and_result(col_args, arr)
Ok(ColumnarValue::Array(arr))
}

fn documentation(&self) -> Option<&Documentation> {
Expand Down
16 changes: 14 additions & 2 deletions datafusion/physical-expr/src/scalar_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ use crate::PhysicalExpr;

use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{internal_err, DFSchema, Result};
use arrow_array::Array;
use datafusion_common::{internal_err, DFSchema, Result, ScalarValue};
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_expr::sort_properties::ExprProperties;
use datafusion_expr::type_coercion::functions::data_types_with_scalar_udf;
use datafusion_expr::{expr_vec_fmt, ColumnarValue, Expr, ScalarUDF};
use datafusion_expr_common::signature::Volatility;

/// Physical expression of a scalar function
pub struct ScalarFunctionExpr {
Expand Down Expand Up @@ -146,7 +148,17 @@ impl PhysicalExpr for ScalarFunctionExpr {
}?;

if let ColumnarValue::Array(array) = &output {
if array.len() != batch.num_rows() {
let len = array.len();
// If the function is not volatile and all arguments are scalars,
// we can assume that returning a one-element array is equivalent to returning a scalar.
let preserve_scalar = len == 1
&& self.fun.signature().volatility != Volatility::Volatile
&& inputs
.iter()
.all(|arg| matches!(arg, ColumnarValue::Scalar(_)));
if preserve_scalar {
return ScalarValue::try_from_array(array, 0).map(ColumnarValue::Scalar);
} else if len != batch.num_rows() {
return internal_err!("UDF returned a different number of rows than expected. Expected: {}, Got: {}",
batch.num_rows(), array.len());
}
Expand Down

0 comments on commit f58bd6e

Please sign in to comment.