diff --git a/src/datatypes/src/scalars.rs b/src/datatypes/src/scalars.rs index 8539819a439c..60e6da218369 100644 --- a/src/datatypes/src/scalars.rs +++ b/src/datatypes/src/scalars.rs @@ -139,8 +139,11 @@ pub trait ScalarVectorBuilder: MutableVector { /// Push a value into the builder. fn push(&mut self, value: Option<::RefItem<'_>>); - /// Finish build and return a new vector. + /// Build a new vector and reset `self`. fn finish(&mut self) -> Self::VectorType; + + /// Build a new vector without resetting `self`. + fn finish_cloned(&self) -> Self::VectorType; } macro_rules! impl_scalar_for_native { diff --git a/src/datatypes/src/vectors.rs b/src/datatypes/src/vectors.rs index a1dcafac82ec..a0604ee8170c 100644 --- a/src/datatypes/src/vectors.rs +++ b/src/datatypes/src/vectors.rs @@ -191,6 +191,9 @@ pub trait MutableVector: Send + Sync { /// Convert `self` to an (immutable) [VectorRef] and reset `self`. fn to_vector(&mut self) -> VectorRef; + /// Convert `self` to an (immutable) [VectorRef] and without resetting `self`. + fn to_vector_cloned(&self) -> VectorRef; + /// Try to push value ref to this mutable vector. fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()>; @@ -423,4 +426,18 @@ pub mod tests { // Panic with_capacity let _ = ListVectorBuilder::with_capacity(1024); } + + #[test] + fn test_mutable_vector_to_vector_cloned() { + // create a string vector builder + let mut builder = ConcreteDataType::string_datatype().create_mutable_vector(1024); + builder.push_value_ref(ValueRef::String("hello")); + builder.push_value_ref(ValueRef::String("world")); + builder.push_value_ref(ValueRef::String("!")); + + // use MutableVector trait to_vector_cloned won't reset builder + let vector = builder.to_vector_cloned(); + assert_eq!(vector.len(), 3); + assert_eq!(builder.len(), 3); + } } diff --git a/src/datatypes/src/vectors/binary.rs b/src/datatypes/src/vectors/binary.rs index 894dbb62cb4e..36187bd5af7f 100644 --- a/src/datatypes/src/vectors/binary.rs +++ b/src/datatypes/src/vectors/binary.rs @@ -159,6 +159,10 @@ impl MutableVector for BinaryVectorBuilder { Arc::new(self.finish()) } + fn to_vector_cloned(&self) -> VectorRef { + Arc::new(self.finish_cloned()) + } + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { match value.as_binary()? { Some(v) => self.mutable_array.append_value(v), @@ -197,6 +201,12 @@ impl ScalarVectorBuilder for BinaryVectorBuilder { array: self.mutable_array.finish(), } } + + fn finish_cloned(&self) -> Self::VectorType { + BinaryVector { + array: self.mutable_array.finish_cloned(), + } + } } impl Serializable for BinaryVector { @@ -348,4 +358,21 @@ mod tests { let expect: VectorRef = Arc::new(BinaryVector::from_slice(&[b"hello", b"one", b"two"])); assert_eq!(expect, vector); } + + #[test] + fn test_binary_vector_builder_finish_cloned() { + let mut builder = BinaryVectorBuilder::with_capacity(1024); + builder.push(Some(b"one")); + builder.push(Some(b"two")); + builder.push(Some(b"three")); + let vector = builder.finish_cloned(); + assert_eq!(b"one", vector.get_data(0).unwrap()); + assert_eq!(vector.len(), 3); + assert_eq!(builder.len(), 3); + + builder.push(Some(b"four")); + let vector = builder.finish_cloned(); + assert_eq!(b"four", vector.get_data(3).unwrap()); + assert_eq!(builder.len(), 4); + } } diff --git a/src/datatypes/src/vectors/boolean.rs b/src/datatypes/src/vectors/boolean.rs index a87bc853e6ec..8b02fda95d0c 100644 --- a/src/datatypes/src/vectors/boolean.rs +++ b/src/datatypes/src/vectors/boolean.rs @@ -175,6 +175,10 @@ impl MutableVector for BooleanVectorBuilder { Arc::new(self.finish()) } + fn to_vector_cloned(&self) -> VectorRef { + Arc::new(self.finish_cloned()) + } + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { match value.as_boolean()? { Some(v) => self.mutable_array.append_value(v), @@ -213,6 +217,12 @@ impl ScalarVectorBuilder for BooleanVectorBuilder { array: self.mutable_array.finish(), } } + + fn finish_cloned(&self) -> Self::VectorType { + BooleanVector { + array: self.mutable_array.finish_cloned(), + } + } } impl Serializable for BooleanVector { @@ -358,4 +368,21 @@ mod tests { let expect: VectorRef = Arc::new(BooleanVector::from_slice(&[true, false, true])); assert_eq!(expect, vector); } + + #[test] + fn test_boolean_vector_builder_finish_cloned() { + let mut builder = BooleanVectorBuilder::with_capacity(1024); + builder.push(Some(true)); + builder.push(Some(false)); + builder.push(Some(true)); + let vector = builder.finish_cloned(); + assert!(vector.get_data(0).unwrap()); + assert_eq!(vector.len(), 3); + assert_eq!(builder.len(), 3); + + builder.push(Some(false)); + let vector = builder.finish_cloned(); + assert!(!vector.get_data(3).unwrap()); + assert_eq!(builder.len(), 4); + } } diff --git a/src/datatypes/src/vectors/decimal.rs b/src/datatypes/src/vectors/decimal.rs index fab5e74cce75..1daf689c9c93 100644 --- a/src/datatypes/src/vectors/decimal.rs +++ b/src/datatypes/src/vectors/decimal.rs @@ -310,6 +310,10 @@ impl MutableVector for Decimal128VectorBuilder { Arc::new(self.finish()) } + fn to_vector_cloned(&self) -> VectorRef { + Arc::new(self.finish_cloned()) + } + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { let decimal_val = value.as_decimal128()?.map(|v| v.val()); self.mutable_array.append_option(decimal_val); @@ -358,6 +362,12 @@ impl ScalarVectorBuilder for Decimal128VectorBuilder { array: self.mutable_array.finish(), } } + + fn finish_cloned(&self) -> Self::VectorType { + Decimal128Vector { + array: self.mutable_array.finish_cloned(), + } + } } impl Decimal128VectorBuilder { @@ -553,4 +563,16 @@ pub mod tests { .collect::>(); assert_eq!(values, vec![2, 4, 6, 8]); } + + #[test] + fn test_decimal128_vector_builder_finish_cloned() { + let mut builder = Decimal128VectorBuilder::with_capacity(1024); + builder.push(Some(Decimal128::new(1, 3, 1))); + builder.push(Some(Decimal128::new(1, 3, 1))); + builder.push(Some(Decimal128::new(1, 3, 1))); + builder.push(Some(Decimal128::new(1, 3, 1))); + let vector = builder.finish_cloned(); + assert_eq!(vector.len(), 4); + assert_eq!(builder.len(), 4); + } } diff --git a/src/datatypes/src/vectors/list.rs b/src/datatypes/src/vectors/list.rs index 00ca4398fef9..aaf88bbd9866 100644 --- a/src/datatypes/src/vectors/list.rs +++ b/src/datatypes/src/vectors/list.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use arrow::array::{ Array, ArrayData, ArrayRef, BooleanBufferBuilder, Int32BufferBuilder, ListArray, }; -use arrow::buffer::Buffer; +use arrow::buffer::{Buffer, NullBuffer}; use arrow::datatypes::DataType as ArrowDataType; use serde_json::Value as JsonValue; @@ -281,6 +281,10 @@ impl MutableVector for ListVectorBuilder { Arc::new(self.finish()) } + fn to_vector_cloned(&self) -> VectorRef { + Arc::new(self.finish_cloned()) + } + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { if let Some(list_ref) = value.as_list()? { match list_ref { @@ -355,6 +359,32 @@ impl ScalarVectorBuilder for ListVectorBuilder { item_type: self.item_type.clone(), } } + + // Port from https://github.com/apache/arrow-rs/blob/ef6932f31e243d8545e097569653c8d3f1365b4d/arrow-array/src/builder/generic_list_builder.rs#L302-L325 + fn finish_cloned(&self) -> Self::VectorType { + let len = self.len(); + let values_vector = self.values_builder.to_vector_cloned(); + let values_arr = values_vector.to_arrow_array(); + let values_data = values_arr.to_data(); + + let offset_buffer = Buffer::from_slice_ref(self.offsets_builder.as_slice()); + let nulls = self.null_buffer_builder.finish_cloned(); + + let data_type = ConcreteDataType::list_datatype(self.item_type.clone()).as_arrow_type(); + let array_data_builder = ArrayData::builder(data_type) + .len(len) + .add_buffer(offset_buffer) + .add_child_data(values_data) + .nulls(nulls); + + let array_data = unsafe { array_data_builder.build_unchecked() }; + let array = ListArray::from(array_data); + + ListVector { + array, + item_type: self.item_type.clone(), + } + } } // Ports from https://github.com/apache/arrow-rs/blob/94565bca99b5d9932a3e9a8e094aaf4e4384b1e5/arrow-array/src/builder/null_buffer_builder.rs @@ -427,6 +457,12 @@ impl NullBufferBuilder { buf } + /// Builds the [NullBuffer] without resetting the builder. + fn finish_cloned(&self) -> Option { + let buffer = self.bitmap_builder.as_ref()?.finish_cloned(); + Some(NullBuffer::new(buffer)) + } + #[inline] fn materialize_if_needed(&mut self) { if self.bitmap_builder.is_none() { @@ -728,4 +764,24 @@ pub mod tests { iter.nth(1).unwrap().unwrap() ); } + + #[test] + fn test_list_vector_builder_finish_cloned() { + let mut builder = + ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 2); + builder.push(None); + builder.push(Some(ListValueRef::Ref { + val: &ListValue::new( + Some(Box::new(vec![ + Value::Int32(4), + Value::Null, + Value::Int32(6), + ])), + ConcreteDataType::int32_datatype(), + ), + })); + let vector = builder.finish_cloned(); + assert_eq!(vector.len(), 2); + assert_eq!(builder.len(), 2); + } } diff --git a/src/datatypes/src/vectors/null.rs b/src/datatypes/src/vectors/null.rs index 1b1a8d3be7b2..292e2c5e33e3 100644 --- a/src/datatypes/src/vectors/null.rs +++ b/src/datatypes/src/vectors/null.rs @@ -156,6 +156,10 @@ impl MutableVector for NullVectorBuilder { vector } + fn to_vector_cloned(&self) -> VectorRef { + Arc::new(NullVector::new(self.length)) + } + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { ensure!( value.is_null(), @@ -275,4 +279,14 @@ mod tests { let expect: VectorRef = Arc::new(input); assert_eq!(expect, vector); } + + #[test] + fn test_null_vector_builder_finish_cloned() { + let mut builder = NullType.create_mutable_vector(3); + builder.push_null(); + builder.push_null(); + let vector = builder.to_vector_cloned(); + assert_eq!(vector.len(), 2); + assert_eq!(vector.null_count(), 2); + } } diff --git a/src/datatypes/src/vectors/primitive.rs b/src/datatypes/src/vectors/primitive.rs index 61743b800ccd..7e7e3dd50bc6 100644 --- a/src/datatypes/src/vectors/primitive.rs +++ b/src/datatypes/src/vectors/primitive.rs @@ -303,6 +303,10 @@ impl MutableVector for PrimitiveVectorBuilder { Arc::new(self.finish()) } + fn to_vector_cloned(&self) -> VectorRef { + Arc::new(self.finish_cloned()) + } + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { let primitive = T::cast_value_ref(value)?; match primitive { @@ -352,6 +356,12 @@ where array: self.mutable_array.finish(), } } + + fn finish_cloned(&self) -> Self::VectorType { + PrimitiveVector { + array: self.mutable_array.finish_cloned(), + } + } } pub(crate) fn replicate_primitive( @@ -390,6 +400,8 @@ pub(crate) fn replicate_primitive( #[cfg(test)] mod tests { + use std::vec; + use arrow::array::{ Int32Array, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, @@ -406,13 +418,14 @@ mod tests { use super::*; use crate::data_type::DataType; use crate::serialize::Serializable; + use crate::timestamp::TimestampMillisecond; use crate::types::Int64Type; use crate::vectors::{ DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector, DurationSecondVector, IntervalDayTimeVector, IntervalYearMonthVector, TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector, - TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, - TimestampSecondVector, + TimestampMicrosecondVector, TimestampMillisecondVector, TimestampMillisecondVectorBuilder, + TimestampNanosecondVector, TimestampSecondVector, }; fn check_vec(v: Int32Vector) { @@ -682,4 +695,23 @@ mod tests { vector ); } + + #[test] + fn test_primitive_vector_builder_finish_cloned() { + let mut builder = Int64Type::default().create_mutable_vector(3); + builder.push_value_ref(ValueRef::Int64(123)); + builder.push_value_ref(ValueRef::Int64(456)); + let vector = builder.to_vector_cloned(); + assert_eq!(vector.len(), 2); + assert_eq!(vector.null_count(), 0); + assert_eq!(builder.len(), 2); + + let mut builder = TimestampMillisecondVectorBuilder::with_capacity(1024); + builder.push(Some(TimestampMillisecond::new(1))); + builder.push(Some(TimestampMillisecond::new(2))); + builder.push(Some(TimestampMillisecond::new(3))); + let vector = builder.finish_cloned(); + assert_eq!(vector.len(), 3); + assert_eq!(builder.len(), 3); + } } diff --git a/src/datatypes/src/vectors/string.rs b/src/datatypes/src/vectors/string.rs index bf49491eb1d2..18c35eaacecf 100644 --- a/src/datatypes/src/vectors/string.rs +++ b/src/datatypes/src/vectors/string.rs @@ -190,6 +190,10 @@ impl MutableVector for StringVectorBuilder { Arc::new(self.finish()) } + fn to_vector_cloned(&self) -> VectorRef { + Arc::new(self.finish_cloned()) + } + fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> { match value.as_string()? { Some(v) => self.mutable_array.append_value(v), @@ -228,6 +232,12 @@ impl ScalarVectorBuilder for StringVectorBuilder { array: self.mutable_array.finish(), } } + + fn finish_cloned(&self) -> Self::VectorType { + StringVector { + array: self.mutable_array.finish_cloned(), + } + } } impl Serializable for StringVector { @@ -244,6 +254,8 @@ vectors::impl_try_from_arrow_array_for_vector!(StringArray, StringVector); #[cfg(test)] mod tests { + use std::vec; + use arrow::datatypes::DataType; use super::*; @@ -359,4 +371,19 @@ mod tests { let serialized = serde_json::to_string(&vector.serialize_to_json().unwrap()).unwrap(); assert_eq!(r#"["🀀🀀🀀","🀁🀁🀁","🀂🀂🀂","🀃🀃🀃","🀆🀆"]"#, serialized); } + + #[test] + fn test_string_vector_builder_finish_cloned() { + let mut builder = StringVectorBuilder::with_capacity(1024); + builder.push(Some("1")); + builder.push(Some("2")); + builder.push(Some("3")); + let vector = builder.finish_cloned(); + assert_eq!(vector.len(), 3); + assert_eq!( + r#"["1","2","3"]"#, + serde_json::to_string(&vector.serialize_to_json().unwrap()).unwrap(), + ); + assert_eq!(builder.len(), 3); + } }