Skip to content

Commit

Permalink
feat: builder to vector without resetting (#2915)
Browse files Browse the repository at this point in the history
* feat: finish_cloned() without resetting

* test: add unit cases

* chore: port comment

* chore: apply suggestions from code review

Co-authored-by: Yingwen <[email protected]>

---------

Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
QuenKar and evenyag authored Dec 13, 2023
1 parent 3555e16 commit fec3fcf
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 4 deletions.
5 changes: 4 additions & 1 deletion src/datatypes/src/scalars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,11 @@ pub trait ScalarVectorBuilder: MutableVector {
/// Push a value into the builder.
fn push(&mut self, value: Option<<Self::VectorType as ScalarVector>::RefItem<'_>>);

/// Finish build and return a new vector.
/// Build a new vector and reset `self`.
fn finish(&mut self) -> Self::VectorType;

/// Build a new vector without resetting `self`.
fn finish_cloned(&self) -> Self::VectorType;
}

macro_rules! impl_scalar_for_native {
Expand Down
17 changes: 17 additions & 0 deletions src/datatypes/src/vectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ pub trait MutableVector: Send + Sync {
/// Convert `self` to an (immutable) [VectorRef] and reset `self`.
fn to_vector(&mut self) -> VectorRef;

/// Convert `self` to an (immutable) [VectorRef] without resetting `self`.
fn to_vector_cloned(&self) -> VectorRef;

/// Try to push value ref to this mutable vector.
fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()>;

Expand Down Expand Up @@ -423,4 +426,18 @@ pub mod tests {
// Panic with_capacity
let _ = ListVectorBuilder::with_capacity(1024);
}

#[test]
fn test_mutable_vector_to_vector_cloned() {
    // Fill a string builder obtained through the data type's mutable vector factory.
    let mut builder = ConcreteDataType::string_datatype().create_mutable_vector(1024);
    for word in ["hello", "world", "!"] {
        builder.push_value_ref(ValueRef::String(word));
    }

    // Snapshotting through the `MutableVector` trait must leave the builder's contents in place.
    let snapshot = builder.to_vector_cloned();
    assert_eq!(snapshot.len(), 3);
    assert_eq!(builder.len(), 3);
}
}
27 changes: 27 additions & 0 deletions src/datatypes/src/vectors/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ impl MutableVector for BinaryVectorBuilder {
Arc::new(self.finish())
}

/// Snapshots the values pushed so far into a shared vector; `self` is only borrowed
/// immutably, so the builder keeps its contents.
fn to_vector_cloned(&self) -> VectorRef {
    let snapshot = self.finish_cloned();
    Arc::new(snapshot)
}

fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
match value.as_binary()? {
Some(v) => self.mutable_array.append_value(v),
Expand Down Expand Up @@ -197,6 +201,12 @@ impl ScalarVectorBuilder for BinaryVectorBuilder {
array: self.mutable_array.finish(),
}
}

/// Builds a [`BinaryVector`] from the builder's current contents without resetting it.
fn finish_cloned(&self) -> Self::VectorType {
    let array = self.mutable_array.finish_cloned();
    BinaryVector { array }
}
}

impl Serializable for BinaryVector {
Expand Down Expand Up @@ -348,4 +358,21 @@ mod tests {
let expect: VectorRef = Arc::new(BinaryVector::from_slice(&[b"hello", b"one", b"two"]));
assert_eq!(expect, vector);
}

#[test]
fn test_binary_vector_builder_finish_cloned() {
    let mut builder = BinaryVectorBuilder::with_capacity(1024);
    builder.push(Some(b"one"));
    builder.push(Some(b"two"));
    builder.push(Some(b"three"));

    // First snapshot: the builder must still hold all three values afterwards.
    let first = builder.finish_cloned();
    assert_eq!(b"one", first.get_data(0).unwrap());
    assert_eq!(first.len(), 3);
    assert_eq!(builder.len(), 3);

    // The builder stays usable: a later snapshot also sees the newly pushed value.
    builder.push(Some(b"four"));
    let second = builder.finish_cloned();
    assert_eq!(b"four", second.get_data(3).unwrap());
    assert_eq!(builder.len(), 4);
}
}
27 changes: 27 additions & 0 deletions src/datatypes/src/vectors/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ impl MutableVector for BooleanVectorBuilder {
Arc::new(self.finish())
}

/// Snapshots the values pushed so far into a shared vector; `self` is only borrowed
/// immutably, so the builder keeps its contents.
fn to_vector_cloned(&self) -> VectorRef {
    let snapshot = self.finish_cloned();
    Arc::new(snapshot)
}

fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
match value.as_boolean()? {
Some(v) => self.mutable_array.append_value(v),
Expand Down Expand Up @@ -213,6 +217,12 @@ impl ScalarVectorBuilder for BooleanVectorBuilder {
array: self.mutable_array.finish(),
}
}

/// Builds a [`BooleanVector`] from the builder's current contents without resetting it.
fn finish_cloned(&self) -> Self::VectorType {
    let array = self.mutable_array.finish_cloned();
    BooleanVector { array }
}
}

impl Serializable for BooleanVector {
Expand Down Expand Up @@ -358,4 +368,21 @@ mod tests {
let expect: VectorRef = Arc::new(BooleanVector::from_slice(&[true, false, true]));
assert_eq!(expect, vector);
}

#[test]
fn test_boolean_vector_builder_finish_cloned() {
    let mut builder = BooleanVectorBuilder::with_capacity(1024);
    for flag in [true, false, true] {
        builder.push(Some(flag));
    }

    // First snapshot: the builder must still hold all three values afterwards.
    let first = builder.finish_cloned();
    assert!(first.get_data(0).unwrap());
    assert_eq!(first.len(), 3);
    assert_eq!(builder.len(), 3);

    // The builder stays usable: a later snapshot also sees the newly pushed value.
    builder.push(Some(false));
    let second = builder.finish_cloned();
    assert!(!second.get_data(3).unwrap());
    assert_eq!(builder.len(), 4);
}
}
22 changes: 22 additions & 0 deletions src/datatypes/src/vectors/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,10 @@ impl MutableVector for Decimal128VectorBuilder {
Arc::new(self.finish())
}

/// Snapshots the values pushed so far into a shared vector; `self` is only borrowed
/// immutably, so the builder keeps its contents.
fn to_vector_cloned(&self) -> VectorRef {
    let snapshot = self.finish_cloned();
    Arc::new(snapshot)
}

fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
let decimal_val = value.as_decimal128()?.map(|v| v.val());
self.mutable_array.append_option(decimal_val);
Expand Down Expand Up @@ -358,6 +362,12 @@ impl ScalarVectorBuilder for Decimal128VectorBuilder {
array: self.mutable_array.finish(),
}
}

