Implement Log Replay for Change Data Feed #540
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@
##             main     #540      +/-   ##
==========================================
+ Coverage   81.63%   82.33%   +0.69%
==========================================
  Files          68       71       +3
  Lines       14874    15734     +860
  Branches    14874    15734     +860
==========================================
+ Hits        12142    12954     +812
- Misses       2145     2162      +17
- Partials      587      618      +31

☔ View full report in Codecov by Sentry.
quick initial pass, will look more soon
```rust
        dir,
    }
}

pub(crate) fn commit(&mut self, actions: &[Action]) {
```
nice, this is cool. Can we maybe move it into the `test_utils` crate? We have some similar but more limited functionality in there. If we put it in the same place we could probably replace a lot of what's there with some of this.
One issue is that I have `Serialize` for actions as `cfg_attr(test, ...)`. Should I try to make it dev-visible?
Tentatively moved to dev-visibility.
I'm also punting on replacing `TestAction` since that changes a lot of stuff in `read.rs`.
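Roughly, the "dev-visible" gating being discussed could look like the sketch below: the derive is enabled under `cfg(test)` or an opt-in feature so a shared test-utils crate can also serialize actions. The feature name and the struct fields shown are assumptions for illustration, not the PR's actual setup.

```rust
// Hedged sketch: derive Serialize for test builds *or* an opt-in feature, so
// that code outside cfg(test) (e.g. a shared test_utils crate) can use it.
// The feature name "developer-visibility" is an assumption.
#[cfg_attr(
    any(test, feature = "developer-visibility"),
    derive(serde::Serialize)
)]
pub struct Add {
    pub path: String,
    pub data_change: bool,
    // ... remaining fields elided
}
```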
https://users.rust-lang.org/t/problems-sharing-test-code-traits-via-a-testutils-crate/53305/2
Test-utils may not be able to support this fully 🤔
Good start -- I don't see anything worrisome
```rust
// we start our selection vector based on what was filtered. we will add to this vector
// below if a file has been removed
let selection_vector = match filter_vector {
    Some(ref filter_vector) => filter_vector.clone(),
```
Why do we need to clone it? I don't see any later uses...

```rust
let selection_vector = self
    .filter
    .as_ref()
    .map(|filter| filter.apply(actions.as_ref()))
    .transpose()?
    .unwrap_or_else(|| vec![true; actions.len()]);
```

(again below -- maybe we should find a way to consolidate that very similar code?)
Yeah, consolidating makes a lot of sense. This feels like it should be part of the DataSkippingFilter, since it's similar code to scan. What do you think?
Are you suggesting to add an associated method that encapsulates the above logic, like this?

```rust
impl DataSkippingFilter {
    fn compute_selection_vector(filter: &Option<Self>, data: &dyn EngineData) -> DeltaResult<Vec<bool>> { .. }
}
```

If so, that seems like a good idea to me!
Yep! Shoulda made myself clearer lol. I can take that on in a followup PR.
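Something like this, roughly, is what the follow-up helper could look like. It assumes `DataSkippingFilter::apply` returns `DeltaResult<Vec<bool>>` and that `EngineData` exposes a row count via `len()` (both taken from the snippet quoted above); everything else is illustrative, not the final API.

```rust
impl DataSkippingFilter {
    /// Apply an optional data-skipping filter to a batch of actions, falling
    /// back to an all-true selection vector when no filter is configured.
    pub(crate) fn compute_selection_vector(
        filter: &Option<Self>,
        actions: &dyn EngineData,
    ) -> DeltaResult<Vec<bool>> {
        match filter {
            Some(filter) => filter.apply(actions),
            // no filter: keep every row selected
            None => Ok(vec![true; actions.len()]),
        }
    }
}
```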
Update: We only do data skipping once during CDF replay now... but this still seems useful since CDF and normal replay both would use it?
Yep both of them use it. It'd certainly be nice to have
follow-up issue?
```rust
    &self,
    engine: &dyn Engine,
) -> DeltaResult<impl Iterator<Item = DeltaResult<TableChangesScanData>>> {
    let commits = self
```
Just wanted to make sure that copying `ParsedLogPath` is okay. Passing a `&[&ParsedLogPath]` makes it so the LogReplayScanner holds it, and the iterator now has a lifetime tied to `table_changes.log_segment`.
Copying seems fine? The main worry is if we ever might need to catch up on thousands/millions of versions because streaming fell behind, in which case we'd be materializing them twice instead of "only" once?
It's fairly small, so doing it once per query is likely fine
looking pretty good. one structural comment and a few nits
kernel/src/error.rs
```rust
    expected: &StructType,
    actual: &StructType,
) -> Self {
    Self::ChangeDataFeedIncompatibleSchema(format!("{:?}", expected), format!("{:?}", actual))
```
If we want those to be readable, we probably need to use alt format?

```diff
- Self::ChangeDataFeedIncompatibleSchema(format!("{:?}", expected), format!("{:?}", actual))
+ Self::ChangeDataFeedIncompatibleSchema(format!("{expected:#?}"), format!("{actual:#?}"))
```

... and we should probably also filter out the compatible bits of the schema (and maybe even keep just one counter-example, so people don't drown in the error message). Nicer formatting can be a follow-up item tho.
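As a rough illustration of the "one counter-example" idea, a hedged sketch follows; the `fields()`, `field()`, `name()`, and `data_type()` accessors are assumptions about the `StructType`/`StructField` API, and the helper name is hypothetical.

```rust
/// Report only the first incompatible field instead of dumping both schemas.
fn first_schema_mismatch(expected: &StructType, actual: &StructType) -> Option<String> {
    for expected_field in expected.fields() {
        match actual.field(expected_field.name()) {
            // field missing entirely from the actual schema
            None => return Some(format!("missing field '{}'", expected_field.name())),
            // field present but with a different type
            Some(actual_field) if actual_field.data_type() != expected_field.data_type() => {
                return Some(format!(
                    "field '{}' has type {:?}, expected {:?}",
                    expected_field.name(),
                    actual_field.data_type(),
                    expected_field.data_type(),
                ))
            }
            Some(_) => {}
        }
    }
    None
}
```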
```rust
/// - Find the timestamp from a `CommitInfo` action if it exists. These are generated when
///   In-commit timestamps is enabled.
```
Do we actually support ICT yet in kernel? It changes how time travel resolution works, for example?
Doesn't seem like we have it enabled, I don't see it in `TableProperties`.

> It changes how time travel resolution works, for example?

Yeah, anywhere there are timestamps, we need to use the ICT if it's enabled. This could be for Snapshot time travel, or specifying a CDF with a timestamp. For the CDF `_commit_timestamp` field, the behaviour is to use ICT if it's there.
That means CDF will anyway fail protocol checks if the table has ICT, no?
If we anyway don't support ICT yet, maybe we don't need to worry about timestamps here quite yet?
Now updated to ignore ICT
Note that we can't error out on ICT being present because it's not in TableProperties yet. @zachschuermann this will probably go into the TableProperties/TableConfiguration update
I have a PR up right now to add ICT as table property. Let's separate the two. I'd prefer to have a follow-up issue here that's specifically for 'enable ICT for CDF reads'
looking great - generally advocating for keeping more things private. PTAL at all those `pub` spots carefully. test review TODO
```rust
if let Some(protocol) = visitor.protocol {
    protocol.ensure_read_supported()?;
}
```
this is saying that if a protocol action is present during iteration we check it's supported (I read the comment above - just trying to think through this)
do we need to check for table features at all? I guess CDF doesn't need to? (and I see the table properties check below)
CDF is absolutely impacted by table features -- DV and ICT at a minimum (and soon: CC, Variant, Geo, etc). Even CDF itself is actually a table feature, and we could absolutely have new table features relating to it (DML team has been dreaming of a "zero-cost CDF" table feature based on row tracking, for example)
> this is saying that if a protocol action is present during iteration we check it's supported

Correct

> do we need to check for table features at all?

No specific reader or writer features.
> CDF is absolutely impacted by table features -- DV and ICT at a minimum

Is there any specific behaviour that I should be checking for this? I don't see anything explicit in the delta-spark code for this
It's already implemented... just without checking for the table feature:
- ICT - get timestamp from commit info
- DV - use deletion vectors
ICT table property checking is currently unsupported, and @zachschuermann is working on introducing it.

We actually always have to handle deletion vectors, even when it's not specified in `TableProperties`. From the protocol:

> Readers must read the table considering the existence of DVs, even when the delta.enableDeletionVectors table property is not set.

Edit: wrong link. Here's the right one.
That's only true if the table feature is present in protocol. If the feature is not present in protocol then there should not be any DV and any that seem to be present are probably garbage.
Getting close. Main outstanding issues IMO are:
- How do table features interact with CDF code?
- Don't we need to pay attention to dataChange flag on file actions?
```rust
);
let table_properties = TableProperties::from(configuration);
require!(
    table_properties.enable_change_data_feed.unwrap_or(false),
```
@zachschuermann here's another case of cross-validation we need to add -- is it legal for the table property to exist if the table doesn't even include CDF in its protocol?
yea, ideally all of this will move to a new `TableConfiguration` like we talked about - I'm happy to tackle that as soon as I get some time in the next few days
```rust
/// - Otherwise, select `add` and `remove` actions. Note that only `remove` actions that do not
///   share a path with an `add` action are selected.
```
Don't we also need to ignore non-dataChange actions? I don't see any code for that?
ooohhh this is a good question! I had forgotten about this, thank you! also note we do have to be careful not to ignore all non-dataChange actions because I think cdc actions (which we need) always (usually?) have `dataChange=false`.
Right... I meant add and remove (forgot that cdc actions are also considered "file actions")
Hmm correct me if I'm wrong, but this should be covered already in the FileActionSelectionVisitor
```rust
if getters[0].get_str(i, "add.path")?.is_some() {
    self.selection_vector[i] = !self.has_cdc_action;
} else if let Some(path) = getters[1].get_str(i, "remove.path")? {
    self.selection_vector[i] =
        !self.has_cdc_action && !self.remove_dvs.contains_key(path)
} else {
    self.selection_vector[i] = getters[2].get_str(i, "cdc.path")?.is_some()
};
```
the final case sets to true if it is a cdc action. Otherwise all other actions are set to false.
From the spec for the dataChange field:

> When `false` the logical file must already be present in the table or the records in the added file must be contained in one or more remove actions in the same version

The missing bit would be something like this, I think?

```rust
if getters[0].get_str(i, "add.path")?.is_some() {
    let isDataChange = getters[XXX].get("add.dataChange");
    self.selection_vector[i] = isDataChange && !self.has_cdc_action;
```

(similar story for remove actions)
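Spelled out a little more, a hedged sketch of how that check could be folded into the visitor branch quoted above; the getter indices for `add.dataChange` and `remove.dataChange` (3 and 4 here) and the typed `get` call are assumptions about the visitor's getter ordering and API, not the PR's final code.

```rust
if getters[0].get_str(i, "add.path")?.is_some() {
    // only select adds that actually change data (and only when no cdc action exists)
    let data_change: bool = getters[3].get(i, "add.dataChange")?;
    self.selection_vector[i] = data_change && !self.has_cdc_action;
} else if let Some(path) = getters[1].get_str(i, "remove.path")? {
    // same dataChange guard for removes, plus the existing path/DV checks
    let data_change: bool = getters[4].get(i, "remove.dataChange")?;
    self.selection_vector[i] =
        data_change && !self.has_cdc_action && !self.remove_dvs.contains_key(path);
} else {
    self.selection_vector[i] = getters[2].get_str(i, "cdc.path")?.is_some();
}
```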
Let's add a test for this as well :)
test got added I believe, right @OussamaSaoudi-db? (I had the same comment in test file I think)
looking good. just re-request me when the remaining comments are addressed
That's a good question. At the moment, I'm not sure how certain features should interact with CDF. I suppose a simple approach is to fail on any changes to table features.

Also a good point. Looking at delta-spark, they filter out actions that have dataChange set to false. I can go ahead and add that filtering in the selection visitor.

@scovich update regarding table features: I spoke to @zachschuermann and we're deciding to do some extra checks for protocol and table features. We'll start off being extra restrictive with the table features/properties we support.
What changes are proposed in this pull request?
This PR introduces the path for replaying the log for TableChanges and resolving cdc, add, and remove actions.
At the top level, we introduce `TableChangesScan::scan_data`, which gets the `TableChangesScanData` used to read CDF. The stream of scan data requires a log replay.

To perform log replay, the `table_changes::LogReplayScanner` is introduced, which processes a single commit. It produces `TableChangesScanData`, which is made up of transformed `EngineData`, a selection vector, and a map `remove_dvs: HashMap<String, DvInfo>`. `remove_dvs` maps from a remove action's path to its deletion vector.

The `LogReplayScanner` performs two passes over the actions for each commit, in `try_new` and `into_scan_batches` respectively. To perform the operations above, two new visitors are added: `PreparePhaseVisitor` and `FileActionSelectionVisitor`.
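For orientation, a hedged sketch of consuming the new entry point, based only on the `scan_data` signature shown in this PR; how the `TableChangesScan` itself is constructed is not covered here and is assumed.

```rust
fn collect_cdf_scan_data(
    scan: &TableChangesScan,
    engine: &dyn Engine,
) -> DeltaResult<Vec<TableChangesScanData>> {
    // each item carries transformed EngineData, a selection vector, and the
    // remove_dvs map described above
    scan.scan_data(engine)?.collect()
}
```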
To test the changes, a new `LocalMockTable` struct is created. This struct is used to write batches of actions into commits, and is used to verify that log replay produces correct output.

The physical schema is added to `TableChangesScan`.
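A hedged sketch of how `LocalMockTable` might be driven in a test, based on the `commit(&mut self, actions: &[Action])` signature quoted earlier in this review; the constructor and the `add_action`/`remove_action` helpers are hypothetical.

```rust
let mut mock_table = LocalMockTable::new();
// commit 0: add a data file
mock_table.commit(&[Action::Add(add_action("part-000.parquet"))]);
// commit 1: remove that file again
mock_table.commit(&[Action::Remove(remove_action("part-000.parquet"))]);
// log replay over these commits can now be compared against expected scan data
```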
How was this change tested?
The following cases are tested:
- `delta.enableChangeDataFeed` not being enabled.
- A `cdc` action is present, and all other actions must be filtered.
- A remove action that shares a path with an add action has its deletion vector recorded in the `remove_dv` map. The add action must be selected.

The following schema validation cases are tested: