
Implement Log Replay for Change Data Feed #540

Merged: 60 commits, Dec 6, 2024

Commits
e900d43
initial log replay
OussamaSaoudi-db Nov 25, 2024
a446d9a
Add basic mock table for testing
OussamaSaoudi-db Nov 25, 2024
9b77cd2
Finish testing framework for commit actions
OussamaSaoudi-db Nov 25, 2024
3a4c7cd
Fix deletion vectors
OussamaSaoudi-db Nov 25, 2024
9345cfa
Add protocol test
OussamaSaoudi-db Nov 26, 2024
e2fe217
Make MockTable async
OussamaSaoudi-db Nov 26, 2024
d37ebc8
add schema check
OussamaSaoudi-db Nov 26, 2024
4c1a616
Add config flag parsing
OussamaSaoudi-db Nov 26, 2024
4bd3ab0
Add configuration check
OussamaSaoudi-db Nov 26, 2024
748fd45
Change log replay to work with table changes scan
OussamaSaoudi-db Nov 26, 2024
20230ec
add timestamp tests
OussamaSaoudi-db Nov 26, 2024
6778d29
Use map_ok
OussamaSaoudi-db Nov 26, 2024
25d1486
Address some pr comments
OussamaSaoudi-db Nov 26, 2024
d946deb
Integrate with table changes builder
OussamaSaoudi-db Nov 26, 2024
095628a
Fix private visit_protocol, remove print
OussamaSaoudi-db Nov 26, 2024
49fba0e
Change selection vector computation
OussamaSaoudi-db Nov 26, 2024
2b260cc
Add comments for log replay
OussamaSaoudi-db Nov 26, 2024
63121f0
more documentation
OussamaSaoudi-db Nov 26, 2024
e45a2ea
Add file-level doc
OussamaSaoudi-db Nov 26, 2024
75dec49
Revert "Add file-level doc"
OussamaSaoudi-db Nov 26, 2024
261b3f3
Add file level doc
OussamaSaoudi-db Nov 26, 2024
67c88ae
Move common utils to utils::test_utils
OussamaSaoudi-db Nov 29, 2024
fc73579
Refactor to prepare for scan_file, remove unused annotation
OussamaSaoudi-db Nov 29, 2024
0140a28
some documentation
OussamaSaoudi-db Dec 2, 2024
b45c75c
Change log replay api
OussamaSaoudi-db Dec 2, 2024
8129e29
Address more comments
OussamaSaoudi-db Dec 3, 2024
f91e9a0
Move log replay tests to separate file
OussamaSaoudi-db Dec 3, 2024
0a6030b
Move prepare phase into separate function
OussamaSaoudi-db Dec 3, 2024
759d149
Add unused annotation
OussamaSaoudi-db Dec 3, 2024
d3d061b
More pr comments
OussamaSaoudi-db Dec 3, 2024
4b6106f
Address more pr comments
OussamaSaoudi-db Dec 3, 2024
5f644f0
remove type annotation
OussamaSaoudi-db Dec 3, 2024
fd38490
address pr comments
OussamaSaoudi-db Dec 3, 2024
91b9f92
address more pr comments
OussamaSaoudi-db Dec 3, 2024
3b2c3d3
add comments, revert iterator change
OussamaSaoudi-db Dec 3, 2024
3cf6955
Fix doc comment
OussamaSaoudi-db Dec 3, 2024
bf0d566
Share the data skipping filter
OussamaSaoudi-db Dec 3, 2024
e9423c4
PreparePhaseVisitor no longer has a new function
OussamaSaoudi-db Dec 3, 2024
49921ef
fix clippy error
OussamaSaoudi-db Dec 3, 2024
549cbdc
Add in more doc comments explaining data skipping filter is missing i…
OussamaSaoudi-db Dec 4, 2024
e05cedb
Add data change filtering
OussamaSaoudi-db Dec 4, 2024
6a8eceb
add tests for failing table properties and protocol
OussamaSaoudi-db Dec 4, 2024
5731e63
Remove ICT support
OussamaSaoudi-db Dec 4, 2024
8628165
Fix naming, fix clippy
OussamaSaoudi-db Dec 4, 2024
f1b6ef9
Add link to issue for ICT support
OussamaSaoudi-db Dec 4, 2024
28db41d
Add documentation for DVs
OussamaSaoudi-db Dec 4, 2024
8b4aadb
appease clippy for row visitor
OussamaSaoudi-db Dec 4, 2024
64e0a25
Add source for dvs
OussamaSaoudi-db Dec 4, 2024
7fcc4dc
Address more pr comments
OussamaSaoudi-db Dec 5, 2024
16a549c
Add comment to type widening
OussamaSaoudi-db Dec 5, 2024
e1d922b
Implement has_cdc_file optimization to skip add and remove checks if …
OussamaSaoudi-db Dec 5, 2024
5533389
Shorten tests
OussamaSaoudi-db Dec 5, 2024
562d65f
remove scan file that snuck in
OussamaSaoudi-db Dec 5, 2024
3440577
Add doc comments, add an extra schema incompatibility test
OussamaSaoudi-db Dec 5, 2024
5fd3d1c
Address pr comments
OussamaSaoudi-db Dec 6, 2024
d0eb811
Change tests to use higher level api
OussamaSaoudi-db Dec 6, 2024
7108fd8
Update comments
OussamaSaoudi-db Dec 6, 2024
1e4e09a
Add data skipping filter test
OussamaSaoudi-db Dec 6, 2024
54382aa
Fix import issue
OussamaSaoudi-db Dec 6, 2024
c1d6103
Add unwrap so that test doesn't capture and check the wrong error
OussamaSaoudi-db Dec 6, 2024
4 changes: 4 additions & 0 deletions ffi/src/error.rs
@@ -50,6 +50,7 @@ pub enum KernelError {
UnsupportedError,
ParseIntervalError,
ChangeDataFeedUnsupported,
ChangeDataFeedIncompatibleSchema,
}

impl From<Error> for KernelError {
@@ -104,6 +105,9 @@ impl From&lt;Error&gt; for KernelError {
Error::Unsupported(_) => KernelError::UnsupportedError,
Error::ParseIntervalError(_) => KernelError::ParseIntervalError,
Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported,
Error::ChangeDataFeedIncompatibleSchema(_, _) => {
KernelError::ChangeDataFeedIncompatibleSchema
}
}
}
}
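The diff above relies on the exhaustive `match` in `From<Error> for KernelError`: because there is no catch-all arm, adding a kernel `Error` variant without a corresponding FFI `KernelError` mapping becomes a compile error. A minimal, self-contained sketch of that pattern (variant set reduced to the two CDF variants; this is illustrative, not the real kernel enums):

```rust
// Sketch of the exhaustive-mapping pattern from ffi/src/error.rs.
// Only two variants are shown; the real enums carry many more.
#[derive(Debug)]
enum Error {
    ChangeDataFeedUnsupported(u64),
    ChangeDataFeedIncompatibleSchema(String, String),
}

#[derive(Debug, PartialEq)]
enum KernelError {
    ChangeDataFeedUnsupported,
    ChangeDataFeedIncompatibleSchema,
}

impl From<Error> for KernelError {
    fn from(e: Error) -> Self {
        // No `_ =>` arm: the compiler flags any unmapped Error variant.
        match e {
            Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported,
            Error::ChangeDataFeedIncompatibleSchema(_, _) => {
                KernelError::ChangeDataFeedIncompatibleSchema
            }
        }
    }
}

fn main() {
    let e = Error::ChangeDataFeedIncompatibleSchema("expected".into(), "actual".into());
    assert_eq!(
        KernelError::from(e),
        KernelError::ChangeDataFeedIncompatibleSchema
    );
}
```

Dropping the payload at the FFI boundary keeps `KernelError` a plain fieldless enum that is trivially representable in C.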
1 change: 1 addition & 0 deletions kernel/src/actions/deletion_vector.rs
@@ -13,6 +13,7 @@ use crate::utils::require;
use crate::{DeltaResult, Error, FileSystemClient};

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(serde::Serialize), serde(rename_all = "camelCase"))]
pub struct DeletionVectorDescriptor {
/// A single character to indicate how to access the DV. Legal options are: ['u', 'i', 'p'].
pub storage_type: String,
22 changes: 21 additions & 1 deletion kernel/src/actions/mod.rs
@@ -85,6 +85,7 @@ pub(crate) fn get_log_commit_info_schema() -&gt; &amp;'static SchemaRef {
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]
pub struct Format {
/// Name of the encoding for files in this table
pub provider: String,
@@ -102,6 +103,7 @@ impl Default for Format {
}

#[derive(Debug, Default, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]
pub struct Metadata {
/// Unique identifier for this table
pub id: String,
@@ -287,7 +289,7 @@ impl Protocol {
}

// given unparsed `table_features`, parse and check if they are subset of `supported_features`
fn ensure_supported_features<T>(
pub(crate) fn ensure_supported_features<T>(
table_features: &[String],
supported_features: &HashSet<T>,
) -> DeltaResult<()>
@@ -325,6 +327,7 @@ where
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct CommitInfo {
/// The time this logical file was created, as milliseconds since the epoch.
/// Read: optional, write: required (that is, kernel always writes).
@@ -346,6 +349,7 @@ struct CommitInfo {
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
pub struct Add {
/// A relative path to a data file from the root of the table or an absolute path to a file
/// that should be added to the table. The path is a URI as specified by
@@ -374,23 +378,29 @@ pub struct Add {
/// Contains [statistics] (e.g., count, min/max values for columns) about the data in this logical file.
///
/// [statistics]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#Per-file-Statistics
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub stats: Option<String>,

/// Map containing metadata about this logical file.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub tags: Option<HashMap<String, String>>,

/// Information about deletion vector (DV) associated with this add action
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub deletion_vector: Option<DeletionVectorDescriptor>,

/// Default generated Row ID of the first row in the file. The default generated Row IDs
/// of the other rows in the file can be reconstructed by adding the physical index of the
/// row within the file to the base Row ID
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub base_row_id: Option<i64>,

/// First commit version in which an add action with the same path was committed to the table.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub default_row_commit_version: Option<i64>,

/// The name of the clustering implementation
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub clustering_provider: Option<String>,
}

@@ -403,6 +413,7 @@ impl Add {
#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Remove {
/// A relative path to a data file from the root of the table or an absolute path to a file
/// that should be added to the table. The path is a URI as specified by
@@ -412,39 +423,48 @@ struct Remove {
pub(crate) path: String,

/// The time this logical file was created, as milliseconds since the epoch.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) deletion_timestamp: Option<i64>,

/// When `false` the logical file must already be present in the table or the records
/// in the added file must be contained in one or more remove actions in the same version.
pub(crate) data_change: bool,

/// When true the fields `partition_values`, `size`, and `tags` are present
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) extended_file_metadata: Option<bool>,

/// A map from partition column to value for this logical file.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) partition_values: Option<HashMap<String, String>>,

/// The size of this data file in bytes
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) size: Option<i64>,

/// Map containing metadata about this logical file.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) tags: Option<HashMap<String, String>>,

/// Information about deletion vector (DV) associated with this add action
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) deletion_vector: Option<DeletionVectorDescriptor>,

/// Default generated Row ID of the first row in the file. The default generated Row IDs
/// of the other rows in the file can be reconstructed by adding the physical index of the
/// row within the file to the base Row ID
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) base_row_id: Option<i64>,

/// First commit version in which an add action with the same path was committed to the table.
#[cfg_attr(test, serde(skip_serializing_if = "Option::is_none"))]
pub(crate) default_row_commit_version: Option<i64>,
}

#[derive(Debug, Clone, PartialEq, Eq, Schema)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Cdc {
/// A relative path to a change data file from the root of the table or an absolute path to a
/// change data file that should be added to the table. The path is a URI as specified by
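The repeated `#[cfg_attr(test, derive(Serialize), serde(rename_all = "camelCase"))]` annotations above gate serde's `Serialize` derive to `cfg(test)` builds, so the action structs gain JSON round-tripping for the mock-table tests without making serde part of the release-build surface. A std-only sketch of the same gating mechanism (serde itself omitted here; `Default` stands in for the test-only derive, and the struct is a cut-down stand-in for `Add`):

```rust
// Sketch of test-gated derives: the attribute inside cfg_attr is only
// applied when the cfg predicate holds, exactly as the PR gates
// serde::Serialize behind cfg(test).
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(test, derive(Default))] // extra trait exists only in `cfg(test)` builds
struct Add {
    path: String,
    stats: Option<String>, // serde would skip this when None via skip_serializing_if
}

fn main() {
    let add = Add {
        path: "part-0000.parquet".into(),
        stats: None,
    };
    // Debug is derived unconditionally and available in every build.
    assert_eq!(
        format!("{add:?}"),
        r#"Add { path: "part-0000.parquet", stats: None }"#
    );
}
```

Because `cfg_attr` removes the derive entirely outside tests, release binaries carry no serialization code for these types.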
2 changes: 1 addition & 1 deletion kernel/src/actions/visitors.rs
@@ -121,7 +121,7 @@ struct ProtocolVisitor {

impl ProtocolVisitor {
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
fn visit_protocol<'a>(
pub(crate) fn visit_protocol<'a>(
row_index: usize,
min_reader_version: i32,
getters: &[&'a dyn GetData<'a>],
11 changes: 10 additions & 1 deletion kernel/src/error.rs
@@ -6,7 +6,7 @@ use std::{
str::Utf8Error,
};

use crate::schema::DataType;
use crate::schema::{DataType, StructType};
use crate::table_properties::ParseIntervalError;
use crate::Version;

@@ -188,6 +188,9 @@ pub enum Error {

#[error("Change data feed is unsupported for the table at version {0}")]
ChangeDataFeedUnsupported(Version),

#[error("Change data feed encountered incompatible schema. Expected {0}, got {1}")]
ChangeDataFeedIncompatibleSchema(String, String),
}

// Convenience constructors for Error types that take a String argument
@@ -254,6 +257,12 @@ impl Error {
pub fn change_data_feed_unsupported(version: impl Into<Version>) -> Self {
Self::ChangeDataFeedUnsupported(version.into())
}
pub(crate) fn change_data_feed_incompatible_schema(
expected: &StructType,
actual: &StructType,
) -> Self {
Self::ChangeDataFeedIncompatibleSchema(format!("{expected:?}"), format!("{actual:?}"))
[Review comment, Collaborator]: Another case where we may want to filter the schemas to show only their symmetric differences, to avoid drowning in details?

[@scovich, Dec 5, 2024]: In case it wasn't obvious: This would be tracked as future work along with the other -- not for this PR.

[Author]: Yep, submitted this #564 to keep track.
}

// Capture a backtrace when the error is constructed.
#[must_use]
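The `change_data_feed_incompatible_schema` constructor above renders both schemas with `format!("{:?}")` at construction time, so the error variant stores two plain `String`s and callers never format schemas by hand. A self-contained sketch of that pattern (the `StructType` here is a stub for illustration, not the kernel's schema type):

```rust
use std::collections::BTreeMap;

// Stub schema type standing in for kernel::schema::StructType.
#[derive(Debug)]
struct StructType {
    fields: BTreeMap<String, String>, // field name -> type name
}

#[derive(Debug)]
enum Error {
    // Stores pre-rendered strings, not the schema values themselves,
    // keeping the error type cheap to clone and free of lifetimes.
    ChangeDataFeedIncompatibleSchema(String, String),
}

impl Error {
    fn change_data_feed_incompatible_schema(expected: &StructType, actual: &StructType) -> Self {
        // Render both schemas once, up front, via their Debug impls.
        Error::ChangeDataFeedIncompatibleSchema(format!("{expected:?}"), format!("{actual:?}"))
    }
}

fn main() {
    let expected = StructType {
        fields: BTreeMap::from([("id".to_string(), "long".to_string())]),
    };
    let actual = StructType { fields: BTreeMap::new() };
    let err = Error::change_data_feed_incompatible_schema(&expected, &actual);
    // Single-variant enum, so this `let` pattern is irrefutable.
    let Error::ChangeDataFeedIncompatibleSchema(e, a) = err;
    assert!(e.contains("id") && !a.contains("id"));
}
```

As the review thread notes, rendering the full schemas can drown the reader in detail; filtering to the symmetric difference was split out as follow-up work (#564).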
4 changes: 2 additions & 2 deletions kernel/src/path.rs
@@ -14,7 +14,7 @@ const MULTIPART_PART_LEN: usize = 10;
/// The number of characters in the uuid part of a uuid checkpoint
const UUID_PART_LEN: usize = 36;

#[derive(Debug)]
#[derive(Debug, Clone)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
enum LogPathFileType {
@@ -37,7 +37,7 @@ enum LogPathFileType {
Unknown,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))]
struct ParsedLogPath<Location: AsUrl = FileMeta> {
3 changes: 2 additions & 1 deletion kernel/src/scan/log_replay.rs
@@ -180,7 +180,8 @@ pub(crate) static SCAN_ROW_SCHEMA: LazyLock&lt;Arc&lt;StructType&gt;&gt; = LazyLock::new(||
]))
});

static SCAN_ROW_DATATYPE: LazyLock<DataType> = LazyLock::new(|| SCAN_ROW_SCHEMA.clone().into());
pub(crate) static SCAN_ROW_DATATYPE: LazyLock<DataType> =
LazyLock::new(|| SCAN_ROW_SCHEMA.clone().into());

fn get_add_transform_expr() -> Expression {
Expression::Struct(vec![
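`SCAN_ROW_DATATYPE` above is a `std::sync::LazyLock` (stable since Rust 1.80) derived from another lazy static: it is initialized on first access by cloning the `Arc`-wrapped schema and converting it, so there are no static-initialization-order concerns. A std-only sketch with stubbed-in types (the real `StructType`/`DataType` live in the kernel crate):

```rust
use std::sync::{Arc, LazyLock};

// Stubs standing in for the kernel's schema and data types.
#[derive(Debug, Clone)]
struct StructType(Vec<&'static str>);

#[derive(Debug)]
struct DataType(StructType);

impl From<Arc<StructType>> for DataType {
    fn from(s: Arc<StructType>) -> Self {
        DataType(s.as_ref().clone())
    }
}

// Initialized lazily, on first dereference.
static SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> =
    LazyLock::new(|| Arc::new(StructType(vec!["path", "size"])));

// Derived from the schema static; `.clone()` copies the Arc (cheap),
// and `.into()` invokes the From impl above.
static SCAN_ROW_DATATYPE: LazyLock<DataType> =
    LazyLock::new(|| SCAN_ROW_SCHEMA.clone().into());

fn main() {
    assert_eq!(SCAN_ROW_DATATYPE.0 .0, vec!["path", "size"]);
}
```

Widening `SCAN_ROW_DATATYPE` to `pub(crate)` in the diff lets the new table-changes log replay reuse the scan-row schema instead of rebuilding it.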
2 changes: 1 addition & 1 deletion kernel/src/scan/mod.rs
@@ -19,7 +19,7 @@ use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};
use self::log_replay::scan_action_iter;
use self::state::GlobalScanState;

mod data_skipping;
pub(crate) mod data_skipping;
pub mod log_replay;
pub mod state;
