From e993194c559bf16630809693bcd5f9e635ee6e5a Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 18 Nov 2024 15:17:28 -0800 Subject: [PATCH 01/17] Add table changes constructor --- kernel/src/error.rs | 8 +- kernel/src/lib.rs | 1 + kernel/src/snapshot.rs | 2 +- kernel/src/table.rs | 18 +++ kernel/src/table_changes/mod.rs | 144 ++++++++++++++++++ .../_delta_log/00000000000000000000.json | 4 + .../_delta_log/00000000000000000001.json | 3 + .../_delta_log/00000000000000000002.json | 5 + 8 files changed, 183 insertions(+), 2 deletions(-) create mode 100644 kernel/src/table_changes/mod.rs create mode 100644 kernel/tests/data/table-with-cdf/_delta_log/00000000000000000000.json create mode 100644 kernel/tests/data/table-with-cdf/_delta_log/00000000000000000001.json create mode 100644 kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json diff --git a/kernel/src/error.rs b/kernel/src/error.rs index 99cdba844..626fc76f8 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -6,7 +6,7 @@ use std::{ str::Utf8Error, }; -use crate::schema::DataType; +use crate::{schema::DataType, Version}; /// A [`std::result::Result`] that has the kernel [`Error`] as the error variant pub type DeltaResult = std::result::Result; @@ -179,6 +179,9 @@ pub enum Error { /// Some functionality is currently unsupported #[error("Unsupported: {0}")] Unsupported(String), + + #[error("Table changes disabled as of version {0}")] + TableChangesDisabled(Version), } // Convenience constructors for Error types that take a String argument @@ -242,6 +245,9 @@ impl Error { pub fn unsupported(msg: impl ToString) -> Self { Self::Unsupported(msg.to_string()) } + pub fn table_changes_disabled(version: impl Into) -> Self { + Self::TableChangesDisabled(version.into()) + } // Capture a backtrace when the error is constructed. 
#[must_use] diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 0dd6f7d37..0142c513a 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -64,6 +64,7 @@ pub mod engine_data; pub mod error; pub mod expressions; pub(crate) mod predicates; +pub mod table_changes; pub mod table_features; #[cfg(feature = "developer-visibility")] diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 6521a33fb..c30c3a2cc 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -50,7 +50,7 @@ impl Snapshot { /// /// # Parameters /// - /// - `location`: url pointing at the table root (where `_delta_log` folder is located) + /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) /// - `engine`: Implementation of [`Engine`] apis. /// - `version`: target version of the [`Snapshot`] pub fn try_new( diff --git a/kernel/src/table.rs b/kernel/src/table.rs index a459de0f4..78765b298 100644 --- a/kernel/src/table.rs +++ b/kernel/src/table.rs @@ -8,6 +8,7 @@ use std::path::PathBuf; use url::Url; use crate::snapshot::Snapshot; +use crate::table_changes::TableChanges; use crate::transaction::Transaction; use crate::{DeltaResult, Engine, Error, Version}; @@ -80,6 +81,23 @@ impl Table { Snapshot::try_new(self.location.clone(), engine, version) } + /// Create a [`TableChanges`] to get changes made to the table between `start_version`, and `end_version`. + /// + /// If no `end_version` is supplied, the latest version will be used as the `end_version`. + pub fn table_changes( + &self, + engine: &dyn Engine, + start_version: Version, + end_version: impl Into>, + ) -> DeltaResult { + TableChanges::try_new( + self.location.clone(), + engine, + start_version, + end_version.into(), + ) + } + /// Create a new write transaction for this table. pub fn new_transaction(&self, engine: &dyn Engine) -> DeltaResult { Transaction::try_new(self.snapshot(engine, None)?) 
diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs new file mode 100644 index 000000000..fdafa0e22 --- /dev/null +++ b/kernel/src/table_changes/mod.rs @@ -0,0 +1,144 @@ +//! Provides an API to read the table's change data feed between two versions. + +use url::Url; + +use crate::log_segment::LogSegment; +use crate::path::AsUrl; +use crate::schema::Schema; +use crate::snapshot::Snapshot; +use crate::table_features::ColumnMappingMode; +use crate::{DeltaResult, Engine, Error, Version}; + +/// Represents a call to read the Change Data Feed between two versions of a table. The schema of +/// `TableChanges` will be the schema of the table with three additional columns: +/// - `_change_type`: String representing the type of change that for that commit. This may be one +/// of `delete`, `insert`, `update_preimage`, or `update_postimage` +/// - `_commit_version`: Long representing the commit the change occurred in. +/// - `_commit_timestamp`: Time at which the commit occurred. If In-commit timestamps is enabled, +/// this is retrieved from the [`CommitInfo`] action. Otherwise, the timestamp is the same as the +/// commit file's modification timestamp. +/// +/// For more details, see the Protocol sections for [Add CDC File](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file) and [`ChangeDataFiles`](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file). +/// +/// [`CommitInfo`]: crate::actions::CommitInfo +#[derive(Debug)] +pub struct TableChanges { + pub log_segment: LogSegment, + table_root: Url, + end_snapshot: Snapshot, + start_version: Version, +} + +impl TableChanges { + /// Creates a new [`TableChanges`] instance for the given version range. This function fails if + /// the change data feed table feature is disabled in either the start or end versions. However + /// it does not verify that change data feed is enabled for the entire range. 
+ /// + /// # Parameters + /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) + /// - `engine`: Implementation of [`Engine`] apis. + /// - `start_version`: The start version of the change data feed + /// - `end_version`: The end version of the change data feed. If this is none, this defaults to + /// the newest table version. + pub fn try_new( + table_root: Url, + engine: &dyn Engine, + start_version: Version, + end_version: Option, + ) -> DeltaResult { + let start_snapshot = + Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?; + let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?; + + // Verify CDF is enabled at the beginning and end of the interval + let is_cdf_enabled = |snapshot: &Snapshot| { + static ENABLE_CDF_FLAG: &str = "delta.enableChangeDataFeed"; + snapshot + .metadata() + .configuration + .get(ENABLE_CDF_FLAG) + .is_some_and(|val| val == "true") + }; + if !is_cdf_enabled(&start_snapshot) { + return Err(Error::table_changes_disabled(start_version)); + } else if !is_cdf_enabled(&end_snapshot) { + return Err(Error::table_changes_disabled(end_snapshot.version())); + } + + let log_root = table_root.join("_delta_log/")?; + let log_segment = LogSegment::for_table_changes( + engine.get_file_system_client().as_ref(), + log_root, + start_version, + end_version, + )?; + + Ok(TableChanges { + table_root, + end_snapshot, + log_segment, + start_version, + }) + } + + pub fn start_version(&self) -> Version { + self.start_version + } + /// The end version of the `TableChanges`. If no end_version was specified in + /// [`TableChanges::try_new`], this is the newest version as of the call to `try_new`. + pub fn end_version(&self) -> Version { + self.log_segment.end_version + } + /// The logical schema of the change data feed. For details on the shape of the schema, see + /// [`TableChanges`]. 
+ pub fn schema(&self) -> &Schema { + self.end_snapshot.schema() + } + pub fn table_root(&self) -> &Url { + &self.table_root + } + #[allow(unused)] + pub(crate) fn partition_columns(&self) -> &Vec { + &self.end_snapshot.metadata().partition_columns + } + #[allow(unused)] + pub(crate) fn column_mapping_mode(&self) -> &ColumnMappingMode { + &self.end_snapshot.column_mapping_mode + } +} + +#[cfg(test)] +mod tests { + use crate::Error; + use crate::{engine::sync::SyncEngine, Table}; + + #[test] + fn test_enable_cdf_flag() { + let path = "./tests/data/table-with-cdf"; + let engine = Box::new(SyncEngine::new()); + let table = Table::try_from_uri(path).unwrap(); + + let valid_ranges = [(0, Some(1)), (0, Some(0)), (1, Some(1))]; + for (start_version, end_version) in valid_ranges { + let table_changes = table + .table_changes(engine.as_ref(), start_version, end_version) + .unwrap(); + assert_eq!(table_changes.start_version, start_version); + if let Some(end_version) = end_version { + assert_eq!(table_changes.end_version(), end_version); + } + } + + let invalid_ranges = [ + (0, None), + (0, Some(2)), + (1, Some(2)), + (2, None), + (2, Some(2)), + ]; + for (start_version, end_version) in invalid_ranges { + let res = table.table_changes(engine.as_ref(), start_version, end_version); + assert!(matches!(res, Err(Error::TableChangesDisabled(_)))) + } + } +} diff --git a/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000000.json b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000000.json new file mode 100644 index 000000000..09bf8dbb2 --- /dev/null +++ b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1704392842074,"operation":"Manual Update","operationParameters":{},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"95ec924a-6859-4433-8008-6d6b4a0e3ba5"}} 
+{"protocol":{"minReaderVersion":3,"minWriterVersion":7, "readerFeatures":[], "writerFeatures":[]}} +{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed": "true"}}} +{"add":{"path":"fake/path/1","partitionValues":{},"size":1,"modificationTime":1,"dataChange":true}} diff --git a/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000001.json b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000001.json new file mode 100644 index 000000000..8f376bd9c --- /dev/null +++ b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000001.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1704392846030,"operation":"Manual Update","operationParameters":{},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"01d40235-c8b4-4f8e-8f19-8c97872217fd"}} +{"cdc":{"path":"fake/path/2","partitionValues":{"partition_foo":"partition_bar"},"size":1,"tags":{"tag_foo":"tag_bar"},"dataChange":false}} +{"remove":{"path":"fake/path/1","deletionTimestamp":100,"dataChange":true}} diff --git a/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json new file mode 100644 index 000000000..eef316c84 --- /dev/null +++ b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json @@ -0,0 +1,5 @@ +{"commitInfo":{"timestamp":1704392846603,"operation":"Manual Update","operationParameters":{},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"6cef7579-ca93-4427-988e-9269e8db50c7"}} 
+{"protocol":{"minReaderVersion":3,"minWriterVersion":7, "readerFeatures":[], "writerFeatures":[]}} +{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed": "false"}}} +{"remove":{"path":"fake/path/1","deletionTimestamp":1704392846603,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":635,"tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"txn":{"appId":"fakeAppId","version":3,"lastUpdated":200}} From 56ac31c9182a64edb711110ace48ffd0351a7604 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 18 Nov 2024 15:20:07 -0800 Subject: [PATCH 02/17] Unnest imports in test --- kernel/src/table_changes/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index fdafa0e22..9efafe663 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -109,8 +109,9 @@ impl TableChanges { #[cfg(test)] mod tests { + use crate::engine::sync::SyncEngine; use crate::Error; - use crate::{engine::sync::SyncEngine, Table}; + use crate::Table; #[test] fn test_enable_cdf_flag() { From df40d8a5c91cdddd2c82d69f5c4a38557e1dd228 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 18 Nov 2024 15:23:31 -0800 Subject: [PATCH 03/17] Fix doc --- kernel/src/table_changes/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 9efafe663..ec3fa7004 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -18,7 +18,9 @@ use crate::{DeltaResult, Engine, 
Error, Version}; /// this is retrieved from the [`CommitInfo`] action. Otherwise, the timestamp is the same as the /// commit file's modification timestamp. /// -/// For more details, see the Protocol sections for [Add CDC File](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file) and [`ChangeDataFiles`](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file). +/// For more details, see the following sections of the protocol: +/// - [Add CDC File](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file) +/// - [Change Data Files](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-data-files). /// /// [`CommitInfo`]: crate::actions::CommitInfo #[derive(Debug)] From 170f6f3b108d8faac5b2462c04226d9d79de693d Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 18 Nov 2024 15:30:58 -0800 Subject: [PATCH 04/17] Fix docs --- kernel/src/table_changes/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index ec3fa7004..259fb6adc 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -32,10 +32,13 @@ pub struct TableChanges { } impl TableChanges { - /// Creates a new [`TableChanges`] instance for the given version range. This function fails if - /// the change data feed table feature is disabled in either the start or end versions. However - /// it does not verify that change data feed is enabled for the entire range. + /// Creates a new [`TableChanges`] instance for the given version range. This function checks + /// these two properties: + /// - The change data feed table feature must be enabled in both the start or end versions. + /// - The schema at the start and end versions are the same. /// + /// Note that this does not check that change data feed is enabled for every commit in the + /// range. It also does not check that the schema remains the same for the entire range. 
/// # Parameters /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) /// - `engine`: Implementation of [`Engine`] apis. From 4ce9a9dad73b38e78be3ba387feca84565fbd599 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 18 Nov 2024 15:34:13 -0800 Subject: [PATCH 05/17] more docs --- kernel/src/table.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/src/table.rs b/kernel/src/table.rs index 78765b298..97e1596d7 100644 --- a/kernel/src/table.rs +++ b/kernel/src/table.rs @@ -81,9 +81,9 @@ impl Table { Snapshot::try_new(self.location.clone(), engine, version) } - /// Create a [`TableChanges`] to get changes made to the table between `start_version`, and `end_version`. - /// - /// If no `end_version` is supplied, the latest version will be used as the `end_version`. + /// Create a [`TableChanges`] to get a change data feed for the table between `start_version`, + /// and `end_version`. If no `end_version` is supplied, the latest version will be used as the + /// `end_version`. 
pub fn table_changes( &self, engine: &dyn Engine, From 4524e752070c66e707e05af7c7c3c202b27477d6 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 18 Nov 2024 15:35:00 -0800 Subject: [PATCH 06/17] Fix nested import in error --- kernel/src/error.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kernel/src/error.rs b/kernel/src/error.rs index 626fc76f8..fbe07f800 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -6,7 +6,8 @@ use std::{ str::Utf8Error, }; -use crate::{schema::DataType, Version}; +use crate::schema::DataType; +use crate::Version; /// A [`std::result::Result`] that has the kernel [`Error`] as the error variant pub type DeltaResult = std::result::Result; From 23962b64a3d4eb6983137222ac3b8082408ca5d6 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 18 Nov 2024 16:10:50 -0800 Subject: [PATCH 07/17] Add test for schema, asserting it is equal --- kernel/src/table_changes/mod.rs | 35 +++++++++++-------- .../_delta_log/00000000000000000002.json | 1 - .../_delta_log/00000000000000000003.json | 3 ++ .../_delta_log/00000000000000000004.json | 3 ++ 4 files changed, 27 insertions(+), 15 deletions(-) create mode 100644 kernel/tests/data/table-with-cdf/_delta_log/00000000000000000003.json create mode 100644 kernel/tests/data/table-with-cdf/_delta_log/00000000000000000004.json diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 259fb6adc..9c89107db 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -12,7 +12,7 @@ use crate::{DeltaResult, Engine, Error, Version}; /// Represents a call to read the Change Data Feed between two versions of a table. The schema of /// `TableChanges` will be the schema of the table with three additional columns: /// - `_change_type`: String representing the type of change that for that commit. 
This may be one -/// of `delete`, `insert`, `update_preimage`, or `update_postimage` +/// of `delete`, `insert`, `update_preimage`, or `update_postimage`. /// - `_commit_version`: Long representing the commit the change occurred in. /// - `_commit_timestamp`: Time at which the commit occurred. If In-commit timestamps is enabled, /// this is retrieved from the [`CommitInfo`] action. Otherwise, the timestamp is the same as the @@ -39,6 +39,7 @@ impl TableChanges { /// /// Note that this does not check that change data feed is enabled for every commit in the /// range. It also does not check that the schema remains the same for the entire range. + /// /// # Parameters /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) /// - `engine`: Implementation of [`Engine`] apis. @@ -69,6 +70,11 @@ impl TableChanges { } else if !is_cdf_enabled(&end_snapshot) { return Err(Error::table_changes_disabled(end_snapshot.version())); } + if start_snapshot.schema() != end_snapshot.schema() { + return Err(Error::generic( + "Failed to build TableChanges: Start and end version schemas are different.", + )); + } let log_root = table_root.join("_delta_log/")?; let log_segment = LogSegment::for_table_changes( @@ -85,12 +91,11 @@ impl TableChanges { start_version, }) } - pub fn start_version(&self) -> Version { self.start_version } /// The end version of the `TableChanges`. If no end_version was specified in - /// [`TableChanges::try_new`], this is the newest version as of the call to `try_new`. + /// [`TableChanges::try_new`], this returns the newest version as of the call to `try_new`. 
pub fn end_version(&self) -> Version { self.log_segment.end_version } @@ -124,27 +129,29 @@ mod tests { let engine = Box::new(SyncEngine::new()); let table = Table::try_from_uri(path).unwrap(); - let valid_ranges = [(0, Some(1)), (0, Some(0)), (1, Some(1))]; + let valid_ranges = [(0, 1), (0, 0), (1, 1)]; for (start_version, end_version) in valid_ranges { let table_changes = table .table_changes(engine.as_ref(), start_version, end_version) .unwrap(); assert_eq!(table_changes.start_version, start_version); - if let Some(end_version) = end_version { - assert_eq!(table_changes.end_version(), end_version); - } + assert_eq!(table_changes.end_version(), end_version); } - let invalid_ranges = [ - (0, None), - (0, Some(2)), - (1, Some(2)), - (2, None), - (2, Some(2)), - ]; + let invalid_ranges = [(0, Some(2)), (1, Some(2)), (2, Some(2))]; for (start_version, end_version) in invalid_ranges { let res = table.table_changes(engine.as_ref(), start_version, end_version); assert!(matches!(res, Err(Error::TableChangesDisabled(_)))) } } + #[test] + fn test_schema_evolution_fails() { + let path = "./tests/data/table-with-cdf"; + let engine = Box::new(SyncEngine::new()); + let table = Table::try_from_uri(path).unwrap(); + + // A field in the schema goes from being nullable to non-nullable + let table_changes_res = table.table_changes(engine.as_ref(), 3, 4); + assert!(matches!(table_changes_res, Err(Error::Generic(_)))); + } } diff --git a/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json index eef316c84..a85f485a1 100644 --- a/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json +++ b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000002.json @@ -1,5 +1,4 @@ {"commitInfo":{"timestamp":1704392846603,"operation":"Manual 
Update","operationParameters":{},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"6cef7579-ca93-4427-988e-9269e8db50c7"}} -{"protocol":{"minReaderVersion":3,"minWriterVersion":7, "readerFeatures":[], "writerFeatures":[]}} {"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed": "false"}}} {"remove":{"path":"fake/path/1","deletionTimestamp":1704392846603,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":635,"tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}} {"txn":{"appId":"fakeAppId","version":3,"lastUpdated":200}} diff --git a/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000003.json b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000003.json new file mode 100644 index 000000000..ffc70d1c9 --- /dev/null +++ b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000003.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1704392846603,"operation":"Manual Update","operationParameters":{},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"6cef7579-ca93-4427-988e-9269e8db50c7"}} +{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed": 
"true"}}} +{"txn":{"appId":"fakeAppId","version":3,"lastUpdated":200}} diff --git a/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000004.json b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000004.json new file mode 100644 index 000000000..960e6d26f --- /dev/null +++ b/kernel/tests/data/table-with-cdf/_delta_log/00000000000000000004.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1704392846603,"operation":"Manual Update","operationParameters":{},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0-SNAPSHOT","txnId":"6cef7579-ca93-4427-988e-9269e8db50c7"}} +{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"part\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableChangeDataFeed": "true"}}} +{"txn":{"appId":"fakeAppId","version":3,"lastUpdated":200}} From 8d43bc5adf2087005558402d98ba2c8545944ca1 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 18 Nov 2024 16:15:37 -0800 Subject: [PATCH 08/17] fix failing test --- ffi/src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index d2cedb720..3ea816b0d 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -383,6 +383,7 @@ pub enum KernelError { FileAlreadyExists, MissingCommitInfo, UnsupportedError, + TableChangesDisabled, } impl From for KernelError { @@ -435,6 +436,7 @@ impl From for KernelError { Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists, Error::MissingCommitInfo => KernelError::MissingCommitInfo, Error::Unsupported(_) => KernelError::UnsupportedError, + Error::TableChangesDisabled(_) => KernelError::TableChangesDisabled, } } } From 073ae49b7c2fc2291c65dbf4fcc901837051439d Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 
18 Nov 2024 17:44:55 -0800 Subject: [PATCH 09/17] Add table changes schema --- kernel/src/table_changes/mod.rs | 43 +++++++++++++++++++++++++++++++-- 1 file changed, 41 insertions(+), 2 deletions(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 9c89107db..ed6cd1a41 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -1,14 +1,23 @@ //! Provides an API to read the table's change data feed between two versions. +use std::sync::LazyLock; use url::Url; use crate::log_segment::LogSegment; use crate::path::AsUrl; -use crate::schema::Schema; +use crate::schema::{DataType, Schema, StructField, StructType}; use crate::snapshot::Snapshot; use crate::table_features::ColumnMappingMode; use crate::{DeltaResult, Engine, Error, Version}; +static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { + [ + StructField::new("_change_type", DataType::STRING, false), + StructField::new("_commit_version", DataType::LONG, false), + StructField::new("_commit_timestamp", DataType::TIMESTAMP, false), + ] +}); + /// Represents a call to read the Change Data Feed between two versions of a table. The schema of /// `TableChanges` will be the schema of the table with three additional columns: /// - `_change_type`: String representing the type of change that for that commit. This may be one @@ -29,6 +38,7 @@ pub struct TableChanges { table_root: Url, end_snapshot: Snapshot, start_version: Version, + schema: Schema, } impl TableChanges { @@ -84,11 +94,20 @@ impl TableChanges { end_version, )?; + let schema = StructType::new( + end_snapshot + .schema() + .fields() + .cloned() + .chain(CDF_FIELDS.clone()), + ); + Ok(TableChanges { table_root, end_snapshot, log_segment, start_version, + schema, }) } pub fn start_version(&self) -> Version { @@ -102,7 +121,7 @@ impl TableChanges { /// The logical schema of the change data feed. For details on the shape of the schema, see /// [`TableChanges`]. 
pub fn schema(&self) -> &Schema { - self.end_snapshot.schema() + &self.schema } pub fn table_root(&self) -> &Url { &self.table_root @@ -120,8 +139,12 @@ impl TableChanges { #[cfg(test)] mod tests { use crate::engine::sync::SyncEngine; + use crate::schema::DataType; + use crate::schema::StructField; + use crate::table_changes::CDF_FIELDS; use crate::Error; use crate::Table; + use itertools::assert_equal; #[test] fn test_enable_cdf_flag() { @@ -154,4 +177,20 @@ mod tests { let table_changes_res = table.table_changes(engine.as_ref(), 3, 4); assert!(matches!(table_changes_res, Err(Error::Generic(_)))); } + + #[test] + fn test_table_changes_has_cdf_schema() { + let path = "./tests/data/table-with-cdf"; + let engine = Box::new(SyncEngine::new()); + let table = Table::try_from_uri(path).unwrap(); + let expected_schema = [ + StructField::new("part", DataType::INTEGER, true), + StructField::new("id", DataType::INTEGER, true), + ] + .into_iter() + .chain(CDF_FIELDS.clone()); + + let table_changes = table.table_changes(engine.as_ref(), 0, 0).unwrap(); + assert_equal(expected_schema, table_changes.schema().fields().cloned()); + } } From 6d972a6b30fdbbf806f287bcf380b5d5ce74dcdb Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 09:49:10 -0800 Subject: [PATCH 10/17] add schema to error message --- kernel/src/table_changes/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index ed6cd1a41..c12054683 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -81,9 +81,9 @@ impl TableChanges { return Err(Error::table_changes_disabled(end_snapshot.version())); } if start_snapshot.schema() != end_snapshot.schema() { - return Err(Error::generic( - "Failed to build TableChanges: Start and end version schemas are different.", - )); + return Err(Error::generic(format!( + "Failed to build TableChanges: Start and end version schemas are different. 
Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(), + ))); } let log_root = table_root.join("_delta_log/")?; From d3eb161e56813c997924174082e3d6d7678fd943 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 17:09:26 -0800 Subject: [PATCH 11/17] Address pr comments --- kernel/src/error.rs | 8 +++---- kernel/src/table_changes/mod.rs | 37 ++++++++++++++++++++++----------- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/kernel/src/error.rs b/kernel/src/error.rs index fbe07f800..38a853c0f 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -181,8 +181,8 @@ pub enum Error { #[error("Unsupported: {0}")] Unsupported(String), - #[error("Table changes disabled as of version {0}")] - TableChangesDisabled(Version), + #[error("Change data feed is unsupported for the table at version {0}")] + ChangeDataFeedUnsupported(Version), } // Convenience constructors for Error types that take a String argument @@ -246,8 +246,8 @@ impl Error { pub fn unsupported(msg: impl ToString) -> Self { Self::Unsupported(msg.to_string()) } - pub fn table_changes_disabled(version: impl Into) -> Self { - Self::TableChangesDisabled(version.into()) + pub fn change_data_feed_unsupported(version: impl Into) -> Self { + Self::ChangeDataFeedUnsupported(version.into()) } // Capture a backtrace when the error is constructed. diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index c12054683..5382dd500 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -54,19 +54,24 @@ impl TableChanges { /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located) /// - `engine`: Implementation of [`Engine`] apis. /// - `start_version`: The start version of the change data feed - /// - `end_version`: The end version of the change data feed. If this is none, this defaults to - /// the newest table version. 
+ /// - `end_version`: The end version (inclusive) of the change data feed. If this is none, this + /// defaults to the newest table version. pub fn try_new( table_root: Url, engine: &dyn Engine, start_version: Version, end_version: Option, ) -> DeltaResult { + // Both snapshots ensure that reading is supported at the start and end version using + // [`Protocol::ensure_read_supported`]. Note that we must still verify that reading is + // supported for every intermediary protocol actions + // [`Protocol`]: crate::actions::Protocol let start_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?; let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?; - // Verify CDF is enabled at the beginning and end of the interval + // Verify CDF is enabled at the beginning and end of the interval. We must still check every + // intermediary metadata action in the range. let is_cdf_enabled = |snapshot: &Snapshot| { static ENABLE_CDF_FLAG: &str = "delta.enableChangeDataFeed"; snapshot @@ -76,10 +81,11 @@ impl TableChanges { .is_some_and(|val| val == "true") }; if !is_cdf_enabled(&start_snapshot) { - return Err(Error::table_changes_disabled(start_version)); + return Err(Error::change_data_feed_unsupported(start_version)); } else if !is_cdf_enabled(&end_snapshot) { - return Err(Error::table_changes_disabled(end_snapshot.version())); + return Err(Error::change_data_feed_unsupported(end_snapshot.version())); } + if start_snapshot.schema() != end_snapshot.schema() { return Err(Error::generic(format!( "Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(), @@ -110,10 +116,12 @@ impl TableChanges { schema, }) } + + /// The start version of the `TableChanges`. pub fn start_version(&self) -> Version { self.start_version } - /// The end version of the `TableChanges`. 
If no end_version was specified in + /// The end version (inclusive) of the [`TableChanges`]. If no `end_version` was specified in /// [`TableChanges::try_new`], this returns the newest version as of the call to `try_new`. pub fn end_version(&self) -> Version { self.log_segment.end_version @@ -123,13 +131,16 @@ impl TableChanges { pub fn schema(&self) -> &Schema { &self.schema } + /// Path to the root of the table that is being read. pub fn table_root(&self) -> &Url { &self.table_root } + /// The partition columns that will be read. #[allow(unused)] pub(crate) fn partition_columns(&self) -> &Vec { &self.end_snapshot.metadata().partition_columns } + /// The column mapping mode at the end schema. #[allow(unused)] pub(crate) fn column_mapping_mode(&self) -> &ColumnMappingMode { &self.end_snapshot.column_mapping_mode @@ -147,7 +158,8 @@ mod tests { use itertools::assert_equal; #[test] - fn test_enable_cdf_flag() { + fn table_changes_checks_enable_cdf_flag() { + // Table with CDF enabled, then disabled at 2 and enabled at 3 let path = "./tests/data/table-with-cdf"; let engine = Box::new(SyncEngine::new()); let table = Table::try_from_uri(path).unwrap(); @@ -161,25 +173,26 @@ mod tests { assert_eq!(table_changes.end_version(), end_version); } - let invalid_ranges = [(0, Some(2)), (1, Some(2)), (2, Some(2))]; + let invalid_ranges = [(0, 2), (1, 2), (2, 2), (2, 3)]; for (start_version, end_version) in invalid_ranges { let res = table.table_changes(engine.as_ref(), start_version, end_version); - assert!(matches!(res, Err(Error::TableChangesDisabled(_)))) + assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_)))) } } #[test] - fn test_schema_evolution_fails() { + fn schema_evolution_fails() { let path = "./tests/data/table-with-cdf"; let engine = Box::new(SyncEngine::new()); let table = Table::try_from_uri(path).unwrap(); + let expected_msg = "Failed to build TableChanges: Start and end version schemas are different. 
Found start version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: true, metadata: {} }} } and end version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: false, metadata: {} }} }"; // A field in the schema goes from being nullable to non-nullable let table_changes_res = table.table_changes(engine.as_ref(), 3, 4); - assert!(matches!(table_changes_res, Err(Error::Generic(_)))); + assert!(matches!(table_changes_res, Err(Error::Generic(msg)) if msg == expected_msg)); } #[test] - fn test_table_changes_has_cdf_schema() { + fn table_changes_has_cdf_schema() { let path = "./tests/data/table-with-cdf"; let engine = Box::new(SyncEngine::new()); let table = Table::try_from_uri(path).unwrap(); From b1e1358a92e61ac2ccba2a1692f8934e7c8e2c7a Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 17:39:04 -0800 Subject: [PATCH 12/17] Add docs and doctest --- kernel/src/table_changes/mod.rs | 38 ++++-- kernel/src/table_changes/scan.rs | 203 +++++++++++++++++++++++++++++++ 2 files changed, 232 insertions(+), 9 deletions(-) create mode 100644 kernel/src/table_changes/scan.rs diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 5382dd500..0b86c106b 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -18,8 +18,8 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { ] }); -/// Represents a call to read the Change Data Feed between two versions of a table. 
The schema of -/// `TableChanges` will be the schema of the table with three additional columns: +/// Represents a call to read the Change Data Feed (CDF) between two versions of a table. The schema of +/// `TableChanges` will be the schema of the table at the end version with three additional columns: /// - `_change_type`: String representing the type of change that for that commit. This may be one /// of `delete`, `insert`, `update_preimage`, or `update_postimage`. /// - `_commit_version`: Long representing the commit the change occurred in. @@ -27,11 +27,31 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { /// this is retrieved from the [`CommitInfo`] action. Otherwise, the timestamp is the same as the /// commit file's modification timestamp. /// +/// Three properties must hold for the entire CDF range: +/// - Reading must be supported for every commit in the range. This is determined using [`Protocol::ensure_read_supported`] +/// - Change Data Feed must be enabled for the entire range with the `delta.enableChangeDataFeed` +/// table property set to 'true'. +/// - The schema for each commit must be compatible with the end schema. This means that all the +/// same fields and their nullability are the same. Schema compatiblity will be expanded in the +/// future to allow compatible schemas that are not the exact same.
See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523) +/// +/// # Examples +/// Get `TableChanges` for versions 0 to 1 (inclusive) +/// ```rust +/// # use delta_kernel::engine::sync::SyncEngine; +/// # use delta_kernel::{Table, Error}; +/// # let engine = Box::new(SyncEngine::new()); +/// # let path = "./tests/data/table-with-cdf"; +/// let table = Table::try_from_uri(path).unwrap(); +/// let table_changes = table.table_changes(engine.as_ref(), 0, 1)?; +/// # Ok::<(), Error>(()) +/// ```` /// For more details, see the following sections of the protocol: /// - [Add CDC File](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file) /// - [Change Data Files](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-data-files). /// /// [`CommitInfo`]: crate::actions::CommitInfo +/// [`Protocol`]: crate::actions::Protocol #[derive(Debug)] pub struct TableChanges { pub log_segment: LogSegment, @@ -64,14 +84,14 @@ impl TableChanges { ) -> DeltaResult { // Both snapshots ensure that reading is supported at the start and end version using // [`Protocol::ensure_read_supported`]. Note that we must still verify that reading is - // supported for every intermediary protocol actions + // supported for every protocol action in the CDF range. // [`Protocol`]: crate::actions::Protocol let start_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?; let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?; - // Verify CDF is enabled at the beginning and end of the interval. We must still check every - // intermediary metadata action in the range. + // Verify CDF is enabled at the beginning and end of the interval to fail early. We must + // still check that CDF is enabled for every metadata action in the CDF range. 
let is_cdf_enabled = |snapshot: &Snapshot| { static ENABLE_CDF_FLAG: &str = "delta.enableChangeDataFeed"; snapshot @@ -86,6 +106,8 @@ impl TableChanges { return Err(Error::change_data_feed_unsupported(end_snapshot.version())); } + // Verify that the start and end schemas are compatible. We must still check schema + // compatibility for each schema update in the CDF range. if start_snapshot.schema() != end_snapshot.schema() { return Err(Error::generic(format!( "Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(), @@ -150,11 +172,9 @@ impl TableChanges { #[cfg(test)] mod tests { use crate::engine::sync::SyncEngine; - use crate::schema::DataType; - use crate::schema::StructField; + use crate::schema::{DataType, StructField}; use crate::table_changes::CDF_FIELDS; - use crate::Error; - use crate::Table; + use crate::{Error, Table}; use itertools::assert_equal; #[test] diff --git a/kernel/src/table_changes/scan.rs b/kernel/src/table_changes/scan.rs new file mode 100644 index 000000000..a8668077c --- /dev/null +++ b/kernel/src/table_changes/scan.rs @@ -0,0 +1,203 @@ +use std::sync::Arc; + +use itertools::Itertools; +use tracing::debug; + +use crate::scan::ColumnType; +use crate::schema::SchemaRef; +use crate::{DeltaResult, ExpressionRef}; + +use super::{TableChanges, CDF_FIELDS}; + +/// The result of building a [`TableChanges`] scan over a table. This can be used to get a change +/// data feed from the table +#[allow(unused)] +pub struct TableChangesScan { + table_changes: Arc, + logical_schema: SchemaRef, + predicate: Option, + all_fields: Vec, + have_partition_cols: bool, +} + +/// Builder to read the `TableChanges` of a table. +pub struct TableChangesScanBuilder { + table_changes: Arc, + schema: Option, + predicate: Option, +} + +impl TableChangesScanBuilder { + /// Create a new [`TableChangesScanBuilder`] instance. 
+ pub fn new(table_changes: impl Into>) -> Self { + Self { + table_changes: table_changes.into(), + schema: None, + predicate: None, + } + } + + /// Provide [`Schema`] for columns to select from the [`TableChanges`]. + /// + /// A table with columns `[a, b, c]` could have a scan which reads only the first + /// two columns by using the schema `[a, b]`. + /// + /// [`Schema`]: crate::schema::Schema + pub fn with_schema(mut self, schema: SchemaRef) -> Self { + self.schema = Some(schema); + self + } + + /// Optionally provide a [`SchemaRef`] for columns to select from the [`TableChanges`]. See + /// [`TableChangesScanBuilder::with_schema`] for details. If `schema_opt` is `None` this is a no-op. + pub fn with_schema_opt(self, schema_opt: Option) -> Self { + match schema_opt { + Some(schema) => self.with_schema(schema), + None => self, + } + } + + /// Optionally provide an expression to filter rows. For example, using the predicate `x < + /// 4` to return a subset of the rows in the scan which satisfy the filter. If `predicate` + /// is `None`, this is a no-op. + /// + /// NOTE: The filtering is best-effort and can produce false positives (rows that should + /// have been filtered out but were kept). + pub fn with_predicate(mut self, predicate: impl Into>) -> Self { + self.predicate = predicate.into(); + self + } + + /// Build the [`TableChangesScan`]. + /// + /// This does not scan the table at this point, but does do some work to ensure that the + /// provided schema makes sense, and to prepare some metadata that the scan will need. The + /// [`TableChangesScan`] type itself can be used to fetch the files and associated metadata required to + /// perform actual data reads. + pub fn build(self) -> DeltaResult { + // if no schema is provided, use snapshot's entire schema (e.g.
SELECT *) + let logical_schema = self + .schema + .unwrap_or(self.table_changes.schema.clone().into()); + let mut have_partition_cols = false; + let mut read_fields = Vec::with_capacity(logical_schema.fields.len()); + + // Loop over all selected fields and note if they are columns that will be read from the + // parquet file ([`ColumnType::Selected`]) or if they are partition columns and will need to + // be filled in by evaluating an expression ([`ColumnType::Partition`]) + println!("Logical schema: {:?}", logical_schema); + let column_types = logical_schema + .fields() + .enumerate() + .map(|(index, logical_field)| -> DeltaResult<_> { + if self + .table_changes + .partition_columns() + .contains(logical_field.name()) + { + // Store the index into the schema for this field. When we turn it into an + // expression in the inner loop, we will index into the schema and get the name and + // data type, which we need to properly materialize the column. + have_partition_cols = true; + Ok(ColumnType::Partition(index)) + } else if CDF_FIELDS + .iter() + .any(|field| field.name() == logical_field.name()) + { + // CDF Columns are generated, so they do not have a column mapping. + Ok(ColumnType::Selected(logical_field.name().to_string())) + } else { + // Add to read schema, store field so we can build a `Column` expression later + // if needed (i.e. 
if we have partition columns) + let physical_field = + logical_field.make_physical(*self.table_changes.column_mapping_mode())?; + debug!("\n\n{logical_field:#?}\nAfter mapping: {physical_field:#?}\n\n"); + let physical_name = physical_field.name.clone(); + read_fields.push(physical_field); + Ok(ColumnType::Selected(physical_name)) + } + }) + .try_collect()?; + Ok(TableChangesScan { + table_changes: self.table_changes, + logical_schema, + predicate: self.predicate, + all_fields: column_types, + have_partition_cols, + }) + } +} +#[cfg(test)] +mod tests { + + use std::sync::Arc; + + use crate::engine::sync::SyncEngine; + use crate::expressions::{column_expr, Scalar}; + use crate::scan::ColumnType; + use crate::schema::{DataType, StructField, StructType}; + use crate::{Expression, Table}; + + #[test] + fn simple_table_changes_scan_builder() { + let path = "./tests/data/table-with-cdf"; + let engine = Box::new(SyncEngine::new()); + let table = Table::try_from_uri(path).unwrap(); + + // A field in the schema goes from being nullable to non-nullable + let table_changes = table.table_changes(engine.as_ref(), 0, 1).unwrap(); + + let scan = table_changes.into_scan_builder().build().unwrap(); + assert_eq!( + scan.all_fields, + vec![ + ColumnType::Selected("part".to_string()), + ColumnType::Selected("id".to_string()), + ColumnType::Selected("_change_type".to_string()), + ColumnType::Selected("_commit_version".to_string()), + ColumnType::Selected("_commit_timestamp".to_string()), + ] + ); + assert_eq!(scan.predicate, None); + assert!(!scan.have_partition_cols); + } + + #[test] + fn projected_and_filtered_table_changes_scan_builder() { + let path = "./tests/data/table-with-cdf"; + let engine = Box::new(SyncEngine::new()); + let table = Table::try_from_uri(path).unwrap(); + + // A field in the schema goes from being nullable to non-nullable + let table_changes = table.table_changes(engine.as_ref(), 0, 1).unwrap(); + + let schema = table_changes + .schema() + .project(&["id", 
"_commit_version"]) + .unwrap(); + let predicate = Arc::new(Expression::gt(column_expr!("id"), Scalar::from(10))); + let scan = table_changes + .into_scan_builder() + .with_schema(schema) + .with_predicate(predicate.clone()) + .build() + .unwrap(); + assert_eq!( + scan.all_fields, + vec![ + ColumnType::Selected("id".to_string()), + ColumnType::Selected("_commit_version".to_string()), + ] + ); + assert_eq!( + scan.logical_schema, + StructType::new([ + StructField::new("id", DataType::INTEGER, true), + StructField::new("_commit_version", DataType::LONG, false), + ]) + .into() + ); + assert!(!scan.have_partition_cols); + assert_eq!(scan.predicate, Some(predicate)); + } +} From 6739f09eacc4bb7cb602b6bb3c791dd018af95ef Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 17:41:06 -0800 Subject: [PATCH 13/17] Make logsegment pubcrate --- kernel/src/table_changes/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 0b86c106b..70836e4c6 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -54,7 +54,7 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { /// [`Protocol`]: crate::actions::Protocol #[derive(Debug)] pub struct TableChanges { - pub log_segment: LogSegment, + pub(crate) log_segment: LogSegment, table_root: Url, end_snapshot: Snapshot, start_version: Version, From d2e004b3485f5474a7631dccc9a97fc5bfe4b222 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 17:52:04 -0800 Subject: [PATCH 14/17] address more nits --- ffi/src/lib.rs | 4 ++-- kernel/src/table_changes/mod.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/ffi/src/lib.rs b/ffi/src/lib.rs index 3ea816b0d..7f6054da5 100644 --- a/ffi/src/lib.rs +++ b/ffi/src/lib.rs @@ -383,7 +383,7 @@ pub enum KernelError { FileAlreadyExists, MissingCommitInfo, UnsupportedError, - TableChangesDisabled, + 
ChangeDataFeedUnsupported, } impl From for KernelError { @@ -436,7 +436,7 @@ impl From for KernelError { Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists, Error::MissingCommitInfo => KernelError::MissingCommitInfo, Error::Unsupported(_) => KernelError::UnsupportedError, - Error::TableChangesDisabled(_) => KernelError::TableChangesDisabled, + Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported, } } } diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 70836e4c6..da28a55fe 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -32,7 +32,7 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { /// - Change Data Feed must be enabled for the entire range with the `delta.enableChangeDataFeed` /// table property set to 'true'. /// - The schema for each commit must be compatible with the end schema. This means that all the -/// same fields and their nullability are the same. Schema compatiblity will be expanded in the +/// same fields and their nullability are the same. Schema compatibility will be expanded in the /// future to allow compatible schemas that are not the exact same. 
See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523) /// /// # Examples @@ -179,7 +179,7 @@ mod tests { #[test] fn table_changes_checks_enable_cdf_flag() { - // Table with CDF enabled, then disabled at 2 and enabled at 3 + // Table with CDF enabled, then disabled at version 2 and enabled at version 3 let path = "./tests/data/table-with-cdf"; let engine = Box::new(SyncEngine::new()); let table = Table::try_from_uri(path).unwrap(); From e1347da90d323b1a508574897272f8b8ac282e59 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 18:03:25 -0800 Subject: [PATCH 15/17] fix doc string reference --- kernel/src/table_changes/mod.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index da28a55fe..270e1c893 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -28,13 +28,15 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { /// commit file's modification timestamp. /// /// Three properties must hold for the entire CDF range: -/// - Reading must be supported for every commit in the range. This is determined using [`Protocol::ensure_read_supported`] +/// - Reading must be supported for every commit in the range. This is determined using [`ensure_read_supported`] /// - Change Data Feed must be enabled for the entire range with the `delta.enableChangeDataFeed` /// table property set to 'true'. /// - The schema for each commit must be compatible with the end schema. This means that all the /// same fields and their nullability are the same. Schema compatibility will be expanded in the /// future to allow compatible schemas that are not the exact same. 
See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523) /// +/// [`CommitInfo`]: crate::actions::CommitInfo +/// [`ensure_read_supported`]: crate::actions::Protocol::ensure_read_supported /// # Examples /// Get `TableChanges` for versions 0 to 1 (inclusive) /// ```rust @@ -49,9 +51,6 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { /// For more details, see the following sections of the protocol: /// - [Add CDC File](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file) /// - [Change Data Files](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-data-files). -/// -/// [`CommitInfo`]: crate::actions::CommitInfo -/// [`Protocol`]: crate::actions::Protocol #[derive(Debug)] pub struct TableChanges { pub(crate) log_segment: LogSegment, @@ -83,9 +82,8 @@ impl TableChanges { end_version: Option, ) -> DeltaResult { // Both snapshots ensure that reading is supported at the start and end version using - // [`Protocol::ensure_read_supported`]. Note that we must still verify that reading is + // Protocol::ensure_read_supported`. Note that we must still verify that reading is // supported for every protocol action in the CDF range. 
- // [`Protocol`]: crate::actions::Protocol let start_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?; let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?; From 87d41c4e87a738f7f6a7341b02c8de8f31383831 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 18:04:28 -0800 Subject: [PATCH 16/17] more do fix --- kernel/src/table_changes/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 270e1c893..7ccc6c16b 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -82,7 +82,7 @@ impl TableChanges { end_version: Option, ) -> DeltaResult { // Both snapshots ensure that reading is supported at the start and end version using - // Protocol::ensure_read_supported`. Note that we must still verify that reading is + // `ensure_read_supported`. Note that we must still verify that reading is // supported for every protocol action in the CDF range. let start_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?; From 96a4b71b47a0d96ae3585ea2e17f3369b56292fd Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Thu, 21 Nov 2024 18:09:50 -0800 Subject: [PATCH 17/17] Added link to issue for schema compatiblity --- kernel/src/table_changes/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/kernel/src/table_changes/mod.rs b/kernel/src/table_changes/mod.rs index 7ccc6c16b..af0096c9b 100644 --- a/kernel/src/table_changes/mod.rs +++ b/kernel/src/table_changes/mod.rs @@ -33,7 +33,8 @@ static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| { /// table property set to 'true'. /// - The schema for each commit must be compatible with the end schema. This means that all the /// same fields and their nullability are the same. 
Schema compatibility will be expanded in the -/// future to allow compatible schemas that are not the exact same. See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523) +/// future to allow compatible schemas that are not the exact same. +/// See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523) /// /// [`CommitInfo`]: crate::actions::CommitInfo /// [`ensure_read_supported`]: crate::actions::Protocol::ensure_read_supported @@ -106,6 +107,9 @@ impl TableChanges { // Verify that the start and end schemas are compatible. We must still check schema // compatibility for each schema update in the CDF range. + // Note: Schema compatibility check will be changed in the future to be more flexible. + // See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523) + if start_snapshot.schema() != end_snapshot.schema() { return Err(Error::generic(format!( "Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(),