/// Builds a [`Decimal128Vector`] from the builder's current contents without resetting it.
fn finish_cloned(&self) -> Self::VectorType {
    let array = self.mutable_array.finish_cloned();
    Decimal128Vector { array }
}
}

impl Decimal128VectorBuilder {
Expand Down Expand Up @@ -553,4 +563,16 @@ pub mod tests {
.collect::<Vec<_>>();
assert_eq!(values, vec![2, 4, 6, 8]);
}

#[test]
fn test_decimal128_vector_builder_finish_cloned() {
    let mut builder = Decimal128VectorBuilder::with_capacity(1024);
    // Push the same decimal four times; only the element count matters here.
    for _ in 0..4 {
        builder.push(Some(Decimal128::new(1, 3, 1)));
    }

    let snapshot = builder.finish_cloned();
    assert_eq!(snapshot.len(), 4);
    // The builder must still report every pushed value after the snapshot.
    assert_eq!(builder.len(), 4);
}
}
58 changes: 57 additions & 1 deletion src/datatypes/src/vectors/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::sync::Arc;
use arrow::array::{
Array, ArrayData, ArrayRef, BooleanBufferBuilder, Int32BufferBuilder, ListArray,
};
use arrow::buffer::Buffer;
use arrow::buffer::{Buffer, NullBuffer};
use arrow::datatypes::DataType as ArrowDataType;
use serde_json::Value as JsonValue;

