refactor(iceberg): support position delete merge on read with hash join (#19656)

xxhZs authored Jan 3, 2025
1 parent 6248415 commit 972d570
Showing 8 changed files with 341 additions and 288 deletions.
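
Taken together, the changes below remove the scan-side `PositionDeleteFilter`, tag every data row with hidden `_iceberg_file_path` / `_iceberg_file_pos` columns, give position delete files their own scan type, and leave the merge to a hash join in the query plan. A minimal, self-contained sketch of that merge-on-read idea in plain Rust follows; `DataRow` and `merge_on_read` are illustrative names only, and in RisingWave the merge is produced by the planner as a join between two Iceberg scans rather than written by hand.

use std::collections::HashSet;

struct DataRow {
    file_path: String, // would surface as the hidden `_iceberg_file_path` column
    pos: i64,          // would surface as the hidden `_iceberg_file_pos` column
    payload: String,
}

// Keep a data row only if no position delete row matches its (file_path, pos)
// pair: a hash anti join, with the delete side as the build side.
fn merge_on_read(data: Vec<DataRow>, position_deletes: Vec<(String, i64)>) -> Vec<DataRow> {
    let deleted: HashSet<(String, i64)> = position_deletes.into_iter().collect();
    data.into_iter()
        .filter(|row| !deleted.contains(&(row.file_path.clone(), row.pos)))
        .collect()
}

fn main() {
    let data = vec![
        DataRow { file_path: "s3://t/data-1.parquet".into(), pos: 0, payload: "a".into() },
        DataRow { file_path: "s3://t/data-1.parquet".into(), pos: 1, payload: "b".into() },
    ];
    // The position delete scan reports: row 0 of data-1.parquet is deleted.
    let deletes = vec![("s3://t/data-1.parquet".to_owned(), 0)];
    let survivors = merge_on_read(data, deletes);
    assert_eq!(survivors.len(), 1);
    assert_eq!(survivors[0].payload, "b");
}
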
1 change: 1 addition & 0 deletions proto/batch_plan.proto
@@ -71,6 +71,7 @@ message IcebergScanNode {
ICEBERG_SCAN_TYPE_UNSPECIFIED = 0;
ICEBERG_SCAN_TYPE_DATA_SCAN = 1;
ICEBERG_SCAN_TYPE_EQUALITY_DELETE_SCAN = 2;
ICEBERG_SCAN_TYPE_POSITION_DELETE_SCAN = 3;
}
repeated plan_common.ColumnCatalog columns = 1;
map<string, string> with_properties = 2;
213 changes: 60 additions & 153 deletions src/batch/executors/src/executor/iceberg_scan.rs
@@ -12,22 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use core::ops::BitAnd;
use std::collections::HashMap;
use std::sync::Arc;

use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use iceberg::scan::FileScanTask;
use iceberg::spec::TableMetadata;
use iceberg::table::Table;
use itertools::Itertools;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{Field, Schema, ICEBERG_SEQUENCE_NUM_COLUMN_NAME};
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, ScalarRefImpl};
use risingwave_common::array::{ArrayImpl, DataChunk, I64Array, Utf8Array};
use risingwave_common::catalog::{
Field, Schema, ICEBERG_FILE_PATH_COLUMN_NAME, ICEBERG_SEQUENCE_NUM_COLUMN_NAME,
};
use risingwave_common::types::DataType;
use risingwave_common_estimate_size::EstimateSize;
use risingwave_connector::source::iceberg::{
IcebergFileScanTaskJsonStrEnum, IcebergProperties, IcebergSplit,
@@ -41,34 +38,28 @@ use crate::error::BatchError;
use crate::executor::Executor;
use crate::monitor::BatchMetrics;

static POSITION_DELETE_FILE_FILE_PATH_INDEX: usize = 0;
static POSITION_DELETE_FILE_POS: usize = 1;

pub enum IcebergFileScanTaskEnum {
// The scan task of the data file and the position delete file
DataAndPositionDelete(Vec<FileScanTask>, Vec<FileScanTask>),
Data(Vec<FileScanTask>),
// The scan task of the equality delete file
EqualityDelete(Vec<FileScanTask>),
// The scan task of the position delete file
PositionDelete(Vec<FileScanTask>),
}

impl IcebergFileScanTaskEnum {
fn from_iceberg_file_scan_task_json_str_enum(
iceberg_file_scan_task_json_str_enum: IcebergFileScanTaskJsonStrEnum,
) -> Self {
match iceberg_file_scan_task_json_str_enum {
IcebergFileScanTaskJsonStrEnum::DataAndPositionDelete(
data_file_scan_tasks,
position_delete_file_scan_tasks,
) => IcebergFileScanTaskEnum::DataAndPositionDelete(
data_file_scan_tasks
.into_iter()
.map(|t| t.deserialize())
.collect(),
position_delete_file_scan_tasks
.into_iter()
.map(|t| t.deserialize())
.collect(),
),
IcebergFileScanTaskJsonStrEnum::Data(data_file_scan_tasks) => {
IcebergFileScanTaskEnum::Data(
data_file_scan_tasks
.into_iter()
.map(|t| t.deserialize())
.collect(),
)
}
IcebergFileScanTaskJsonStrEnum::EqualityDelete(equality_delete_file_scan_tasks) => {
IcebergFileScanTaskEnum::EqualityDelete(
equality_delete_file_scan_tasks
@@ -77,6 +68,14 @@ impl IcebergFileScanTaskEnum {
.collect(),
)
}
IcebergFileScanTaskJsonStrEnum::PositionDelete(position_delete_file_scan_tasks) => {
IcebergFileScanTaskEnum::PositionDelete(
position_delete_file_scan_tasks
.into_iter()
.map(|t| t.deserialize())
.collect(),
)
}
}
}
}
Expand All @@ -92,6 +91,7 @@ pub struct IcebergScanExecutor {
identity: String,
metrics: Option<BatchMetrics>,
need_seq_num: bool,
need_file_path_and_pos: bool,
}

impl Executor for IcebergScanExecutor {
@@ -119,6 +119,7 @@ impl IcebergScanExecutor {
identity: String,
metrics: Option<BatchMetrics>,
need_seq_num: bool,
need_file_path_and_pos: bool,
) -> Self {
Self {
iceberg_config,
@@ -130,6 +131,7 @@
identity,
metrics,
need_seq_num,
need_file_path_and_pos,
}
}

@@ -142,30 +144,18 @@ impl IcebergScanExecutor {
let data_types = self.schema.data_types();
let table_name = table.identifier().name().to_owned();

let (mut position_delete_filter, data_file_scan_tasks) =
match Option::take(&mut self.file_scan_tasks) {
Some(IcebergFileScanTaskEnum::DataAndPositionDelete(
data_file_scan_tasks,
position_delete_file_scan_tasks,
)) => (
Some(
PositionDeleteFilter::new(
position_delete_file_scan_tasks,
&data_file_scan_tasks,
&table,
self.batch_size,
)
.await?,
),
data_file_scan_tasks,
),
Some(IcebergFileScanTaskEnum::EqualityDelete(equality_delete_file_scan_tasks)) => {
(None, equality_delete_file_scan_tasks)
}
None => {
bail!("file_scan_tasks must be Some")
}
};
let data_file_scan_tasks = match Option::take(&mut self.file_scan_tasks) {
Some(IcebergFileScanTaskEnum::Data(data_file_scan_tasks)) => data_file_scan_tasks,
Some(IcebergFileScanTaskEnum::EqualityDelete(equality_delete_file_scan_tasks)) => {
equality_delete_file_scan_tasks
}
Some(IcebergFileScanTaskEnum::PositionDelete(position_delete_file_scan_tasks)) => {
position_delete_file_scan_tasks
}
None => {
bail!("file_scan_tasks must be Some")
}
};

let mut read_bytes = 0;
let _metrics_report_guard = scopeguard::guard(
@@ -197,28 +187,30 @@
let record_batch = record_batch?;

// iceberg_t1_source
let mut chunk = if self.need_seq_num {
let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
let mut chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
if self.need_seq_num {
let (mut columns, visibility) = chunk.into_parts();
columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
vec![data_sequence_number; visibility.len()],
))));
DataChunk::from_parts(columns.into(), visibility)
} else {
IcebergArrowConvert.chunk_from_record_batch(&record_batch)?
chunk = DataChunk::from_parts(columns.into(), visibility)
};

// position delete
if let Some(position_delete_filter) = &mut position_delete_filter {
chunk = position_delete_filter.filter(&data_file_path, chunk, index);
if self.need_file_path_and_pos {
let (mut columns, visibility) = chunk.into_parts();
columns.push(Arc::new(ArrayImpl::Utf8(Utf8Array::from_iter(
vec![data_file_path.as_str(); visibility.len()],
))));
let index_start = (index * self.batch_size) as i64;
columns.push(Arc::new(ArrayImpl::Int64(I64Array::from_iter(
(index_start..(index_start + visibility.len() as i64))
.collect::<Vec<i64>>(),
))));
chunk = DataChunk::from_parts(columns.into(), visibility)
}
assert_eq!(chunk.data_types(), data_types);
read_bytes += chunk.estimated_heap_size() as u64;
yield chunk;
}
if let Some(position_delete_filter) = &mut position_delete_filter {
position_delete_filter.remove_file_path(&data_file_path);
}
}
}
}
Expand Down Expand Up @@ -275,6 +267,12 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
.fields()
.iter()
.any(|f| f.name == ICEBERG_SEQUENCE_NUM_COLUMN_NAME);
let need_file_path_and_pos = schema
.fields()
.iter()
.any(|f| f.name == ICEBERG_FILE_PATH_COLUMN_NAME)
&& matches!(split.files, IcebergFileScanTaskJsonStrEnum::Data(_));

Ok(Box::new(IcebergScanExecutor::new(
iceberg_properties,
Some(split.snapshot_id),
@@ -285,101 +283,10 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
source.plan_node().get_identity().clone(),
metrics,
need_seq_num,
need_file_path_and_pos,
)))
} else {
unreachable!()
}
}
}

struct PositionDeleteFilter {
    // Deleted row positions for each data file path; `false` marks a row that must be removed. Positions are bucketed by batch size.
position_delete_file_path_pos_map: HashMap<String, HashMap<usize, Vec<bool>>>,
}

impl PositionDeleteFilter {
async fn new(
position_delete_file_scan_tasks: Vec<FileScanTask>,
data_file_scan_tasks: &[FileScanTask],
table: &Table,
batch_size: usize,
) -> crate::error::Result<Self> {
let mut position_delete_file_path_pos_map: HashMap<String, HashMap<usize, Vec<bool>>> =
HashMap::default();
let data_file_path_set = data_file_scan_tasks
.iter()
.map(|data_file_scan_task| data_file_scan_task.data_file_path.as_ref())
.collect::<std::collections::HashSet<_>>();

let position_delete_file_scan_stream = {
#[try_stream]
async move {
for position_delete_file_scan_task in position_delete_file_scan_tasks {
yield position_delete_file_scan_task;
}
}
};

let reader = table.reader_builder().with_batch_size(batch_size).build();

let mut record_batch_stream = reader
.read(Box::pin(position_delete_file_scan_stream))
.await?;

while let Some(record_batch) = record_batch_stream.next().await {
let record_batch = record_batch?;
let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?;
for row in chunk.rows() {
// The schema is fixed. `0` must be `file_path`, `1` must be `pos`.
if let Some(ScalarRefImpl::Utf8(file_path)) =
row.datum_at(POSITION_DELETE_FILE_FILE_PATH_INDEX)
&& let Some(ScalarRefImpl::Int64(pos)) = row.datum_at(POSITION_DELETE_FILE_POS)
{
if !data_file_path_set.contains(file_path) {
continue;
}
let entry = position_delete_file_path_pos_map
.entry(file_path.to_owned())
.or_default();
// Split `pos` by `batch_size`, because the data file will also be split by `batch_size`
let delete_vec_index = pos as usize / batch_size;
let delete_vec_pos = pos as usize % batch_size;
let delete_vec = entry
.entry(delete_vec_index)
.or_insert(vec![true; batch_size]);
delete_vec[delete_vec_pos] = false;
} else {
bail!("position delete `file_path` and `pos` must be string and int64")
}
}
}
Ok(Self {
position_delete_file_path_pos_map,
})
}

fn filter(&self, data_file_path: &str, mut chunk: DataChunk, index: usize) -> DataChunk {
chunk = chunk.compact();
if let Some(position_delete_bool_iter) = self
.position_delete_file_path_pos_map
.get(data_file_path)
.and_then(|delete_vecs| delete_vecs.get(&index))
{
            // Some chunks have fewer rows than `batch_size`, so truncate to keep the bitmap length consistent
let position_delete_bool_iter = if position_delete_bool_iter.len() > chunk.capacity() {
&position_delete_bool_iter[..chunk.capacity()]
} else {
position_delete_bool_iter
};
let new_visibility = Bitmap::from_bool_slice(position_delete_bool_iter);
chunk.set_visibility(chunk.visibility().bitand(new_visibility));
chunk
} else {
chunk
}
}

fn remove_file_path(&mut self, file_path: &str) {
self.position_delete_file_path_pos_map.remove(file_path);
}
}
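
In the rewritten `execute` loop above, the scan no longer applies deletes itself; when `need_file_path_and_pos` is set it only appends the data file path and each row's absolute position within that file, derived from the batch index and the fixed `batch_size`. A small sketch of that position arithmetic (the function name is illustrative, not taken from the codebase):

fn file_pos_column(batch_index: usize, batch_size: usize, rows_in_batch: usize) -> Vec<i64> {
    // The reader emits fixed-size batches, so batch `batch_index` starts at
    // absolute row position `batch_index * batch_size` within the data file.
    let start = (batch_index * batch_size) as i64;
    (start..start + rows_in_batch as i64).collect()
}

fn main() {
    // With batch_size = 1024, the third batch (batch_index = 2) of a 2500-row
    // file holds 452 rows covering absolute positions 2048..=2499.
    let pos = file_pos_column(2, 1024, 452);
    assert_eq!(pos.first(), Some(&2048));
    assert_eq!(pos.last(), Some(&2499));
}
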
18 changes: 16 additions & 2 deletions src/common/src/catalog/column.rs
@@ -25,8 +25,8 @@ use risingwave_pb::plan_common::{

use super::schema::FieldLike;
use super::{
iceberg_sequence_num_column_desc, row_id_column_desc, rw_timestamp_column_desc,
USER_COLUMN_ID_OFFSET,
iceberg_file_path_column_desc, iceberg_file_pos_column_desc, iceberg_sequence_num_column_desc,
row_id_column_desc, rw_timestamp_column_desc, USER_COLUMN_ID_OFFSET,
};
use crate::catalog::{cdc_table_name_column_desc, offset_column_desc, Field, ROW_ID_COLUMN_ID};
use crate::types::{DataType, StructType};
@@ -497,6 +497,20 @@ impl ColumnCatalog {
}
}

pub fn iceberg_file_path_column() -> Self {
Self {
column_desc: iceberg_file_path_column_desc(),
is_hidden: true,
}
}

pub fn iceberg_file_pos_column() -> Self {
Self {
column_desc: iceberg_file_pos_column_desc(),
is_hidden: true,
}
}

pub fn cdc_table_name_column() -> Self {
Self {
column_desc: cdc_table_name_column_desc(),
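
A hedged usage sketch of the two constructors added above (the helper below is illustrative and not part of this commit): a frontend assembling an Iceberg source's columns could append the hidden metadata columns so the merge-on-read join can resolve them by name.

use risingwave_common::catalog::ColumnCatalog;

// Illustrative helper, not from the repository: extend a user-facing column
// list with the hidden Iceberg metadata columns introduced in this commit.
fn with_iceberg_metadata_columns(mut columns: Vec<ColumnCatalog>) -> Vec<ColumnCatalog> {
    columns.push(ColumnCatalog::iceberg_file_path_column()); // `_iceberg_file_path`, Varchar, hidden
    columns.push(ColumnCatalog::iceberg_file_pos_column()); // `_iceberg_file_pos`, Int64, hidden
    columns
}
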
18 changes: 18 additions & 0 deletions src/common/src/catalog/mod.rs
@@ -135,6 +135,8 @@ pub fn rw_timestamp_column_desc() -> ColumnDesc {

pub const OFFSET_COLUMN_NAME: &str = "_rw_offset";
pub const ICEBERG_SEQUENCE_NUM_COLUMN_NAME: &str = "_iceberg_sequence_number";
pub const ICEBERG_FILE_PATH_COLUMN_NAME: &str = "_iceberg_file_path";
pub const ICEBERG_FILE_POS_COLUMN_NAME: &str = "_iceberg_file_pos";

// The number of columns output by the cdc source job
// see `debezium_cdc_source_schema()` for details
@@ -171,6 +173,22 @@ pub fn iceberg_sequence_num_column_desc() -> ColumnDesc {
)
}

pub fn iceberg_file_path_column_desc() -> ColumnDesc {
ColumnDesc::named(
ICEBERG_FILE_PATH_COLUMN_NAME,
ColumnId::placeholder(),
DataType::Varchar,
)
}

pub fn iceberg_file_pos_column_desc() -> ColumnDesc {
ColumnDesc::named(
ICEBERG_FILE_POS_COLUMN_NAME,
ColumnId::placeholder(),
DataType::Int64,
)
}

/// The local system catalog reader in the frontend node.
pub trait SysCatalogReader: Sync + Send + 'static {
/// Reads the data of the system catalog table.