Implement Log Replay for Change Data Feed (#540)
## What changes are proposed in this pull request?
This PR introduces the code path for replaying the log for `TableChanges`
and resolving `cdc`, `add`, and `remove` actions.

At the top level, we introduce `TableChangesScan::scan_data`, which
produces the `TableChangesScanData` used to read the Change Data Feed
(CDF). Producing this stream of scan data requires a log replay.

To perform log replay, the `table_changes::LogReplayScanner` is
introduced, which processes a single commit. It is responsible for two
things:
1. Producing `TableChangesScanData`, which is made up of transformed
`EngineData`, a selection vector, and a map `remove_dvs: HashMap<String,
DvInfo>`. `remove_dvs` maps from a remove action's path to its deletion
vector.
2. Performing schema, protocol, and table property validation to ensure
that the Change Data Feed can be processed.

The `LogReplayScanner` performs two passes over the actions of each
commit: the first in `try_new` and the second in `into_scan_batches`.

To perform the operations above, two new visitors are added:
`PreparePhaseVisitor` and `FileActionSelectionVisitor`.
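
The two-pass idea can be sketched in isolation. The snippet below is a minimal illustration, not the kernel's implementation: `Action`, `Prepared`, `prepare`, and `select` are hypothetical stand-ins, deletion vectors are modeled as plain strings, and the resolution rules are inferred from the test cases listed in this description (a `cdc` action hides every other action; a remove that shares a path with an add is not selected but is kept in `remove_dvs`). Keeping only shared-path removes in the map is an assumption of this sketch.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, simplified stand-ins for the file actions of one commit.
#[derive(Debug, Clone, PartialEq)]
enum Action {
    Add { path: String },
    Remove { path: String, dv: Option<String> },
    Cdc { path: String },
}

/// State gathered by the first pass (illustrative, not the kernel's type).
#[derive(Debug)]
struct Prepared {
    has_cdc: bool,
    remove_dvs: HashMap<String, Option<String>>,
}

/// Pass 1: note whether any `cdc` action exists, and keep the deletion
/// vector of every remove whose path is also added in the same commit.
fn prepare(actions: &[Action]) -> Prepared {
    let mut has_cdc = false;
    let mut add_paths: HashSet<&str> = HashSet::new();
    let mut remove_dvs = HashMap::new();
    for action in actions {
        match action {
            Action::Add { path } => {
                add_paths.insert(path.as_str());
            }
            Action::Remove { path, dv } => {
                remove_dvs.insert(path.clone(), dv.clone());
            }
            Action::Cdc { .. } => has_cdc = true,
        }
    }
    // Assumption: only removes that share a path with an add stay in the map.
    remove_dvs.retain(|path, _| add_paths.contains(path.as_str()));
    Prepared { has_cdc, remove_dvs }
}

/// Pass 2: build a selection vector with one flag per action.
fn select(actions: &[Action], prepared: &Prepared) -> Vec<bool> {
    actions
        .iter()
        .map(|action| match action {
            // `cdc` actions are always selected.
            Action::Cdc { .. } => true,
            // If the commit contains any `cdc` action, everything else is filtered out.
            _ if prepared.has_cdc => false,
            Action::Add { .. } => true,
            // A remove paired with an add is represented only through `remove_dvs`.
            Action::Remove { path, .. } => !prepared.remove_dvs.contains_key(path),
        })
        .collect()
}
```

For a commit holding a remove and an add for path `a` plus an unrelated remove for path `b`, this sketch selects the add and the remove of `b`, and leaves the remove of `a` unselected with its deletion vector recorded in `remove_dvs`.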

To test the changes, a new `LocalMockTable` struct is added. It writes
batches of actions into commits, which are then used to verify that log
replay produces the correct output.

The physical schema is added to `TableChangesScan`. 

## How was this change tested?
The following cases are tested:
- Valid metadata and protocol processing.
- Failure due to `delta.enableChangeDataFeed` not being enabled.
- Failure due to an incompatible schema.
- A simple add and remove case where the actions share no paths.
- A `cdc` action is present, so all other actions must be filtered out.
- A remove and an add action with the same path are resolved: the remove
action is not selected, but it is registered in the `remove_dvs` map. The
add action must be selected.
- Failure due to an incompatible protocol update.
- Correctly using the file modification time as the default timestamp.
- Data skipping works during log replay.

The following schema validation cases are tested:
- Adding a non-nullable column.
- Adding a nullable column.
- The commit has a wider type than the CDF schema.
- Type widening (will eventually be supported).
- The CDF column is nullable while the commit schema is non-nullable (will
eventually be supported).
- The CDF schema and the commit schema have completely incompatible types.
- The CDF schema has an extra nullable column.
OussamaSaoudi-db authored Dec 6, 2024
1 parent eb95c5b commit 3b456e4
Showing 13 changed files with 1,271 additions and 24 deletions.
4 changes: 4 additions & 0 deletions ffi/src/error.rs
@@ -50,6 +50,7 @@ pub enum KernelError {
UnsupportedError,
ParseIntervalError,
ChangeDataFeedUnsupported,
ChangeDataFeedIncompatibleSchema,
}

impl From<Error> for KernelError {
@@ -104,6 +105,9 @@ impl From<Error> for KernelError {
Error::Unsupported(_) => KernelError::UnsupportedError,
Error::ParseIntervalError(_) => KernelError::ParseIntervalError,
Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported,
Error::ChangeDataFeedIncompatibleSchema(_, _) => {
KernelError::ChangeDataFeedIncompatibleSchema
}
}
}
}
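
The effect of the new match arm can be shown with a trimmed-down sketch. It assumes, for illustration only, two-variant stand-ins for the kernel `Error` and the FFI `KernelError`, with a plain `u64` in place of `Version`; the real enums have many more variants, and the `#[repr(C)]` attribute here is an assumption rather than something shown in this diff. The point is that the two schema strings carried by the kernel error are dropped when crossing the FFI boundary, leaving only a fieldless error code.

```rust
/// Hypothetical, trimmed-down stand-in for the kernel's `Error` enum.
#[derive(Debug)]
enum Error {
    ChangeDataFeedUnsupported(u64),
    ChangeDataFeedIncompatibleSchema(String, String),
}

/// Hypothetical, trimmed-down stand-in for the FFI error code enum.
/// `#[repr(C)]` is an assumption of this sketch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(C)]
enum KernelError {
    ChangeDataFeedUnsupported,
    ChangeDataFeedIncompatibleSchema,
}

impl From<Error> for KernelError {
    fn from(err: Error) -> Self {
        match err {
            // Payloads are discarded; only the kind of error survives.
            Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported,
            Error::ChangeDataFeedIncompatibleSchema(_, _) => {
                KernelError::ChangeDataFeedIncompatibleSchema
            }
        }
    }
}
```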
1 change: 1 addition & 0 deletions kernel/src/actions/deletion_vector.rs
@@ -13,6 +13,7 @@ use crate::utils::require;
use crate::{DeltaResult, Error, FileSystemClient};

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(serde::Serialize), serde(rename_all = "camelCase"))]
pub struct DeletionVectorDescriptor {
/// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p'].
pub storage_type: String,
22 changes: 21 additions & 1 deletion kernel/src/actions/mod.rs
@@ -85,6 +85,7 @@ pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]
pub struct Format {
/// Name of the encoding for files in this table
pub provider: String,
@@ -102,6 +103,7 @@ impl Default for Format {
}

#[derive(Debug, Default, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]
pub struct Metadata {
/// Unique identifier for this table
pub id: String,
@@ -287,7 +289,7 @@ impl Protocol {
}

// given unparsed `table_features`, parse and check if they are subset of `supported_features`
- fn ensure_supported_features<T>(
+ pub(crate) fn ensure_supported_features<T>(
table_features: &[String],
supported_features: &HashSet<T>,
) -> DeltaResult<()>
@@ -325,6 +327,7 @@ where
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct CommitInfo {
/// The time this logical file was created, as milliseconds since the epoch.
/// Read: optional, write: required (that is, kernel always writes).
@@ -346,6 +349,7 @@ struct CommitInfo {
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
pub struct Add {
/// A relative path to a data file from the root of the table or an absolute path to a file
/// that should be added to the table. The path is a URI as specified by
@@ -374,23 +378,29 @@ pub struct Add {
/// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file.
///
/// [statistics]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Per-file-Statistics
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub stats: Option<String>,

/// Map containing metadata about this logical file.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub tags: Option<HashMap<String, String>>,

/// Information about deletion vector (DV) associated with this add action
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub deletion_vector: Option<DeletionVectorDescriptor>,

/// Default generated Row ID of the first row in the file. The default generated Row IDs
/// of the other rows in the file can be reconstructed by adding the physical index of the
/// row within the file to the base Row ID
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub base_row_id: Option<i64>,

/// First commit version in which an add action with the same path was committed to the table.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub default_row_commit_version: Option<i64>,

/// The name of the clustering implementation
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub clustering_provider: Option<String>,
}

@@ -403,6 +413,7 @@ impl Add {
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Remove {
/// A relative path to a data file from the root of the table or an absolute path to a file
/// that should be added to the table. The path is a URI as specified by
@@ -412,39 +423,48 @@ struct Remove {
pub(crate) path: String,

/// The time this logical file was created, as milliseconds since the epoch.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) deletion_timestamp: Option<i64>,

/// When `false` the logical file must already be present in the table or the records
/// in the added file must be contained in one or more remove actions in the same version.
pub(crate) data_change: bool,

/// When true the fields `partition_values`, `size`, and `tags` are present
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) extended_file_metadata: Option<bool>,

/// A map from partition column to value for this logical file.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) partition_values: Option<HashMap<String, String>>,

/// The size of this data file in bytes
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) size: Option<i64>,

/// Map containing metadata about this logical file.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) tags: Option<HashMap<String, String>>,

/// Information about deletion vector (DV) associated with this add action
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) deletion_vector: Option<DeletionVectorDescriptor>,

/// Default generated Row ID of the first row in the file. The default generated Row IDs
/// of the other rows in the file can be reconstructed by adding the physical index of the
/// row within the file to the base Row ID
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) base_row_id: Option<i64>,

/// First commit version in which an add action with the same path was committed to the table.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) default_row_commit_version: Option<i64>,
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Cdc {
/// A relative path to a change data file from the root of the table or an absolute path to a
/// change data file that should be added to the table. The path is a URI as specified by
2 changes: 1 addition & 1 deletion kernel/src/actions/visitors.rs
@@ -121,7 +121,7 @@ struct ProtocolVisitor {

impl ProtocolVisitor {
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
- fn visit_protocol<'a>(
+ pub(crate) fn visit_protocol<'a>(
row_index: usize,
min_reader_version: i32,
getters: &[&'a dyn GetData<'a>],
11 changes: 10 additions & 1 deletion kernel/src/error.rs
@@ -6,7 +6,7 @@ use std::{
str::Utf8Error,
};

- use crate::schema::DataType;
+ use crate::schema::{DataType, StructType};
use crate::table_properties::ParseIntervalError;
use crate::Version;

@@ -188,6 +188,9 @@ pub enum Error {

#[error("Change data feed is unsupported for the table at version {0}")]
ChangeDataFeedUnsupported(Version),

#[error("Change data feed encountered incompatible schema. Expected {0}, got {1}")]
ChangeDataFeedIncompatibleSchema(String, String),
}

// Convenience constructors for Error types that take a String argument
@@ -254,6 +257,12 @@ impl Error {
pub fn change_data_feed_unsupported(version: impl Into<Version>) -> Self {
Self::ChangeDataFeedUnsupported(version.into())
}
pub(crate) fn change_data_feed_incompatible_schema(
expected: &StructType,
actual: &StructType,
) -> Self {
Self::ChangeDataFeedIncompatibleSchema(format!("{expected:?}"), format!("{actual:?}"))
}

// Capture a backtrace when the error is constructed.
#[must_use]
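
The new variant stores both schemas as pre-rendered strings, and the constructor fills them in using each schema's `Debug` output. A self-contained sketch of that pattern follows. It assumes a hypothetical one-field `StructType` stand-in and a hand-written `Display` impl in place of the `#[error(...)]` attribute (which comes from a derive macro not available in a standalone snippet); only the message text and the constructor body are taken from the diff.

```rust
use std::fmt;

/// Hypothetical stand-in for the kernel's `StructType`; only `Debug` matters here.
#[derive(Debug)]
struct StructType {
    fields: Vec<(String, String)>,
}

/// Trimmed-down stand-in for the kernel's `Error` enum.
#[derive(Debug)]
enum Error {
    ChangeDataFeedIncompatibleSchema(String, String),
}

// Hand-written replacement for the derive-generated `Display` impl.
impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Error::ChangeDataFeedIncompatibleSchema(expected, actual) => write!(
                f,
                "Change data feed encountered incompatible schema. Expected {expected}, got {actual}"
            ),
        }
    }
}

impl Error {
    /// Render both schemas eagerly with `Debug`, so the error owns plain
    /// strings and does not hold on to the schemas themselves.
    fn change_data_feed_incompatible_schema(expected: &StructType, actual: &StructType) -> Self {
        Self::ChangeDataFeedIncompatibleSchema(format!("{expected:?}"), format!("{actual:?}"))
    }
}
```

Storing strings rather than the schemas keeps the variant's payload simple, at the cost of formatting both schemas even when the message is never printed.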
4 changes: 2 additions & 2 deletions kernel/src/path.rs
@@ -14,7 +14,7 @@ const MULTIPART_PART_LEN: usize = 10;
/// The number of characters in the uuid part of a uuid checkpoint
const UUID_PART_LEN: usize = 36;

- #[derive(Debug)]
+ #[derive(Debug, Clone)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
enum LogPathFileType {
@@ -37,7 +37,7 @@ enum LogPathFileType {
Unknown,
}

- #[derive(Debug)]
+ #[derive(Debug, Clone)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct ParsedLogPath<Location: AsUrl = FileMeta> {
3 changes: 2 additions & 1 deletion kernel/src/scan/log_replay.rs
@@ -180,7 +180,8 @@ pub(crate) static SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(||
]))
});

- static SCAN_ROW_DATATYPE: LazyLock<DataType> = LazyLock::new(|| SCAN_ROW_SCHEMA.clone().into());
+ pub(crate) static SCAN_ROW_DATATYPE: LazyLock<DataType> =
+ LazyLock::new(|| SCAN_ROW_SCHEMA.clone().into());

fn get_add_transform_expr() -> Expression {
Expression::Struct(vec![
2 changes: 1 addition & 1 deletion kernel/src/scan/mod.rs
@@ -19,7 +19,7 @@ use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};
use self::log_replay::scan_action_iter;
use self::state::GlobalScanState;

- mod data_skipping;
+ pub(crate) mod data_skipping;
pub mod log_replay;
pub mod state;