Expand Down Expand Up @@ -281,6 +281,10 @@ impl MutableVector for ListVectorBuilder {
Arc::new(self.finish())
}

/// Snapshots the lists pushed so far into a shared vector; `self` is only borrowed
/// immutably, so the builder keeps its contents.
fn to_vector_cloned(&self) -> VectorRef {
    let snapshot = self.finish_cloned();
    Arc::new(snapshot)
}

fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
if let Some(list_ref) = value.as_list()? {
match list_ref {
Expand Down Expand Up @@ -355,6 +359,32 @@ impl ScalarVectorBuilder for ListVectorBuilder {
item_type: self.item_type.clone(),
}
}

// Port from https://github.com/apache/arrow-rs/blob/ef6932f31e243d8545e097569653c8d3f1365b4d/arrow-array/src/builder/generic_list_builder.rs#L302-L325
/// Builds a [`ListVector`] from the builder's current contents.
///
/// Takes `&self`, so the offsets, null bitmap and child values held by the builder
/// are read but not reset.
fn finish_cloned(&self) -> Self::VectorType {
let len = self.len();
// Snapshot the child values through the non-resetting path as well.
let values_vector = self.values_builder.to_vector_cloned();
let values_arr = values_vector.to_arrow_array();
let values_data = values_arr.to_data();

// Build a standalone buffer from the offsets accumulated so far.
let offset_buffer = Buffer::from_slice_ref(self.offsets_builder.as_slice());
// `None` when the null buffer builder holds no bitmap.
let nulls = self.null_buffer_builder.finish_cloned();

let data_type = ConcreteDataType::list_datatype(self.item_type.clone()).as_arrow_type();
let array_data_builder = ArrayData::builder(data_type)
.len(len)
.add_buffer(offset_buffer)
.add_child_data(values_data)
.nulls(nulls);

// SAFETY: NOTE(review): validation is skipped here; this presumably relies on the
// builder keeping offsets, null bitmap and child values consistent with `len` on
// every push — confirm against the push paths.
let array_data = unsafe { array_data_builder.build_unchecked() };
let array = ListArray::from(array_data);

ListVector {
array,
item_type: self.item_type.clone(),
}
}
}

// Ports from https://github.com/apache/arrow-rs/blob/94565bca99b5d9932a3e9a8e094aaf4e4384b1e5/arrow-array/src/builder/null_buffer_builder.rs
Expand Down Expand Up @@ -427,6 +457,12 @@ impl NullBufferBuilder {
buf
}

/// Builds the [NullBuffer] from the current bitmap without resetting the builder.
///
/// Returns `None` when `bitmap_builder` is `None`.
fn finish_cloned(&self) -> Option<NullBuffer> {
    self.bitmap_builder
        .as_ref()
        .map(|bitmap| NullBuffer::new(bitmap.finish_cloned()))
}

#[inline]
fn materialize_if_needed(&mut self) {
if self.bitmap_builder.is_none() {
Expand Down Expand Up @@ -728,4 +764,24 @@ pub mod tests {
iter.nth(1).unwrap().unwrap()
);
}

#[test]
fn test_list_vector_builder_finish_cloned() {
    let mut builder =
        ListVectorBuilder::with_type_capacity(ConcreteDataType::int32_datatype(), 2);

    // One null entry followed by one list that itself contains a null item.
    builder.push(None);
    let items = vec![Value::Int32(4), Value::Null, Value::Int32(6)];
    let list = ListValue::new(Some(Box::new(items)), ConcreteDataType::int32_datatype());
    builder.push(Some(ListValueRef::Ref { val: &list }));

    let snapshot = builder.finish_cloned();
    assert_eq!(snapshot.len(), 2);
    // The builder must still report both entries after the snapshot.
    assert_eq!(builder.len(), 2);
}
}
14 changes: 14 additions & 0 deletions src/datatypes/src/vectors/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ impl MutableVector for NullVectorBuilder {
vector
}

/// Creates a null vector with the builder's current `length`; `self` is only borrowed
/// immutably, so `length` is left unchanged.
fn to_vector_cloned(&self) -> VectorRef {
    let snapshot = NullVector::new(self.length);
    Arc::new(snapshot)
}

fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
ensure!(
value.is_null(),
Expand Down Expand Up @@ -275,4 +279,14 @@ mod tests {
let expect: VectorRef = Arc::new(input);
assert_eq!(expect, vector);
}

#[test]
fn test_null_vector_builder_finish_cloned() {
    let mut builder = NullType.create_mutable_vector(3);
    builder.push_null();
    builder.push_null();

    let vector = builder.to_vector_cloned();
    assert_eq!(vector.len(), 2);
    assert_eq!(vector.null_count(), 2);
    // The point of `to_vector_cloned` is that the builder is not reset, so verify the
    // builder still reports both pushed nulls (matching the other builders' tests).
    assert_eq!(builder.len(), 2);
}
}
36 changes: 34 additions & 2 deletions src/datatypes/src/vectors/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ impl<T: LogicalPrimitiveType> MutableVector for PrimitiveVectorBuilder<T> {
Arc::new(self.finish())
}

/// Snapshots the values pushed so far into a shared vector; `self` is only borrowed
/// immutably, so the builder keeps its contents.
fn to_vector_cloned(&self) -> VectorRef {
    let snapshot = self.finish_cloned();
    Arc::new(snapshot)
}

fn try_push_value_ref(&mut self, value: ValueRef) -> Result<()> {
let primitive = T::cast_value_ref(value)?;
match primitive {
Expand Down Expand Up @@ -352,6 +356,12 @@ where
array: self.mutable_array.finish(),
}
}

/// Builds a [`PrimitiveVector`] from the builder's current contents without resetting it.
fn finish_cloned(&self) -> Self::VectorType {
    let array = self.mutable_array.finish_cloned();
    PrimitiveVector { array }
}
}

pub(crate) fn replicate_primitive<T: LogicalPrimitiveType>(
Expand Down Expand Up @@ -390,6 +400,8 @@ pub(crate) fn replicate_primitive<T: LogicalPrimitiveType>(

#[cfg(test)]
mod tests {
use std::vec;

use arrow::array::{
Int32Array, Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray,
Time64NanosecondArray,
Expand All @@ -406,13 +418,14 @@ mod tests {
use super::*;
use crate::data_type::DataType;
use crate::serialize::Serializable;
use crate::timestamp::TimestampMillisecond;
use crate::types::Int64Type;
use crate::vectors::{
DurationMicrosecondVector, DurationMillisecondVector, DurationNanosecondVector,
DurationSecondVector, IntervalDayTimeVector, IntervalYearMonthVector,
TimeMicrosecondVector, TimeMillisecondVector, TimeNanosecondVector, TimeSecondVector,
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
TimestampSecondVector,
TimestampMicrosecondVector, TimestampMillisecondVector, TimestampMillisecondVectorBuilder,
TimestampNanosecondVector, TimestampSecondVector,
};

fn check_vec(v: Int32Vector) {
Expand Down Expand Up @@ -682,4 +695,23 @@ mod tests {
vector
);
}

#[test]
fn test_primitive_vector_builder_finish_cloned() {
    // Through the type-erased `MutableVector` interface.
    let mut int_builder = Int64Type::default().create_mutable_vector(3);
    for value in [123, 456] {
        int_builder.push_value_ref(ValueRef::Int64(value));
    }
    let int_snapshot = int_builder.to_vector_cloned();
    assert_eq!(int_snapshot.len(), 2);
    assert_eq!(int_snapshot.null_count(), 0);
    assert_eq!(int_builder.len(), 2);

    // Through the concrete `ScalarVectorBuilder` interface.
    let mut ts_builder = TimestampMillisecondVectorBuilder::with_capacity(1024);
    for millis in 1..=3 {
        ts_builder.push(Some(TimestampMillisecond::new(millis)));
    }
    let ts_snapshot = ts_builder.finish_cloned();
    assert_eq!(ts_snapshot.len(), 3);
    assert_eq!(ts_builder.len(), 3);
}
}
Loading

0 comments on commit fec3fcf

Please sign in to comment.