Skip to content

Commit

Permalink
feat: Add xxhash64 function support (apache#424)
Browse files Browse the repository at this point in the history
* feat: Add xxhash64 function support

* Update related docs

* Update core/src/execution/datafusion/spark_hash.rs

Co-authored-by: Liang-Chi Hsieh <[email protected]>

* Update QueriesList results

---------

Co-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Parth Chandra <[email protected]>
  • Loading branch information
3 people authored Jun 3, 2024
1 parent 24781fb commit c79bd5c
Show file tree
Hide file tree
Showing 10 changed files with 862 additions and 448 deletions.
2 changes: 2 additions & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ once_cell = "1.18.0"
regex = "1.9.6"
crc32fast = "1.3.2"
simd-adler32 = "0.3.7"
twox-hash = "1.6.3"

[build-dependencies]
prost-build = "0.9.0"
Expand Down
51 changes: 49 additions & 2 deletions core/src/execution/datafusion/expressions/scalar_funcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
sync::Arc,
};

use crate::execution::datafusion::spark_hash::create_hashes;
use crate::execution::datafusion::spark_hash::{create_murmur3_hashes, create_xxhash64_hashes};
use arrow::{
array::{
ArrayRef, AsArray, Decimal128Builder, Float32Array, Float64Array, GenericStringArray,
Expand Down Expand Up @@ -119,6 +119,10 @@ pub fn create_comet_physical_fun(
let func = Arc::new(spark_murmur3_hash);
make_comet_scalar_udf!("murmur3_hash", func, without data_type)
}
"xxhash64" => {
let func = Arc::new(spark_xxhash64);
make_comet_scalar_udf!("xxhash64", func, without data_type)
}
sha if sha2_functions.contains(&sha) => {
// Spark requires hex string as the result of sha2 functions, we have to wrap the
// result of digest functions as hex string
Expand Down Expand Up @@ -653,7 +657,7 @@ fn spark_murmur3_hash(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusio
}
})
.collect::<Vec<ArrayRef>>();
create_hashes(&arrays, &mut hashes)?;
create_murmur3_hashes(&arrays, &mut hashes)?;
if num_rows == 1 {
Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(
hashes[0] as i32,
Expand All @@ -672,6 +676,49 @@ fn spark_murmur3_hash(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusio
}
}

fn spark_xxhash64(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
let length = args.len();
let seed = &args[length - 1];
match seed {
ColumnarValue::Scalar(ScalarValue::Int64(Some(seed))) => {
// iterate over the arguments to find out the length of the array
let num_rows = args[0..args.len() - 1]
.iter()
.find_map(|arg| match arg {
ColumnarValue::Array(array) => Some(array.len()),
ColumnarValue::Scalar(_) => None,
})
.unwrap_or(1);
let mut hashes: Vec<u64> = vec![0_u64; num_rows];
hashes.fill(*seed as u64);
let arrays = args[0..args.len() - 1]
.iter()
.map(|arg| match arg {
ColumnarValue::Array(array) => array.clone(),
ColumnarValue::Scalar(scalar) => {
scalar.clone().to_array_of_size(num_rows).unwrap()
}
})
.collect::<Vec<ArrayRef>>();
create_xxhash64_hashes(&arrays, &mut hashes)?;
if num_rows == 1 {
Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(
hashes[0] as i64,
))))
} else {
let hashes: Vec<i64> = hashes.into_iter().map(|x| x as i64).collect();
Ok(ColumnarValue::Array(Arc::new(Int64Array::from(hashes))))
}
}
_ => {
internal_err!(
"The seed of function xxhash64 must be an Int64 scalar value, but got: {:?}.",
seed
)
}
}
}

#[inline]
fn hex_encode<T: AsRef<[u8]>>(data: T) -> String {
let mut s = String::with_capacity(data.as_ref().len() * 2);
Expand Down
4 changes: 2 additions & 2 deletions core/src/execution/datafusion/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use tokio::task;
use crate::{
common::bit::ceil,
errors::{CometError, CometResult},
execution::datafusion::spark_hash::{create_hashes, pmod},
execution::datafusion::spark_hash::{create_murmur3_hashes, pmod},
};

/// The shuffle writer operator maps each input partition to M output partitions based on a
Expand Down Expand Up @@ -673,7 +673,7 @@ impl ShuffleRepartitioner {

// Hash arrays and compute buckets based on number of partitions
let partition_ids = &mut self.partition_ids[..arrays[0].len()];
create_hashes(&arrays, hashes_buf)?
create_murmur3_hashes(&arrays, hashes_buf)?
.iter()
.enumerate()
.for_each(|(idx, hash)| {
Expand Down
Loading

0 comments on commit c79bd5c

Please sign in to comment.