Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Log Replay for Change Data Feed #540

Merged
merged 60 commits into from
Dec 6, 2024

Conversation

OussamaSaoudi-db
Copy link
Collaborator

@OussamaSaoudi-db OussamaSaoudi-db commented Nov 25, 2024

What changes are proposed in this pull request?

This PR introduces the path for replaying the log for TableChanges and resolving cdc, add, and remove actions.

At the top level, we introduce TableChangesScan::scan_data, which gets TableChangesScanData used to read CDF. The stream of scan data requires a log replay.

To perform log replay, the table_changes::LogReplayScanner is introduced, which processes a single commit. It is responsible for two things:

  1. Producing TableChangesScanData, which is made up of transformed EngineData, a selection vector, and a map remove_dvs: HashMap<String, DvInfo>. remove_dvs maps from a remove action's path to its deletion vector.
  2. The replay scanner also performs schema, protocol, and table property validation to ensure that the Change Data Feed can be processed.

The LogReplayScanner performs two passes over the actions for each commit in try_new and into_scan_batches respectively.

To perform the operations above, two new visitors are added: PreparePhaseVisitor, and FileActionSelectionVisitor.

To test the changes, a new LocalMockTable struct is created for testing. This struct is used to write batches of actions into commits. This is used to verify that LogReplay produces correct output.

The physical schema is added to TableChangesScan.

How was this change tested?

The following cases are tested:

  • Valid metadata and protocol processing
  • Failure due to delta.enableChangeDataFeed not being enabled.
  • Failure due to incompatible schema
  • Simple add and remove case where there are no shared paths among the actions
  • A cdc action is present and all other actions must be filtered.
  • A remove and add action with the same path are resolved: The remove action is not selected, but it's registered in the remove_dv map. The add action must be selected.
  • Failure due to incompatible protocol update.
  • Correctly using default timestamp from the file modification time.
  • Data skipping works during log replay.
    The following schema validation cases are tested:
  • Adding non-nullable column
  • adding nullable column
  • commit has wider type than cdf schema
  • type widening (will eventually be supported)
  • cdf column is nullable while the commit schema is non-nullable (will eventually be supported)
  • cdf schema and commit schema have completely incompatible types
  • cdf schema has an extra nullable column.

@OussamaSaoudi-db OussamaSaoudi-db marked this pull request as draft November 25, 2024 22:44
@OussamaSaoudi-db OussamaSaoudi-db changed the title Implement Log Replay for Change Data Feedl [WIP] Implement Log Replay for Change Data Feedl Nov 25, 2024
@github-actions github-actions bot added the breaking-change Change that will require a version bump label Nov 25, 2024
Copy link

codecov bot commented Nov 25, 2024

Codecov Report

Attention: Patch coverage is 93.50797% with 57 lines in your changes missing coverage. Please review.

Project coverage is 82.33%. Comparing base (eb95c5b) to head (c1d6103).
Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
kernel/src/table_changes/log_replay.rs 86.41% 9 Missing and 24 partials ⚠️
kernel/src/table_changes/scan.rs 7.69% 12 Missing ⚠️
kernel/src/table_changes/log_replay/tests.rs 98.92% 0 Missing and 6 partials ⚠️
kernel/src/table_changes/mod.rs 82.14% 4 Missing and 1 partial ⚠️
ffi/src/error.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #540      +/-   ##
==========================================
+ Coverage   81.63%   82.33%   +0.69%     
==========================================
  Files          68       71       +3     
  Lines       14874    15734     +860     
  Branches    14874    15734     +860     
==========================================
+ Hits        12142    12954     +812     
- Misses       2145     2162      +17     
- Partials      587      618      +31     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

quick initial pass, will look more soon

kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
dir,
}
}
pub(crate) fn commit(&mut self, actions: &[Action]) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice, this is cool. Can we maybe move it into the test_utils crate? We have some similar but more limited functionality in there. If we put it in the same place we could probably replace a lot of what's there with some of this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue is that I have Serialize for actions as cfg_attr(test, ...). Should I try to make it dev-visible?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tentatively moved to dev-visibility.

I'm also punting on replacing TestAction since that changes a lot of stuff in read.rs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good start -- I don't see anything worrisome

kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
// we start our selection vector based on what was filtered. we will add to this vector
// below if a file has been removed
let selection_vector = match filter_vector {
Some(ref filter_vector) => filter_vector.clone(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to clone it? I don't see any later uses...

            let selection_vector = self
                .filter
                .as_ref()
                .map(|filter| filter.apply(actions.as_ref()))
                .transpose()?
                .unwrap_or_else(|| vec![true, actions.len()]);

(again below -- maybe we should find a way to consolidate that very similar code?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, consolidating makes a lot of sense. This feels like it should be part of the DataSkippingFilter, since it's similar code to scan. What do you think?

Copy link
Collaborator

@scovich scovich Dec 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you suggesting to add an associated method that encapsulates the above logic, like this?

impl DataSkippingFilter {
    fn compute_selection_vector(filter: &Option<Self>, data: &dyn EngineData) -> DeltaResult<Vec<bool>> { .. }

If so, that seems like a good idea to me!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep! Shoulda made myself clearer lol. I can take that on in a followup PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: We only do data skipping once during CDF replay now... but this still seems useful since CDF and normal replay both would use it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep both of them use it. It'd certainly be nice to have

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

follow-up issue?

@zachschuermann zachschuermann changed the title [WIP] Implement Log Replay for Change Data Feedl [WIP] Implement Log Replay for Change Data Feed Nov 26, 2024
@OussamaSaoudi-db OussamaSaoudi-db changed the title [WIP] Implement Log Replay for Change Data Feed Implement Log Replay for Change Data Feed Nov 26, 2024
@OussamaSaoudi-db OussamaSaoudi-db marked this pull request as ready for review November 26, 2024 19:08
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanData>>> {
let commits = self
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to make sure that copying ParsedLogPath is okay. Passing a &[&ParsedLogPath] makes it so the LogReplayScanner holds it, and the iterator now has a lifetime tied to table_changes.log_segment.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copying seems fine? The main worry is if we ever might need to catch up on thousands/millions of versions because streaming fell behind, in which case we'd be materializing them twice instead of "only" once?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fairly small, so doing it once per query is likely fine

Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking pretty good. one structural comment and a few nits

kernel/src/scan/data_skipping.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanData>>> {
let commits = self
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fairly small, so doing it once per query is likely fine

kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
expected: &StructType,
actual: &StructType,
) -> Self {
Self::ChangeDataFeedIncompatibleSchema(format!("{:?}", expected), format!("{:?}", actual))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want those to be readable, we probably need to use alt format?

Suggested change
Self::ChangeDataFeedIncompatibleSchema(format!("{:?}", expected), format!("{:?}", actual))
Self::ChangeDataFeedIncompatibleSchema(format!("{expected:#?}"), format!("{actual:#?}"))

... and we should probably also filter out the compatible bits of the schema (and maybe even keep just one counter-example, so people don't drown in the error message). Nicer formatting can be a follow-up item tho.

&self,
engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanData>>> {
let commits = self
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copying seems fine? The main worry is if we ever might need to catch up on thousands/millions of versions because streaming fell behind, in which case we'd be materializing them twice instead of "only" once?

kernel/src/utils.rs Show resolved Hide resolved
kernel/src/utils.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
Comment on lines 84 to 85
/// - Find the timestamp from a `CommitInfo` action if it exists. These are generated when
/// In-commit timestamps is enabled.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually support ICT yet in kernel? It changes how time travel resolution works, for example?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't seem like we have it enabled, I don't see it in TableProperties.

It changes how time travel resolution works, for example?

Yeah anywhere there are timestamps, we need to use the ICT if it's enabled. This could be for Snapshot timetravel, or specifying a CDF with a timestamp. For CDF _commit_timestamp field, the behaviour is to use ICT if it's there.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That means CDF will anyway fail protocol checks if the table has ICT, no?
If we anyway don't support ICT yet, maybe we don't need to worry about timestamps here quite yet?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now updated to ignore ICT

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we can't error out on ICT being present because it's not in TableProperties yet. @zachschuermann this will probably go into the TableProperties/TableConfiguration update

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a PR up right now to add ICT as table property. Let's separate the two. I'd prefer to have a follow-up issue here that's specifically for 'enable ICT for CDF reads'

Copy link
Collaborator

@zachschuermann zachschuermann left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking great - generally advocating for keeping more things private PTAL at all those pub spots carefully. test review TODO

kernel/src/error.rs Outdated Show resolved Hide resolved
kernel/src/scan/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/scan/mod.rs Outdated Show resolved Hide resolved
kernel/src/utils.rs Outdated Show resolved Hide resolved
kernel/src/utils.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/scan.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
Comment on lines 161 to 211
if let Some(protocol) = visitor.protocol {
protocol.ensure_read_supported()?;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is saying that if a protocol action is present during iteration we check it's supported (I read the comment above - just trying to think through this)

do we need to check for table features at all? I guess CDF doesn't need to? (and i see the table properties check below)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CDF is absolutely impacted by table features -- DV and ICT at a minimum (and soon: CC, Variant, Geo, etc). Even CDF itself is actually a table feature, and we could absolutely have new table features relating to it (DML team has been dreaming of a "zero-cost CDF" table feature based on row tracking, for example)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is saying that if a protocol action is present during iteration we check it's supported

Correct

do we need to check for table features at all?

No specific reader or writer features.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CDF is absolutely impacted by table features -- DV and ICT at a minimum

Is there any specific behaviour that I should be checking for this? I don't see anything explicit in the delta-spark code for this

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already implemented... just without checking for the table feature:

  • ICT - get timestamp from commit info
  • DV - use deletion vectors

Copy link
Collaborator Author

@OussamaSaoudi-db OussamaSaoudi-db Dec 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ICT table property checking is currently unsupported, and @zachschuermann is working on introducing it

We actually always have to handle deletion vectors, even when it's not specified in TableProperties. From the protocol:

Readers must read the table considering the existence of DVs, even when the delta.enableDeletionVectors table property is not set.

Edit:
wrong link. Here's the right one

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's only true if the table feature is present in protocol. If the feature is not present in protocol then there should not be any DV and any that seem to be present are probably garbage.

Copy link
Collaborator

@scovich scovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Getting close. Main outstanding issues IMO are:

  1. How do table features interact with CDF code?
  2. Don't we need to pay attention to dataChange flag on file actions?

kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
Comment on lines 161 to 211
if let Some(protocol) = visitor.protocol {
protocol.ensure_read_supported()?;
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CDF is absolutely impacted by table features -- DV and ICT at a minimum (and soon: CC, Variant, Geo, etc). Even CDF itself is actually a table feature, and we could absolutely have new table features relating to it (DML team has been dreaming of a "zero-cost CDF" table feature based on row tracking, for example)

kernel/src/table_changes/log_replay.rs Outdated Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Show resolved Hide resolved
kernel/src/table_changes/log_replay.rs Show resolved Hide resolved
kernel/src/table_changes/scan.rs Outdated Show resolved Hide resolved
);
let table_properties = TableProperties::from(configuration);
require!(
table_properties.enable_change_data_feed.unwrap_or(false),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zachschuermann here's another case of cross-validation we need to add -- is it legal for the table property to exist if the table doesn't even include CDF in its protocol?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea ideally all of this will move to a new TableConfiguration like we talked about - I'm happy to tackle that soon as i get some time in the next few days

Comment on lines +101 to +122
/// - Otherwise, select `add` and `remove` actions. Note that only `remove` actions that do not
/// share a path with an `add` action are selected.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we also need to ignore non-dataChange actions? I don't see any code for that?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ooohhh this is a good question! I had forgotten about this, thank you! also note we do have to be careful not to ignore all non-dataChange actions because I think cdc actions (which we need) always (usually?) have dataChange=false.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right... I meant add and remove (forgot that cdc actions are also considered "file actions")

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm correct me if I'm wrong, but this should be covered already in the FileActionSelectionVisitor

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

            if getters[0].get_str(i, "add.path")?.is_some() {
                self.selection_vector[i] = !self.has_cdc_action;
            } else if let Some(path) = getters[1].get_str(i, "remove.path")? {
                self.selection_vector[i] =
                    !self.has_cdc_action && !self.remove_dvs.contains_key(path)
            } else {
                self.selection_vector[i] = getters[2].get_str(i, "cdc.path")?.is_some()
            };

the final case sets to true if it is a cdc action. Otherwise all other actions are set to false.

Copy link
Collaborator

@scovich scovich Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the spec for dataChange field:

When false the logical file must already be present in the table or the records in the added file must be contained in one or more remove actions in the same version

The missing bit would be something like this, I think?

            if getters[0].get_str(i, "add.path")?.is_some() {
                let isDataChange = getters[XXX].get("add.dataChange");
                self.selection_vector[i] = isDataChange && !self.has_cdc_action;

(similar story for remove actions)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a test for this as well :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test got added I believe, right @OussamaSaoudi-db? (I had the same comment in test file I think)

Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looking good. just re-request me when the remaining comments are addressed

kernel/src/table_changes/log_replay.rs Show resolved Hide resolved
@OussamaSaoudi-db
Copy link
Collaborator Author

@scovich

How do table features interact with CDF code?

That's a good question. At the moment, I'm not sure how certain features should interact with CDF. I suppose a simple approach is to fail on any changes to table features.

Don't we need to pay attention to dataChange flag on file actions?

Also a good point. Looking at delta-spark, they tfilter out actions that have dataChange = false.

            val addActions = actions.collect { case a: AddFile if a.dataChange => a }
            val removeActions = actions.collect { case r: RemoveFile if r.dataChange => r }

I can go ahead and add that filtering in the selection visitor.

@OussamaSaoudi-db
Copy link
Collaborator Author

@scovich update regarding table features: I spoke to @zachschuermann And we're deciding to do some extra checks for protocol and table features. We'll start off being extra restrictive with the table features/properties we support.

@OussamaSaoudi-db OussamaSaoudi-db merged commit 3b456e4 into delta-io:main Dec 6, 2024
20 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
breaking-change Change that will require a version bump
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants