Skip to content

Commit

Permalink
clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Dec 9, 2024
1 parent 2731c7e commit 911a0b3
Show file tree
Hide file tree
Showing 17 changed files with 156 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,37 @@
// specific language governing permissions and limitations
// under the License.

use crate::execution::datafusion::expressions::strings::SubstringExpr;
use crate::{
execution::datafusion::util::spark_bloom_filter::SparkBloomFilter, parquet::data_type::AsBytes,
};
use arrow::record_batch::RecordBatch;
use arrow_array::cast::as_primitive_array;
use arrow_schema::{DataType, Schema};
use datafusion::physical_expr_common::physical_expr::DynEq;
use datafusion::physical_expr_common::physical_expr::DynHash;
use datafusion::physical_plan::ColumnarValue;
use datafusion_comet_spark_expr::utils::down_cast_any_ref;
use datafusion_common::{internal_err, Result, ScalarValue};
use datafusion_physical_expr::PhysicalExpr;
use std::{
any::Any,
fmt::Display,
hash::{Hash, Hasher},
sync::Arc,
};
use std::hash::Hasher;
use std::{any::Any, fmt::Display, sync::Arc};

/// A physical expression that checks if a value might be in a bloom filter. It corresponds to
/// Spark's `BloomFilterMightContain` expression.
#[derive(Debug, Hash)]
#[derive(Debug, Eq)]
pub struct BloomFilterMightContain {
/// Child expression for the bloom filter side of the check.
/// NOTE(review): how it is evaluated is not visible in this excerpt — confirm.
pub bloom_filter_expr: Arc<dyn PhysicalExpr>,
/// Child expression for the value being tested for membership.
pub value_expr: Arc<dyn PhysicalExpr>,
/// Optional `SparkBloomFilter`; presumably built ahead of time from `bloom_filter_expr`
/// — TODO confirm against the constructor.
bloom_filter: Option<SparkBloomFilter>,
}

impl DynEq for BloomFilterMightContain {
fn dyn_eq(&self, other: &dyn Any) -> bool {
impl DynHash for BloomFilterMightContain {
    /// Feeds this expression into `state` so it can be hashed through a trait object.
    ///
    /// The concrete type id is written first so that different expression types holding
    /// equal field values do not hash identically; every field of the struct follows.
    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
        // Fully-qualified paths keep this independent of which `std::hash` items are imported.
        // `&mut dyn Hasher` itself implements `Hasher`, so `&mut state` satisfies `H: Hasher`.
        std::hash::Hash::hash(&std::any::TypeId::of::<Self>(), &mut state);
        std::hash::Hash::hash(&self.bloom_filter_expr, &mut state);
        std::hash::Hash::hash(&self.value_expr, &mut state);
        std::hash::Hash::hash(&self.bloom_filter, &mut state);
    }
}

impl PartialEq for BloomFilterMightContain {
fn eq(&self, other: &Self) -> bool {
fn eq(&self, _other: &Self) -> bool {
todo!()
}
}
Expand Down
24 changes: 11 additions & 13 deletions native/core/src/execution/datafusion/expressions/checkoverflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,43 @@
// specific language governing permissions and limitations
// under the License.

use std::{
any::Any,
fmt::{Display, Formatter},
hash::{Hash, Hasher},
sync::Arc,
};

use crate::execution::datafusion::expressions::strings::SubstringExpr;
use arrow::{
array::{as_primitive_array, Array, ArrayRef, Decimal128Array},
datatypes::{Decimal128Type, DecimalType},
record_batch::RecordBatch,
};
use arrow_schema::{DataType, Schema};
use datafusion::logical_expr::ColumnarValue;
use datafusion::physical_expr_common::physical_expr::DynEq;
use datafusion::physical_expr_common::physical_expr::DynHash;
use datafusion_comet_spark_expr::utils::down_cast_any_ref;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_physical_expr::PhysicalExpr;
use std::hash::Hasher;
use std::{
any::Any,
fmt::{Display, Formatter},
sync::Arc,
};

/// This is from Spark's `CheckOverflow` expression. Spark's `CheckOverflow` expression rounds
/// decimals to the given scale and checks if the decimals can fit in the given precision. As the
/// `cast` kernel rounds decimals already, Comet's `CheckOverflow` expression only checks if the
/// decimals can fit in the precision.
#[derive(Debug, Hash)]
#[derive(Debug, Eq)]
pub struct CheckOverflow {
/// Input expression whose values are checked.
pub child: Arc<dyn PhysicalExpr>,
/// Presumably the decimal type carrying the precision to check against — TODO confirm.
pub data_type: DataType,
/// NOTE(review): presumably selects whether an overflow raises an error or is handled
/// some other way; the evaluation logic is not visible here — confirm.
pub fail_on_error: bool,
}

impl DynEq for CheckOverflow {
fn dyn_eq(&self, other: &dyn Any) -> bool {
impl DynHash for CheckOverflow {
    /// Feeds this expression into `state` so it can be hashed through a trait object.
    ///
    /// The concrete type id is written first so that different expression types holding
    /// equal field values do not hash identically; every field of the struct follows.
    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
        // Fully-qualified paths keep this independent of which `std::hash` items are imported.
        // `&mut dyn Hasher` itself implements `Hasher`, so `&mut state` satisfies `H: Hasher`.
        std::hash::Hash::hash(&std::any::TypeId::of::<Self>(), &mut state);
        std::hash::Hash::hash(&self.child, &mut state);
        std::hash::Hash::hash(&self.data_type, &mut state);
        std::hash::Hash::hash(&self.fail_on_error, &mut state);
    }
}

impl PartialEq for CheckOverflow {
fn eq(&self, other: &Self) -> bool {
fn eq(&self, _other: &Self) -> bool {
todo!()
}
}
Expand Down
13 changes: 7 additions & 6 deletions native/core/src/execution/datafusion/expressions/negative.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow::{compute::kernels::numeric::neg_wrapping, datatypes::IntervalDayTimeT
use arrow_array::RecordBatch;
use arrow_buffer::IntervalDayTime;
use arrow_schema::{DataType, Schema};
use datafusion::physical_expr_common::physical_expr::DynEq;
use datafusion::physical_expr_common::physical_expr::DynHash;
use datafusion::{
logical_expr::{interval_arithmetic::Interval, ColumnarValue},
physical_expr::PhysicalExpr,
Expand All @@ -30,7 +30,8 @@ use datafusion_comet_spark_expr::utils::down_cast_any_ref;
use datafusion_comet_spark_expr::SparkError;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::sort_properties::ExprProperties;
use std::{any::Any, hash::Hash, sync::Arc};
use std::hash::Hasher;
use std::{any::Any, sync::Arc};

pub fn create_negate_expr(
expr: Arc<dyn PhysicalExpr>,
Expand All @@ -40,21 +41,21 @@ pub fn create_negate_expr(
}

/// Negative expression
#[derive(Debug, Hash)]
#[derive(Debug, Eq)]
pub struct NegativeExpr {
/// Input expression
arg: Arc<dyn PhysicalExpr>,
/// NOTE(review): presumably controls whether a failure while negating is reported as an
/// error; the evaluation logic is not visible here — confirm.
fail_on_error: bool,
}

impl DynEq for NegativeExpr {
fn dyn_eq(&self, other: &dyn Any) -> bool {
impl DynHash for NegativeExpr {
    /// Feeds this expression into `state` so it can be hashed through a trait object.
    ///
    /// The concrete type id is written first so that different expression types holding
    /// equal field values do not hash identically; every field of the struct follows.
    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
        // Fully-qualified paths keep this independent of which `std::hash` items are imported.
        // `&mut dyn Hasher` itself implements `Hasher`, so `&mut state` satisfies `H: Hasher`.
        std::hash::Hash::hash(&std::any::TypeId::of::<Self>(), &mut state);
        std::hash::Hash::hash(&self.arg, &mut state);
        std::hash::Hash::hash(&self.fail_on_error, &mut state);
    }
}

impl PartialEq for NegativeExpr {
fn eq(&self, other: &Self) -> bool {
fn eq(&self, _other: &Self) -> bool {
todo!()
}
}
Expand Down
25 changes: 12 additions & 13 deletions native/core/src/execution/datafusion/expressions/strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

#![allow(deprecated)]

use crate::execution::datafusion::expressions::checkoverflow::CheckOverflow;
use crate::execution::kernels::strings::{string_space, substring};
use arrow::{
compute::{
Expand All @@ -28,18 +27,18 @@ use arrow::{
};
use arrow_schema::{DataType, Schema};
use datafusion::logical_expr::ColumnarValue;
use datafusion::physical_expr_common::physical_expr::DynEq;
use datafusion::physical_expr_common::physical_expr::DynHash;
use datafusion::physical_expr_common::physical_expr::{DynEq, DynHash};
use datafusion_comet_spark_expr::utils::down_cast_any_ref;
use datafusion_comet_spark_expr::ToJson;
use datafusion_common::{DataFusionError, ScalarValue::Utf8};
use datafusion_physical_expr::PhysicalExpr;
use std::hash::Hasher;
use std::{
any::Any,
fmt::{Display, Formatter},
hash::{Hash, Hasher},
hash::Hash,
sync::Arc,
};

macro_rules! make_predicate_function {
($name: ident, $kernel: ident, $str_scalar_kernel: ident) => {
#[derive(Debug, Hash)]
Expand Down Expand Up @@ -155,26 +154,26 @@ make_predicate_function!(EndsWith, ends_with_dyn, ends_with_utf8_scalar_dyn);

make_predicate_function!(Contains, contains_dyn, contains_utf8_scalar_dyn);

#[derive(Debug, Hash)]
#[derive(Debug, Eq)]
/// Substring expression over a child expression.
/// NOTE(review): the description is inferred from the type name and field names; the
/// evaluation logic is not visible in this excerpt — confirm.
pub struct SubstringExpr {
/// Input expression.
pub child: Arc<dyn PhysicalExpr>,
/// Start position of the substring — TODO confirm whether it is 0- or 1-based and how
/// negative values are treated.
pub start: i64,
/// Presumably the substring length — TODO confirm the unit (bytes vs. characters).
pub len: u64,
}

#[derive(Debug, Hash)]
#[derive(Debug, Eq)]
/// String-space expression over a child expression.
/// NOTE(review): the description is inferred from the type name; the evaluation logic is
/// not visible in this excerpt — confirm.
pub struct StringSpaceExpr {
/// Input expression.
pub child: Arc<dyn PhysicalExpr>,
}

impl DynEq for StringSpaceExpr {
fn dyn_eq(&self, other: &dyn Any) -> bool {
impl DynHash for StringSpaceExpr {
    /// Feeds this expression into `state` so it can be hashed through a trait object.
    ///
    /// The concrete type id is written first so that different expression types holding
    /// an equal child do not hash identically; the single `child` field follows.
    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
        // Fully-qualified paths keep this independent of which `std::hash` items are imported.
        // `&mut dyn Hasher` itself implements `Hasher`, so `&mut state` satisfies `H: Hasher`.
        std::hash::Hash::hash(&std::any::TypeId::of::<Self>(), &mut state);
        std::hash::Hash::hash(&self.child, &mut state);
    }
}

impl PartialEq for StringSpaceExpr {
fn eq(&self, other: &Self) -> bool {
fn eq(&self, _other: &Self) -> bool {
todo!()
}
}
Expand Down Expand Up @@ -216,14 +215,14 @@ impl PartialEq<dyn Any> for SubstringExpr {
}
}

impl DynEq for SubstringExpr {
fn dyn_eq(&self, other: &dyn Any) -> bool {
impl DynHash for SubstringExpr {
    /// Feeds this expression into `state` so it can be hashed through a trait object.
    ///
    /// The concrete type id is written first so that different expression types holding
    /// equal field values do not hash identically; every field of the struct follows.
    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
        // Fully-qualified paths keep this independent of which `std::hash` items are imported.
        // `&mut dyn Hasher` itself implements `Hasher`, so `&mut state` satisfies `H: Hasher`.
        std::hash::Hash::hash(&std::any::TypeId::of::<Self>(), &mut state);
        std::hash::Hash::hash(&self.child, &mut state);
        std::hash::Hash::hash(&self.start, &mut state);
        std::hash::Hash::hash(&self.len, &mut state);
    }
}

impl PartialEq for SubstringExpr {
fn eq(&self, other: &Self) -> bool {
fn eq(&self, _other: &Self) -> bool {
todo!()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use jni::{
use std::{
any::Any,
fmt::{Display, Formatter},
hash::{Hash, Hasher},
hash::Hash,
sync::Arc,
};

Expand Down
6 changes: 1 addition & 5 deletions native/core/src/execution/datafusion/expressions/unbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ use datafusion::physical_plan::ColumnarValue;
use datafusion_comet_spark_expr::utils::down_cast_any_ref;
use datafusion_common::{internal_err, Result};
use datafusion_physical_expr::PhysicalExpr;
use std::{
any::Any,
hash::{Hash, Hasher},
sync::Arc,
};
use std::{any::Any, hash::Hash, sync::Arc};

/// This is similar to `UnKnownColumn` in DataFusion, but it has a data type.
/// This is only used when the column is not bound to a schema, for example, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::iter::zip;
/// A simple bit array implementation that simulates the behavior of Spark's BitArray which is
/// used in the BloomFilter implementation. Some methods are not implemented as they are not
/// required for the current use case.
#[derive(Debug, Hash)]
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct SparkBitArray {
data: Vec<u64>,
bit_count: usize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const SPARK_BLOOM_FILTER_VERSION_1: i32 = 1;
/// A Bloom filter implementation that simulates the behavior of Spark's BloomFilter.
/// It's not a complete implementation of Spark's BloomFilter; it only adds the minimum
/// methods needed to support mightContainsLong on the native side.
#[derive(Debug, Hash)]
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct SparkBloomFilter {
bits: SparkBitArray,
num_hash_functions: u32,
Expand Down
9 changes: 4 additions & 5 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
use arrow::datatypes::DataType as ArrowDataType;
use arrow_array::RecordBatch;
use datafusion::{
execution::{
disk_manager::DiskManagerConfig,
runtime_env::{RuntimeConfig, RuntimeEnv},
},
execution::{disk_manager::DiskManagerConfig, runtime_env::RuntimeEnv},
physical_plan::{display::DisplayableExecutionPlan, SendableRecordBatchStream},
prelude::{SessionConfig, SessionContext},
};
Expand Down Expand Up @@ -52,6 +49,7 @@ use crate::{
};
use datafusion_comet_proto::spark_operator::Operator;
use datafusion_common::ScalarValue;
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use futures::stream::StreamExt;
use jni::{
objects::GlobalRef,
Expand Down Expand Up @@ -176,7 +174,7 @@ fn prepare_datafusion_session_context(
batch_size: usize,
comet_task_memory_manager: Arc<GlobalRef>,
) -> CometResult<SessionContext> {
let mut rt_config = RuntimeConfig::new().with_disk_manager(DiskManagerConfig::NewOs);
let mut rt_config = RuntimeEnvBuilder::new().with_disk_manager(DiskManagerConfig::NewOs);

// Set Comet memory pool for native
let memory_pool = CometMemoryPool::new(comet_task_memory_manager);
Expand All @@ -198,6 +196,7 @@ fn prepare_datafusion_session_context(
&ScalarValue::Float64(Some(1.1)),
);

#[allow(deprecated)]
let runtime = RuntimeEnv::try_new(rt_config)?;

let mut session_ctx = SessionContext::new_with_config_rt(session_config, Arc::new(runtime));
Expand Down
13 changes: 10 additions & 3 deletions native/spark-expr/src/bitwise_not.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
// specific language governing permissions and limitations
// under the License.

use std::{any::Any, hash::Hash, sync::Arc};

use crate::utils::down_cast_any_ref;
use arrow::{
array::*,
datatypes::{DataType, Schema},
record_batch::RecordBatch,
};
use datafusion::physical_expr_common::physical_expr::DynHash;
use datafusion::{error::DataFusionError, logical_expr::ColumnarValue};
use datafusion_common::Result;
use datafusion_physical_expr::PhysicalExpr;
use std::hash::Hasher;
use std::{any::Any, sync::Arc};

macro_rules! compute_op {
($OPERAND:expr, $DT:ident) => {{
Expand All @@ -39,12 +40,18 @@ macro_rules! compute_op {
}

/// BitwiseNot expression
#[derive(Debug, Hash, Eq)]
#[derive(Debug, Eq)]
pub struct BitwiseNotExpr {
/// Input expression, i.e. the operand of the bitwise NOT.
arg: Arc<dyn PhysicalExpr>,
}

impl DynHash for BitwiseNotExpr {
    /// Feeds this expression into `state` so it can be hashed through a trait object.
    ///
    /// The concrete type id is written first so that different expression types holding
    /// an equal operand do not hash identically; the single `arg` field follows, which
    /// keeps the hash consistent with the `PartialEq` impl that compares `arg`.
    fn dyn_hash(&self, mut state: &mut dyn Hasher) {
        // Fully-qualified paths keep this independent of which `std::hash` items are imported.
        // `&mut dyn Hasher` itself implements `Hasher`, so `&mut state` satisfies `H: Hasher`.
        std::hash::Hash::hash(&std::any::TypeId::of::<Self>(), &mut state);
        std::hash::Hash::hash(&self.arg, &mut state);
    }
}

impl PartialEq for BitwiseNotExpr {
fn eq(&self, other: &Self) -> bool {
self.arg.eq(&other.arg)
Expand Down
Loading

0 comments on commit 911a0b3

Please sign in to comment.