Skip to content

Commit

Permalink
Add table changes constructor (#505)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-incubator/delta-kernel-rs/blob/main/CONTRIBUTING.md
2. Run `cargo t --all-features --all-targets` to get started testing,
and run `cargo fmt`.
  3. Ensure you have added or run the appropriate tests for your PR.
4. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  5. Be sure to keep the PR description updated to reflect all changes.
-->

## What changes are proposed in this pull request?
<!--
Please clarify what changes you are proposing and why the changes are
needed.
The purpose of this section is to outline the changes, why they are
needed, and how this PR fixes the issue.
If the reason for the change is already explained clearly in an issue,
then it does not need to be restated here.
1. If you propose a new API or feature, clarify the use case for a new
API or feature.
  2. If you fix a bug, you can clarify why it is a bug.
-->
This PR introduces the TableChanges struct which represents a Change
Data Feed scan. TableChanges is constructed from a Table, and performs 2
protocol and metadata scans.
1. The first is a P&M scan from start version, and ensures that CDF is
enabled at the beginning version.
2. The second P&M scan is for the end version. This one is used to
extract the schema at the end version and ensure that the final version
has CDF enabled.

I also add the logic for converting the end version's schema into the
cdf schema.

Note that the behaviour to fail early in table changes constructor
aligns with spark's behaviour. Only the CDF range is returned in spark's
error. No specific commit version that causes the failure is provided.

<!--
Uncomment this section if there are any changes affecting public APIs:
### This PR affects the following public APIs

If there are breaking changes, please ensure the `breaking-changes`
label gets added by CI, and describe why the changes are needed.

Note that _new_ public APIs are not considered breaking.
-->


## How was this change tested?
<!--
Please make sure to add test cases that check the changes thoroughly
including negative and positive cases if possible.
If it was tested in a way different from regular unit tests, please
clarify how you tested, ideally via a reproducible test documented in
the PR description.
-->
- Ensure that `TableChanges::try_new` checks the start and end version
- Ensure that the schema at the start and end versions are the same
- Ensure that the `table_changes.schema()` method returns the CDF
schema.
  • Loading branch information
OussamaSaoudi-db authored Nov 22, 2024
1 parent e450c05 commit d146b80
Show file tree
Hide file tree
Showing 12 changed files with 480 additions and 1 deletion.
2 changes: 2 additions & 0 deletions ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ pub enum KernelError {
FileAlreadyExists,
MissingCommitInfo,
UnsupportedError,
ChangeDataFeedUnsupported,
}

impl From<Error> for KernelError {
Expand Down Expand Up @@ -435,6 +436,7 @@ impl From<Error> for KernelError {
Error::FileAlreadyExists(_) => KernelError::FileAlreadyExists,
Error::MissingCommitInfo => KernelError::MissingCommitInfo,
Error::Unsupported(_) => KernelError::UnsupportedError,
Error::ChangeDataFeedUnsupported(_) => KernelError::ChangeDataFeedUnsupported,
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions kernel/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use crate::schema::DataType;
use crate::Version;

/// A [`std::result::Result`] that has the kernel [`Error`] as the error variant
pub type DeltaResult<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -179,6 +180,9 @@ pub enum Error {
/// Some functionality is currently unsupported
#[error("Unsupported: {0}")]
Unsupported(String),

#[error("Change data feed is unsupported for the table at version {0}")]
ChangeDataFeedUnsupported(Version),
}

// Convenience constructors for Error types that take a String argument
Expand Down Expand Up @@ -242,6 +246,9 @@ impl Error {
pub fn unsupported(msg: impl ToString) -> Self {
Self::Unsupported(msg.to_string())
}
pub fn change_data_feed_unsupported(version: impl Into<Version>) -> Self {
Self::ChangeDataFeedUnsupported(version.into())
}

// Capture a backtrace when the error is constructed.
#[must_use]
Expand Down
1 change: 1 addition & 0 deletions kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub mod engine_data;
pub mod error;
pub mod expressions;
pub(crate) mod predicates;
pub mod table_changes;
pub mod table_features;

#[cfg(feature = "developer-visibility")]
Expand Down
2 changes: 1 addition & 1 deletion kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl Snapshot {
///
/// # Parameters
///
/// - `location`: url pointing at the table root (where `_delta_log` folder is located)
/// - `table_root`: url pointing at the table root (where `_delta_log` folder is located)
/// - `engine`: Implementation of [`Engine`] apis.
/// - `version`: target version of the [`Snapshot`]
pub fn try_new(
Expand Down
18 changes: 18 additions & 0 deletions kernel/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::path::PathBuf;
use url::Url;

use crate::snapshot::Snapshot;
use crate::table_changes::TableChanges;
use crate::transaction::Transaction;
use crate::{DeltaResult, Engine, Error, Version};

Expand Down Expand Up @@ -80,6 +81,23 @@ impl Table {
Snapshot::try_new(self.location.clone(), engine, version)
}

/// Create a [`TableChanges`] to get a change data feed for the table between `start_version`,
/// and `end_version`. If no `end_version` is supplied, the latest version will be used as the
/// `end_version`.
pub fn table_changes(
&self,
engine: &dyn Engine,
start_version: Version,
end_version: impl Into<Option<Version>>,
) -> DeltaResult<TableChanges> {
TableChanges::try_new(
self.location.clone(),
engine,
start_version,
end_version.into(),
)
}

/// Create a new write transaction for this table.
pub fn new_transaction(&self, engine: &dyn Engine) -> DeltaResult<Transaction> {
Transaction::try_new(self.snapshot(engine, None)?)
Expand Down
231 changes: 231 additions & 0 deletions kernel/src/table_changes/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
//! Provides an API to read the table's change data feed between two versions.
use std::sync::LazyLock;

use url::Url;

use crate::log_segment::LogSegment;
use crate::path::AsUrl;
use crate::schema::{DataType, Schema, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::table_features::ColumnMappingMode;
use crate::{DeltaResult, Engine, Error, Version};

static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
[
StructField::new("_change_type", DataType::STRING, false),
StructField::new("_commit_version", DataType::LONG, false),
StructField::new("_commit_timestamp", DataType::TIMESTAMP, false),
]
});

/// Represents a call to read the Change Data Feed (CDF) between two versions of a table. The schema of
/// `TableChanges` will be the schema of the table at the end verios with three additional columns:
/// - `_change_type`: String representing the type of change that for that commit. This may be one
/// of `delete`, `insert`, `update_preimage`, or `update_postimage`.
/// - `_commit_version`: Long representing the commit the change occurred in.
/// - `_commit_timestamp`: Time at which the commit occurred. If In-commit timestamps is enabled,
/// this is retrieved from the [`CommitInfo`] action. Otherwise, the timestamp is the same as the
/// commit file's modification timestamp.
///
/// Three properties must hold for the entire CDF range:
/// - Reading must be supported for every commit in the range. This is determined using [`ensure_read_supported`]
/// - Change Data Feed must be enabled for the entire range with the `delta.enableChangeDataFeed`
/// table property set to 'true'.
/// - The schema for each commit must be compatible with the end schema. This means that all the
/// same fields and their nullability are the same. Schema compatibility will be expanded in the
/// future to allow compatible schemas that are not the exact same.
/// See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523)
///
/// [`CommitInfo`]: crate::actions::CommitInfo
/// [`ensure_read_supported`]: crate::actions::Protocol::ensure_read_supported
/// # Examples
/// Get `TableChanges` for versions 0 to 1 (inclusive)
/// ```rust
/// # use delta_kernel::engine::sync::SyncEngine;
/// # use delta_kernel::{Table, Error};
/// # let engine = Box::new(SyncEngine::new());
/// # let path = "./tests/data/table-with-cdf";
/// let table = Table::try_from_uri(path).unwrap();
/// let table_changes = table.table_changes(engine.as_ref(), 0, 1)?;
/// # Ok::<(), Error>(())
/// ````
/// For more details, see the following sections of the protocol:
/// - [Add CDC File](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file)
/// - [Change Data Files](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-data-files).
#[derive(Debug)]
pub struct TableChanges {
pub(crate) log_segment: LogSegment,
table_root: Url,
end_snapshot: Snapshot,
start_version: Version,
schema: Schema,
}

impl TableChanges {
/// Creates a new [`TableChanges`] instance for the given version range. This function checks
/// these two properties:
/// - The change data feed table feature must be enabled in both the start or end versions.
/// - The schema at the start and end versions are the same.
///
/// Note that this does not check that change data feed is enabled for every commit in the
/// range. It also does not check that the schema remains the same for the entire range.
///
/// # Parameters
/// - `table_root`: url pointing at the table root (where `_delta_log` folder is located)
/// - `engine`: Implementation of [`Engine`] apis.
/// - `start_version`: The start version of the change data feed
/// - `end_version`: The end version (inclusive) of the change data feed. If this is none, this
/// defaults to the newest table version.
pub fn try_new(
table_root: Url,
engine: &dyn Engine,
start_version: Version,
end_version: Option<Version>,
) -> DeltaResult<Self> {
// Both snapshots ensure that reading is supported at the start and end version using
// `ensure_read_supported`. Note that we must still verify that reading is
// supported for every protocol action in the CDF range.
let start_snapshot =
Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?;
let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?;

// Verify CDF is enabled at the beginning and end of the interval to fail early. We must
// still check that CDF is enabled for every metadata action in the CDF range.
let is_cdf_enabled = |snapshot: &Snapshot| {
static ENABLE_CDF_FLAG: &str = "delta.enableChangeDataFeed";
snapshot
.metadata()
.configuration
.get(ENABLE_CDF_FLAG)
.is_some_and(|val| val == "true")
};
if !is_cdf_enabled(&start_snapshot) {
return Err(Error::change_data_feed_unsupported(start_version));
} else if !is_cdf_enabled(&end_snapshot) {
return Err(Error::change_data_feed_unsupported(end_snapshot.version()));
}

// Verify that the start and end schemas are compatible. We must still check schema
// compatibility for each schema update in the CDF range.
// Note: Schema compatibility check will be changed in the future to be more flexible.
// See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523)

if start_snapshot.schema() != end_snapshot.schema() {
return Err(Error::generic(format!(
"Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(),
)));
}

let log_root = table_root.join("_delta_log/")?;
let log_segment = LogSegment::for_table_changes(
engine.get_file_system_client().as_ref(),
log_root,
start_version,
end_version,
)?;

let schema = StructType::new(
end_snapshot
.schema()
.fields()
.cloned()
.chain(CDF_FIELDS.clone()),
);

Ok(TableChanges {
table_root,
end_snapshot,
log_segment,
start_version,
schema,
})
}

/// The start version of the `TableChanges`.
pub fn start_version(&self) -> Version {
self.start_version
}
/// The end version (inclusive) of the [`TableChanges`]. If no `end_version` was specified in
/// [`TableChanges::try_new`], this returns the newest version as of the call to `try_new`.
pub fn end_version(&self) -> Version {
self.log_segment.end_version
}
/// The logical schema of the change data feed. For details on the shape of the schema, see
/// [`TableChanges`].
pub fn schema(&self) -> &Schema {
&self.schema
}
/// Path to the root of the table that is being read.
pub fn table_root(&self) -> &Url {
&self.table_root
}
/// The partition columns that will be read.
#[allow(unused)]
pub(crate) fn partition_columns(&self) -> &Vec<String> {
&self.end_snapshot.metadata().partition_columns
}
/// The column mapping mode at the end schema.
#[allow(unused)]
pub(crate) fn column_mapping_mode(&self) -> &ColumnMappingMode {
&self.end_snapshot.column_mapping_mode
}
}

#[cfg(test)]
mod tests {
use crate::engine::sync::SyncEngine;
use crate::schema::{DataType, StructField};
use crate::table_changes::CDF_FIELDS;
use crate::{Error, Table};
use itertools::assert_equal;

#[test]
fn table_changes_checks_enable_cdf_flag() {
// Table with CDF enabled, then disabled at version 2 and enabled at version 3
let path = "./tests/data/table-with-cdf";
let engine = Box::new(SyncEngine::new());
let table = Table::try_from_uri(path).unwrap();

let valid_ranges = [(0, 1), (0, 0), (1, 1)];
for (start_version, end_version) in valid_ranges {
let table_changes = table
.table_changes(engine.as_ref(), start_version, end_version)
.unwrap();
assert_eq!(table_changes.start_version, start_version);
assert_eq!(table_changes.end_version(), end_version);
}

let invalid_ranges = [(0, 2), (1, 2), (2, 2), (2, 3)];
for (start_version, end_version) in invalid_ranges {
let res = table.table_changes(engine.as_ref(), start_version, end_version);
assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_))))
}
}
#[test]
fn schema_evolution_fails() {
let path = "./tests/data/table-with-cdf";
let engine = Box::new(SyncEngine::new());
let table = Table::try_from_uri(path).unwrap();
let expected_msg = "Failed to build TableChanges: Start and end version schemas are different. Found start version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: true, metadata: {} }} } and end version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: false, metadata: {} }} }";

// A field in the schema goes from being nullable to non-nullable
let table_changes_res = table.table_changes(engine.as_ref(), 3, 4);
assert!(matches!(table_changes_res, Err(Error::Generic(msg)) if msg == expected_msg));
}

#[test]
fn table_changes_has_cdf_schema() {
let path = "./tests/data/table-with-cdf";
let engine = Box::new(SyncEngine::new());
let table = Table::try_from_uri(path).unwrap();
let expected_schema = [
StructField::new("part", DataType::INTEGER, true),
StructField::new("id", DataType::INTEGER, true),
]
.into_iter()
.chain(CDF_FIELDS.clone());

let table_changes = table.table_changes(engine.as_ref(), 0, 0).unwrap();
assert_equal(expected_schema, table_changes.schema().fields().cloned());
}
}
Loading

0 comments on commit d146b80

Please sign in to comment.