Add table changes constructor #505
@@ -8,6 +8,7 @@ use std::path::PathBuf;
use url::Url;

use crate::snapshot::Snapshot;
use crate::table_changes::TableChanges;
use crate::transaction::Transaction;
use crate::{DeltaResult, Engine, Error, Version};
@@ -80,6 +81,23 @@ impl Table {
        Snapshot::try_new(self.location.clone(), engine, version)
    }

    /// Create a [`TableChanges`] to get a change data feed for the table between `start_version`
    /// and `end_version`. If no `end_version` is supplied, the latest version will be used as the
    /// `end_version`.
    pub fn table_changes(
        &self,
        engine: &dyn Engine,
        start_version: Version,
        end_version: impl Into<Option<Version>>,
    ) -> DeltaResult<TableChanges> {
        TableChanges::try_new(
            self.location.clone(),
            engine,
            start_version,
            end_version.into(),
        )
    }

    /// Create a new write transaction for this table.
    pub fn new_transaction(&self, engine: &dyn Engine) -> DeltaResult<Transaction> {
        Transaction::try_new(self.snapshot(engine, None)?)

Review thread on the `end_version: impl Into<Option<Version>>` parameter:
- why not …
- I hadn't considered that. Tho this change seems to cause issues: … Integers are treated as i32 by default, and i32 can't be converted into u64. By using just …
- Out of curiosity, what situation would produce an …
- Yeah, maybe my question should actually have been, "why not …"
- Oh using …
- Ahh cool, that's nice. Sorry for the long aside :)
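The thread above is about call-site ergonomics: with `impl Into<Option<Version>>`, a caller can pass either a bare version number or `None`, whereas a plain `Option<Version>` parameter forces `Some(...)` at every call site (and a bare integer literal would otherwise default to `i32`, which has no implicit conversion to `u64`). A standalone sketch of that difference, not the kernel's actual code:

```rust
type Version = u64;

// Accepts a bare version, `Some(version)`, or `None`.
fn flexible(end_version: impl Into<Option<Version>>) -> Option<Version> {
    end_version.into()
}

// Forces callers to wrap the version in `Some(...)` themselves.
fn strict(end_version: Option<Version>) -> Option<Version> {
    end_version
}

fn main() {
    assert_eq!(flexible(1), Some(1)); // bare literal is inferred as Version
    assert_eq!(flexible(None), None); // `None` still works
    assert_eq!(strict(Some(1)), Some(1)); // plain Option needs the Some wrapper
}
```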
@@ -0,0 +1,231 @@
//! Provides an API to read the table's change data feed between two versions.
use std::sync::LazyLock;

use url::Url;

use crate::log_segment::LogSegment;
use crate::path::AsUrl;
use crate::schema::{DataType, Schema, StructField, StructType};
use crate::snapshot::Snapshot;
use crate::table_features::ColumnMappingMode;
use crate::{DeltaResult, Engine, Error, Version};

static CDF_FIELDS: LazyLock<[StructField; 3]> = LazyLock::new(|| {
    [
        StructField::new("_change_type", DataType::STRING, false),
        StructField::new("_commit_version", DataType::LONG, false),
        StructField::new("_commit_timestamp", DataType::TIMESTAMP, false),
Review thread on lines +15 to +17:
- Side note: delta-spark does some hand-wringing about table schemas that already provide columns with these names. I think the solution was to block enabling CDF on such tables, and to block creating columns with those names on tables that are already CDF-enabled? (Both are writer issues, not reader, but it's prob worth tracking.) A hypothetical guard for this is sketched after the definition below.
- Tracked in #524. Thx!
    ]
});
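On the reserved-name concern in the thread above: a hypothetical writer-side guard could refuse to enable CDF when the table schema already defines one of these columns. This helper is not part of this PR; it assumes `StructField` exposes a public `name` field and reuses `Error::generic` as seen elsewhere in the diff:

```rust
// Hypothetical writer-side check (not in this PR): reject enabling CDF if the
// table schema already contains one of the reserved CDF column names.
fn check_no_reserved_cdf_columns(schema: &StructType) -> DeltaResult<()> {
    const RESERVED: [&str; 3] = ["_change_type", "_commit_version", "_commit_timestamp"];
    for field in schema.fields() {
        if RESERVED.contains(&field.name.as_str()) {
            return Err(Error::generic(format!(
                "Cannot enable change data feed: column '{}' is reserved for CDF",
                field.name
            )));
        }
    }
    Ok(())
}
```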
/// Represents a call to read the Change Data Feed (CDF) between two versions of a table. The schema of
/// `TableChanges` will be the schema of the table at the end version with three additional columns:
/// - `_change_type`: String representing the type of change for that commit. This may be one
///   of `delete`, `insert`, `update_preimage`, or `update_postimage`.
/// - `_commit_version`: Long representing the commit the change occurred in.
/// - `_commit_timestamp`: Time at which the commit occurred. If in-commit timestamps are enabled,
///   this is retrieved from the [`CommitInfo`] action. Otherwise, the timestamp is the same as the
///   commit file's modification timestamp.
///
/// Three properties must hold for the entire CDF range:
/// - Reading must be supported for every commit in the range. This is determined using [`ensure_read_supported`].
/// - Change Data Feed must be enabled for the entire range with the `delta.enableChangeDataFeed`
///   table property set to 'true'.
/// - The schema for each commit must be compatible with the end schema. This means that all the
///   fields and their nullability must be the same. Schema compatibility will be expanded in the
///   future to allow compatible schemas that are not exactly the same.
///   See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523).
///
/// [`CommitInfo`]: crate::actions::CommitInfo
/// [`ensure_read_supported`]: crate::actions::Protocol::ensure_read_supported
///
/// # Examples
/// Get `TableChanges` for versions 0 to 1 (inclusive):
/// ```rust
/// # use delta_kernel::engine::sync::SyncEngine;
/// # use delta_kernel::{Table, Error};
/// # let engine = Box::new(SyncEngine::new());
/// # let path = "./tests/data/table-with-cdf";
/// let table = Table::try_from_uri(path).unwrap();
/// let table_changes = table.table_changes(engine.as_ref(), 0, 1)?;
/// # Ok::<(), Error>(())
/// ```
/// For more details, see the following sections of the protocol:
/// - [Add CDC File](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-cdc-file)
/// - [Change Data Files](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-data-files).
#[derive(Debug)]
pub struct TableChanges {
    pub(crate) log_segment: LogSegment,
    table_root: Url,
    end_snapshot: Snapshot,
    start_version: Version,
Review thread on `start_version`:
- Just checking, we won't have to re-find the start snapshot when we actually go to return results, right?
- No, we don't. Henceforth all we are interested in are the commits in the …
    schema: Schema,
}
impl TableChanges {
    /// Creates a new [`TableChanges`] instance for the given version range. This function checks
    /// these two properties:
    /// - The change data feed table feature must be enabled at both the start and end versions.
    /// - The schemas at the start and end versions must be the same.
    ///
    /// Note that this does not check that change data feed is enabled for every commit in the
    /// range. It also does not check that the schema remains the same for the entire range.
    ///
    /// # Parameters
    /// - `table_root`: url pointing at the table root (where `_delta_log` folder is located)
    /// - `engine`: Implementation of [`Engine`] apis.
    /// - `start_version`: The start version of the change data feed
    /// - `end_version`: The end version (inclusive) of the change data feed. If this is `None`, it
    ///   defaults to the newest table version.
    pub fn try_new(
        table_root: Url,
        engine: &dyn Engine,
        start_version: Version,
        end_version: Option<Version>,
    ) -> DeltaResult<Self> {
        // Both snapshots ensure that reading is supported at the start and end version using
        // `ensure_read_supported`. Note that we must still verify that reading is
        // supported for every protocol action in the CDF range.
        let start_snapshot =
            Snapshot::try_new(table_root.as_url().clone(), engine, Some(start_version))?;
        let end_snapshot = Snapshot::try_new(table_root.as_url().clone(), engine, end_version)?;
Review thread on lines +88 to +90:
- I think we need to do …
- related #518

Review thread on lines +88 to +90:
- aside: Relating to the optimization we've discussed a few times -- the end snapshot should be able to use the first snapshot as a starting point for listing (and P&M), if the two versions aren't too far apart?
- Yeah I would like that to be the case eventually.
- added a note to #489
        // Verify CDF is enabled at the beginning and end of the interval to fail early. We must
        // still check that CDF is enabled for every metadata action in the CDF range.
        let is_cdf_enabled = |snapshot: &Snapshot| {
            static ENABLE_CDF_FLAG: &str = "delta.enableChangeDataFeed";
            snapshot
                .metadata()
                .configuration
                .get(ENABLE_CDF_FLAG)
                .is_some_and(|val| val == "true")
Review thread on the `delta.enableChangeDataFeed` lookup:
- I guess this will simplify once #453 merges?
- Yep!
        };
        if !is_cdf_enabled(&start_snapshot) {
Review thread on the start/end CDF checks:
- I think we need to check at every point along the way, not just start and end.
- True, we could leave that till later. I was hoping to do some checking at this stage since we can catch that error earlier. Should I just remove this check?
- No I think this is okay, as long as we somehow check at each point. I think if it's cheap to error early we should.
- No extra cost since we needed to get both snapshots anyway.
- can we add comments here for clarity? (e.g. "we check start/end to fail early if needed but at each point yielding CDF batches we do schema compat check" or something like that)
- added comments addressing enable cdf flag, schema, and protocol.

(A sketch of the per-commit side of this check follows the block below.)
            return Err(Error::change_data_feed_unsupported(start_version));
        } else if !is_cdf_enabled(&end_snapshot) {
            return Err(Error::change_data_feed_unsupported(end_snapshot.version()));
        }
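As the thread above settles, the start/end check only fails early; the flag still has to be verified for every commit in the range. A rough sketch of what that per-commit guard might look like — a hypothetical helper, not part of this diff, assuming the `Metadata` action exposes the same `configuration` map used via `Snapshot` above:

```rust
use crate::actions::Metadata;

// Hypothetical per-commit guard (not in this diff): could be invoked for every
// Metadata action replayed in the CDF range, mirroring the endpoint check above.
fn check_cdf_enabled_for_commit(metadata: &Metadata, commit_version: Version) -> DeltaResult<()> {
    let enabled = metadata
        .configuration
        .get("delta.enableChangeDataFeed")
        .is_some_and(|val| val == "true");
    if enabled {
        Ok(())
    } else {
        Err(Error::change_data_feed_unsupported(commit_version))
    }
}
```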
        // Verify that the start and end schemas are compatible. We must still check schema
        // compatibility for each schema update in the CDF range.
        // Note: Schema compatibility check will be changed in the future to be more flexible.
        // See issue [#523](https://github.com/delta-io/delta-kernel-rs/issues/523)

        if start_snapshot.schema() != end_snapshot.schema() {
Review thread on the schema comparison:
- aside: Technically users can stuff whatever metadata they want into the schema fields; should we track an item to ignore unknown metadata entries when comparing schemas?
- Yeah we could do that. I think schema checking needs to be changed a lot anyway to support at least adding columns and changing nullability, so there's more work to do in that department.
- Added link to issue to track schema compatibility checks. #523

(A sketch of a metadata-ignoring comparison follows this block.)
            return Err(Error::generic(format!(
Review thread on the error type:
- could create a schema error or a CDF error (maybe take as a follow-up just wanted to call it out)
- Yeah I'm leaving this to another PR because the error can't have …
"Failed to build TableChanges: Start and end version schemas are different. Found start version schema {:?} and end version schema {:?}", start_snapshot.schema(), end_snapshot.schema(), | ||||||
))); | ||||||
} | ||||||
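On the metadata aside above, one possible shape for a comparison that ignores per-field metadata — purely illustrative, and assuming `StructField`'s `name`, `data_type`, and `nullable` fields are accessible as the Debug output in the tests below suggests:

```rust
// Illustrative only: compare two schemas while ignoring per-field metadata by
// rebuilding each field from just its name, data type, and nullability.
fn schemas_equal_ignoring_metadata(a: &StructType, b: &StructType) -> bool {
    let strip = |s: &StructType| {
        StructType::new(
            s.fields()
                .map(|f| StructField::new(f.name.clone(), f.data_type.clone(), f.nullable)),
        )
    };
    strip(a) == strip(b)
}
```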
        let log_root = table_root.join("_delta_log/")?;
        let log_segment = LogSegment::for_table_changes(
            engine.get_file_system_client().as_ref(),
            log_root,
            start_version,
            end_version,
        )?;

        let schema = StructType::new(
            end_snapshot
                .schema()
                .fields()
                .cloned()
                .chain(CDF_FIELDS.clone()),
Review thread on the CDF fields:
- The CDF fields are generated columns right? (not read directly from parquet)?
- Yes, correct. I was planning on making them a special case only in CDF code. If you feel that we can legitimately treat these as generated column, we could add a new column type …
        );

        Ok(TableChanges {
            table_root,
            end_snapshot,
            log_segment,
            start_version,
            schema,
        })
    }
    /// The start version of the `TableChanges`.
    pub fn start_version(&self) -> Version {
        self.start_version
    }
    /// The end version (inclusive) of the [`TableChanges`]. If no `end_version` was specified in
    /// [`TableChanges::try_new`], this returns the newest version as of the call to `try_new`.
Review thread on this doc comment:
- Suggested change: …
- This causes issues with cargo doc, and I think it's a lot of visual clutter to repeat the full [`TableChanges::try_new`].
    pub fn end_version(&self) -> Version {
        self.log_segment.end_version
    }
    /// The logical schema of the change data feed. For details on the shape of the schema, see
    /// [`TableChanges`].
    pub fn schema(&self) -> &Schema {
        &self.schema
    }
    /// Path to the root of the table that is being read.
    pub fn table_root(&self) -> &Url {
        &self.table_root
    }
    /// The partition columns that will be read.
    #[allow(unused)]
    pub(crate) fn partition_columns(&self) -> &Vec<String> {
        &self.end_snapshot.metadata().partition_columns
    }
    /// The column mapping mode at the end schema.
    #[allow(unused)]
    pub(crate) fn column_mapping_mode(&self) -> &ColumnMappingMode {
        &self.end_snapshot.column_mapping_mode
    }
}
#[cfg(test)]
mod tests {
    use crate::engine::sync::SyncEngine;
    use crate::schema::{DataType, StructField};
    use crate::table_changes::CDF_FIELDS;
    use crate::{Error, Table};
    use itertools::assert_equal;

    #[test]
    fn table_changes_checks_enable_cdf_flag() {
        // Table with CDF enabled, then disabled at version 2 and enabled at version 3
        let path = "./tests/data/table-with-cdf";
        let engine = Box::new(SyncEngine::new());
        let table = Table::try_from_uri(path).unwrap();

        let valid_ranges = [(0, 1), (0, 0), (1, 1)];
        for (start_version, end_version) in valid_ranges {
            let table_changes = table
                .table_changes(engine.as_ref(), start_version, end_version)
                .unwrap();
            assert_eq!(table_changes.start_version, start_version);
            assert_eq!(table_changes.end_version(), end_version);
        }

        let invalid_ranges = [(0, 2), (1, 2), (2, 2), (2, 3)];
        for (start_version, end_version) in invalid_ranges {
            let res = table.table_changes(engine.as_ref(), start_version, end_version);
            assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_))))
        }
    }
    #[test]
    fn schema_evolution_fails() {
        let path = "./tests/data/table-with-cdf";
        let engine = Box::new(SyncEngine::new());
        let table = Table::try_from_uri(path).unwrap();
        let expected_msg = "Failed to build TableChanges: Start and end version schemas are different. Found start version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: true, metadata: {} }} } and end version schema StructType { type_name: \"struct\", fields: {\"part\": StructField { name: \"part\", data_type: Primitive(Integer), nullable: true, metadata: {} }, \"id\": StructField { name: \"id\", data_type: Primitive(Integer), nullable: false, metadata: {} }} }";

        // A field in the schema goes from being nullable to non-nullable
        let table_changes_res = table.table_changes(engine.as_ref(), 3, 4);
        assert!(matches!(table_changes_res, Err(Error::Generic(msg)) if msg == expected_msg));
    }

    #[test]
    fn table_changes_has_cdf_schema() {
        let path = "./tests/data/table-with-cdf";
        let engine = Box::new(SyncEngine::new());
        let table = Table::try_from_uri(path).unwrap();
        let expected_schema = [
            StructField::new("part", DataType::INTEGER, true),
            StructField::new("id", DataType::INTEGER, true),
        ]
        .into_iter()
        .chain(CDF_FIELDS.clone());

        let table_changes = table.table_changes(engine.as_ref(), 0, 0).unwrap();
        assert_equal(expected_schema, table_changes.schema().fields().cloned());
    }
}
Review discussion on the PR:
- Don't we need to be able to specify: …
- This is usually done in the Scan Builder. The plan is to specify the schema and predicate when building a `TableChangesScan`.
- and i suppose there are no optimizations to be done here with that information? If yes we likely would want to propagate that information here?
- since this is the main 'entrypoint' API for users to interact with reading CDF can we add some docs on semantics and ideally examples too? important things to call out: (1) require that CDF is enabled for the whole range, (2) require the same schema for the whole range (for now!), (3) how do i scan this thing? (i'm probably forgetting other bits of semantics so don't take the above as exhaustive)
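On the request for docs and examples: the scan API itself is a follow-up, but semantics (1) can already be shown against the test table added in this diff. The sketch below is adapted from the PR's doctest and unit tests (the path and versions come from those tests) and is illustrative rather than official documentation:

```rust
use delta_kernel::engine::sync::SyncEngine;
use delta_kernel::{Error, Table};

fn main() -> Result<(), Error> {
    let engine = Box::new(SyncEngine::new());
    // Test table from this PR: CDF enabled, disabled at version 2, re-enabled at version 3.
    let table = Table::try_from_uri("./tests/data/table-with-cdf")?;

    // CDF is enabled at both endpoints, so constructing TableChanges succeeds.
    let changes = table.table_changes(engine.as_ref(), 0, 1)?;
    assert_eq!(changes.end_version(), 1);

    // Version 2 has CDF disabled, so the range 0..=2 is rejected up front.
    let res = table.table_changes(engine.as_ref(), 0, 2);
    assert!(matches!(res, Err(Error::ChangeDataFeedUnsupported(_))));
    Ok(())
}
```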