
Fixing snapshot creation for earlier versions than the latest checkpoint #322

Closed
wants to merge 16 commits

Conversation


@hackintoshrao commented Sep 5, 2024

Implement DeltaLogGroupingIterator for Efficient Snapshot Creation

This PR implements the DeltaLogGroupingIterator, significantly improving the snapshot creation process. The new approach offers a more efficient and robust way to handle Delta log files, particularly for tables with multiple checkpoints and versions.

Key Improvements:

  1. DeltaLogGroupingIterator Implementation:

    • Introduces a linked list structure for Delta log representation
    • Creates nodes only for checkpoints, reducing memory usage
    • Efficiently handles multi-part checkpoints and version requests
  2. Linked List Structure:
    The Delta log is now represented as a linked list of checkpoint nodes:

    Node 1 (No Checkpoint) -> Node 2 (Checkpoint A) -> Node 3 (Checkpoint B) -> ...

    Each node contains:

    • Checkpoint version and files (if applicable)
    • Commit files up to the next checkpoint
    • Link to the next checkpoint node
  3. Memory Efficiency:

    • Creates nodes only for checkpoints, not for every log entry
    • Significantly reduces memory usage for large Delta tables
    • Allows efficient traversal and filtering based on requested versions
  4. Improved Version Handling:

    • Streamlines handling of various scenarios:
      a) Requested version after the last checkpoint
      b) Requested version between checkpoints
      c) Requested version before any checkpoint
    • Optimizes file filtering and sorting for each scenario
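Sketched minimally, the checkpoint linked list described above might look like this in Rust (a hypothetical simplification: field and function names here are illustrative, not the PR's actual types):

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical sketch of one node in the checkpoint linked list. A node
// exists only where a checkpoint does (plus a head node for commits before
// any checkpoint), so memory scales with the number of checkpoints rather
// than the number of log entries.
struct CheckpointNode {
    checkpoint_version: Option<u64>, // None for the head node (no checkpoint yet)
    commit_files: Vec<String>,       // commits up to the next checkpoint
    next: Option<Rc<RefCell<CheckpointNode>>>,
}

// Walk the list and pick the newest checkpoint at or before the requested
// version; None means replay commits from version 0.
fn starting_checkpoint(head: &Rc<RefCell<CheckpointNode>>, requested: u64) -> Option<u64> {
    let mut best = None;
    let mut cursor = Some(Rc::clone(head));
    while let Some(node) = cursor {
        let node = node.borrow();
        if let Some(v) = node.checkpoint_version {
            if v <= requested {
                best = Some(v);
            }
        }
        cursor = node.next.clone();
    }
    best
}

fn main() {
    let b = Rc::new(RefCell::new(CheckpointNode {
        checkpoint_version: Some(20),
        commit_files: vec!["21.json".into()],
        next: None,
    }));
    let a = Rc::new(RefCell::new(CheckpointNode {
        checkpoint_version: Some(10),
        commit_files: vec!["11.json".into()],
        next: Some(b),
    }));
    let head = Rc::new(RefCell::new(CheckpointNode {
        checkpoint_version: None,
        commit_files: vec!["0.json".into()],
        next: Some(a),
    }));
    // Requesting version 15 replays from checkpoint 10 plus later commits.
    assert_eq!(starting_checkpoint(&head, 15), Some(10));
    assert_eq!(starting_checkpoint(&head, 25), Some(20));
    assert_eq!(starting_checkpoint(&head, 5), None);
}
```

This covers all three version-request scenarios from point 4: after the last checkpoint, between checkpoints, and before any checkpoint.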

This commit introduces a new unit test to verify that the Delta table
implementation can correctly build a snapshot at a version that is
earlier than the latest checkpoint. Specifically, it:

- Tests snapshot creation at version 10 when later checkpoints exist
- Adds a Delta dataset with multiple checkpoints as test data.
@scovich left a comment

Looks like a great test, thanks!

Do I understand correctly, based on the test output, that snapshot creation just plain fails today, because the listing doesn't even find the older files the snapshot needs?

(we need to fix the formatting issues before merging)

kernel/src/snapshot.rs — outdated comment, resolved
Comment on lines 582 to 586
// assert_eq!(
// checkpoint_meta.size_in_bytes,
// Some(21857),
// "Checkpoint size in bytes should be 21857"
// );
Collaborator

Why commented out, out of curiosity? Is the byte count wrong and needs to be updated?

Contributor Author

I'm getting a None result for checkpoint_meta.size_in_bytes, so the test was failing. I commented out the assertion so I could focus first on the snapshot versioning bug. I'm looking into this one, too, now.

Contributor Author

The root cause of the issue is a mismatch between the casing conventions used in the JSON data and the struct definition. The struct is expecting camelCase (e.g., sizeInBytes) due to the #[serde(rename_all = "camelCase")] attribute, but the actual JSON data is using snake_case (size_in_bytes).

Your short-term fix of adding an alias is a good temporary solution:

#[serde(alias = "size_in_bytes")]
pub(crate) size_in_bytes: Option<i64>,

However, for a long-term solution, we need to address the inconsistency between the Delta protocol specification and the implementation in delta-rs (I built my test data with delta-rs, assuming the other test data, including the _last_checkpoint file, was created the same way).

According to the Delta protocol documentation, the last checkpoint file schema should indeed use camelCase for field names. The fact that delta-rs is writing the metadata in snake_case suggests a deviation from the protocol specification.

The long-term solution should involve:

  1. Aligning the delta-rs implementation with the Delta protocol specification, ensuring the fields in the last checkpoint file are written using camelCase names.
  2. Updating the CheckpointMetadata struct to expect camelCase without needing aliases.
  3. If backward compatibility is required, implementing more robust deserialization that handles both camelCase and snake_case variants.

It would be worth investigating why delta-rs is writing the metadata in snake_case contrary to the protocol specification.

What are your thoughts @scovich ?

Contributor Author

Created a new issue, #326, to move this conversation. Would love to know your thoughts there.


hackintoshrao commented Sep 6, 2024

Do I understand correctly, based on the test output, that snapshot creation just plain fails today, because the listing doesn't even find the older files the snapshot needs?

Yes, you're right! Currently, the code only considers the latest checkpoint and doesn't look into the older checkpoints, which leads to this issue.

Also, I would love to know your thoughts on this issue too:
#326

En route to sending the patch for the snapshot issue along with the suggested formatting fixes.

- Added serde alias for 'size_in_bytes' field in CheckpointMetadata struct
- This allows deserialization of both camelCase and snake_case variants
- Addresses issue with inconsistent field naming in _last_checkpoint file

This is a temporary workaround for the issue described in delta-io#326. The long-term
solution will involve aligning the checkpoint writing logic with the Delta
protocol specification to use camelCase field names consistently.

See delta-io#326 for full details.
- Added commit files for versions 25, 26, 27, and 28 to the multiple-checkpoint test dataset
- Last checkpoint remains at version 24
- Purpose: Enable testing of snapshot creation for versions between the last checkpoint and the latest commit

This change allows us to test:
1. Snapshot requests for a version after the last checkpoint
2. Version selection behavior when commits exist beyond the last checkpoint
3. Correct handling of file listing and filtering for versions between checkpoints and the latest commit

These additions will help ensure the snapshot creation logic correctly handles
various version scenarios, particularly focusing on the interaction between
checkpoints and subsequent commits.
This commit introduces a new unit test 'test_snapshot_with_version_after_last_checkpoint'
to verify correct snapshot behavior when requesting a version that is after the last
checkpoint but not the latest commit.

Test data state:
- Located in ./tests/data/multiple-checkpoint/
- Contains commits up to version 28
- Last checkpoint is at version 24
- Requested snapshot version is 26

The test ensures:
1. Snapshot creation succeeds for version 26
2. Correct commit files are included (versions 25 and 26)
3. Older commits are excluded (version 24 and earlier)
4. Newer commits are excluded (versions 27 and 28)
5. The correct checkpoint file (version 24) is used
6. The effective version of the snapshot is set correctly

This test improves coverage of the snapshot creation logic, particularly for cases
where the requested version falls between the last checkpoint and the latest commit.
@hackintoshrao changed the title from "Add test to highlight issue with snapshot creation for earlier versions than the latest checkpoint" to "Fixing snapshot creation for earlier versions than the latest checkpoint" on Sep 6, 2024
This commit updates the snapshot creation process to more efficiently
utilize the last checkpoint information. Key changes include:

1. Streamlined logic for determining which log files to list based on
   the presence of a checkpoint and the requested version.

2. Use checkpoint data to list files when available, regardless of
   the requested version, allowing for more efficient file retrieval.

3. Fall back to listing all log files when no checkpoint is found.

This approach optimizes file reading operations, particularly for
tables with long histories, while maintaining correct behavior for
all version request scenarios. The subsequent filtering of commits
based on the requested version remains unchanged, ensuring accurate
snapshot creation.
This commit introduces a new unit test:
'test_snapshot_at_latest_checkpoint_version'. The test verifies that:

1. Snapshot creation succeeds when requesting the exact version of the
   latest checkpoint.
2. The created snapshot has the correct version.
3. The appropriate checkpoint file is used.
4. No commit files after the checkpoint version are included.
5. The effective version matches the checkpoint version.

This test covers an important edge case in snapshot creation, ensuring
correct behavior when the requested version aligns exactly with the
latest checkpoint. It complements existing tests and improves coverage
of the snapshot creation logic.
This commit updates the `list_log_files_with_checkpoint` function to
incorporate version filtering, previously handled in `try_new`. Changes include:

1. Add `requested_version: Option<Version>` parameter to
   `list_log_files_with_checkpoint`.
2. Implement version filtering logic within the commit file selection process.
3. Remove redundant version filtering from `try_new`.
@nicklan left a comment

thanks for this. Had one basic question.

@@ -152,12 +152,32 @@ impl Snapshot {
(Some(cp), None) => {
list_log_files_with_checkpoint(&cp, fs_client.as_ref(), &log_url)?
}
(Some(cp), Some(version)) if cp.version >= version => {

Collaborator

Suggested change

(Some(cp), Some(version)) if cp.version <= version => {
list_log_files_with_checkpoint(&cp, fs_client.as_ref(), &log_url)?
}

kernel/src/snapshot.rs — outdated comment, resolved

nicklan commented Sep 6, 2024

Note also that the tests fail on mac.

- Merged list_log_files and list_log_files_with_checkpoint into a single function
- Enhanced file filtering to correctly handle checkpoint boundaries
- Updated test cases to cover all scenarios, including:
  * Initial commits without checkpoints
  * Checkpoint versions
  * Versions between checkpoints
  * Accumulating commits after checkpoints
- Added detailed comments explaining each test case
- Improved handling of requested versions at or near checkpoint versions
- Optimized file sorting and filtering for better performance

This refactor simplifies the codebase, improves test coverage, and ensures
correct behavior for all Delta log file listing scenarios, particularly
around checkpoint boundaries.
@hackintoshrao

Note also that the tests fail on mac.

Fixed.

- Optimize file selection based on checkpoints and requested versions
- Ensure correct handling of commit files and checkpoints
- Improve efficiency by leveraging most recent checkpoints
- Add logic to handle cases before and after checkpoints
@hackintoshrao

Created a unified list_log_files to address the issue temporarily

It would be super helpful if you could review my assumptions about the correct behavior by examining the test cases.
The change breaks a few tests elsewhere in the codebase; I'll look into them once this approach is approved.

@scovich @nicklan

@scovich left a comment

Thanks for taking a stab at this! I had some ideas on how we might make the code simpler, PTAL.

kernel/src/error.rs — outdated comment, resolved
kernel/src/snapshot.rs — outdated comment, resolved
The logic ensures that:
a) We never include commit files beyond the requested version.
b) We use the most recent checkpoint as a starting point for efficiency.
c) We include the checkpoint version file only if it's explicitly requested.
Collaborator

Could you elaborate what this means? Is it related to 1/ and 3/ examples above?

Contributor Author

This explanation is overkill; removed it.

) -> DeltaResult<(Vec<FileMeta>, Vec<FileMeta>)> {
let version_prefix = format!("{:020}", 0);
let start_from = log_root.join(&version_prefix)?;
let start_from = log_root.join(&format!("{:020}", 0))?;
Collaborator

I don't think we want to always start listing from 0? Shouldn't we prefer to start from last checkpoint version when possible?

Comment on lines 368 to 373
// If no requested version, try to get the last checkpoint
let last_checkpoint = if requested_version.is_none() {
read_last_checkpoint(fs_client, log_root)?
} else {
None
};
Collaborator

Isn't a last checkpoint hint still useful as long as it's not after the requested version?

Suggested change
// If no requested version, try to get the last checkpoint
let last_checkpoint = if requested_version.is_none() {
read_last_checkpoint(fs_client, log_root)?
} else {
None
};
// Try to get the last checkpoint, and use it if not beyond the requested version.
let last_checkpoint = read_last_checkpoint(fs_client, log_root)
.filter(|hint| !requested_version.is_some_and(|version| version < hint.version));
let start_from = last_checkpoint.unwrap_or(0u64);

(note: Boolean negation is hard to read. Option::is_some_or would be better here, but it's not stable Rust yet)

.filter(|hint| requested_version.is_none_or(|requested| hint.version <= requested));
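The suggested filter can be exercised in isolation. Below is a minimal sketch with the checkpoint hint reduced to a bare version number (usable_hint is a hypothetical name, not kernel code):

```rust
// Pure-function sketch of the suggestion above: keep the _last_checkpoint
// hint only when it is not beyond the requested version.
fn usable_hint(hint: Option<u64>, requested_version: Option<u64>) -> Option<u64> {
    hint.filter(|&cp| !requested_version.is_some_and(|v| v < cp))
}

fn main() {
    assert_eq!(usable_hint(Some(24), Some(26)), Some(24)); // hint at or before request
    assert_eq!(usable_hint(Some(24), Some(10)), None); // hint beyond request: ignore it
    assert_eq!(usable_hint(Some(24), None), Some(24)); // no request: latest snapshot
}
```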

Contributor Author

You're right; I made the change to use the hint when the requested version is at or above the checkpoint version.

Comment on lines 396 to 402
// If we're using last_checkpoint and we've found the next checkpoint, stop
if requested_version.is_none()
&& last_checkpoint.is_some()
&& file_version > last_checkpoint.as_ref().unwrap().version as i64
{
next_checkpoint_version = Some(file_version);
break;
Collaborator

I don't think I understand this part of the code. Why would the presence of a last checkpoint hint decide whether we keep iterating or not?

Collaborator

Looking at the original code, I think it was missing adequate version checks, and also didn't truncate the existing commit file list after finding a new checkpoint? Something like this, perhaps?

for maybe_meta in fs_client.list_from(&start_from)? {
      ...
    // NOTE: Because our listing hint was for a version file, the listing returns all version
    // files before it returns any non-version file. So we should only continue if the current
    // file has a version, and that version is not beyond the requested version (if any).
    let file_version = match log_path.version {
        Some(v) if !requested_version.is_some_and(|requested| requested < v) => v,
        _ => break,
    };

    // A file version above the current checkpoint version needs special handling (see below)
    let beyond_current_checkpoint = !last_checkpoint_version.is_some_and(|cv| cv == file_version);
    
    // NOTE: v.checkpoint.parquet conveniently appears before v.json in the listing
    if log_path.is_checkpoint {
        if beyond_current_checkpoint {
            // First file of a new checkpoint - start over
            //
            // TODO: We can't trust a new multi-part checkpoint until we prove it's complete. We
            // must keep the previous checkpoint until we've seen all files for the new checkpoint.
            last_checkpoint_version = Some(file_version);
            current_version = Some(file_version);
            checkpoint_files.clear();
            commit_files.clear();
        }
        checkpoint_files.push(meta.clone());
    } else if log_path.is_commit {
        // Only include commits _after_ the current checkpoint. This may produce an
        // empty commit file list, if the snapshot version we chose has a checkpoint.
        if beyond_current_checkpoint {
            // Blow up if this file presents a version gap:
            // OK - [0.json] (first commit)
            // BAD - [4.json] (missing checkpoint)
            // OK - [4.checkpoint.parquet, 5.json] (first commit after checkpoint)
            // BAD - [4.checkpoint.parquet, 6.json] (missing commit)
            // OK - [..., 4.json, 5.json] (successive commits)
            // BAD - [..., 4.json, 6.json] (missing commit)
            match current_version {
                Some(v) if v + 1 < file_version => /* missing commit error! */,
                None if 0 < file_version => /* missing checkpoint error! */,
                _ => (), 
            }
            current_version = Some(file_version);
            commit_files.push(meta.clone())
        }
    }
}

// Commit files are already filtered, but need to be sorted newest-first
commit_files.sort_unstable_by(|a, b| b.location.cmp(&a.location));

// TODO: Filter out all but one complete checkpoint, in case e.g. client A wrote
// a single-part checkpoint and client B also wrote a multi-part checkpoint.

Contributor Author

It's just getting way too messy to handle all the scenarios, especially multi-part checkpoints from multiple concurrent writers!

I guess tonight I'll go for a quick implementation of the grouping iterator from https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaLogGroupingIterator.scala! I'll turn it around in a few hours.

@scovich commented Sep 10, 2024

One thought about the grouping iterator: The scala code returns a (Version, Seq[FileStatus]) tuple. But (a) that's not super readable and (b) most versions won't have checkpoints. It might be worth creating an actual struct instead, so we can name the fields and avoid instantiating a bunch of single-element vectors?

struct VersionFiles {
    version: Version,
    commit_file: Option<LogPath>,
    checkpoint_files: Vec<LogPath>,
}
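As a sketch of how grouping a listing into such a struct might work (types simplified to u64/String for illustration; group_by_version is a hypothetical helper, not the PR's implementation):

```rust
// Simplified stand-in for the VersionFiles struct proposed above.
#[derive(Debug, PartialEq)]
struct VersionFiles {
    version: u64,
    commit_file: Option<String>,
    checkpoint_files: Vec<String>,
}

// Input: (version, is_checkpoint, file name) tuples, sorted by version with
// checkpoint parts listed before the commit for the same version.
fn group_by_version(files: &[(u64, bool, String)]) -> Vec<VersionFiles> {
    let mut groups: Vec<VersionFiles> = Vec::new();
    for (version, is_checkpoint, name) in files {
        // Start a new group whenever the version changes.
        if groups.last().map_or(true, |g| g.version != *version) {
            groups.push(VersionFiles {
                version: *version,
                commit_file: None,
                checkpoint_files: Vec::new(),
            });
        }
        let group = groups.last_mut().expect("just pushed");
        if *is_checkpoint {
            group.checkpoint_files.push(name.clone());
        } else {
            group.commit_file = Some(name.clone());
        }
    }
    groups
}

fn main() {
    let files = vec![
        (0, false, "00000000000000000000.json".to_string()),
        (1, true, "00000000000000000001.checkpoint.parquet".to_string()),
        (1, false, "00000000000000000001.json".to_string()),
    ];
    let groups = group_by_version(&files);
    assert_eq!(groups.len(), 2);
    assert_eq!(groups[1].checkpoint_files.len(), 1);
}
```

Since most versions carry no checkpoint, the per-version Vec stays empty in the common case, which is the allocation concern the struct avoids relative to a (Version, Seq[FileStatus]) tuple.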


// Stop if we've passed the requested version
if let Some(req_version) = requested_version {
    if file_version > req_version as i64 {
Collaborator

Any idea why the existing code was casting from u64 to i64 (which this new code seems to follow)?

I suspect it's because 0 is signed by default, and that passing 0u64 would eliminate type mismatch issues?

@@ -546,4 +659,646 @@ mod tests {
Some(3)
);
}

#[test]
Collaborator

Aside: @nicklan -- This file is getting pretty large. I wonder what the rust best practice is for where unit tests should live?

My experience from non-rust projects is that bulky unit tests should generally live in separate (test-only) source files -- especially tests that only use public interfaces. In-file testing would make more sense for exercising private/internal code that isn't accessible outside the source file.

- Correct explanation for requested version matching checkpoint
- Clarify that both commit and checkpoint files are included
- Align comment with existing test cases and implementation
@hackintoshrao left a comment

I'm going for a quick implementation of a grouping iterator: https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaLogGroupingIterator.scala. It makes it much easier to handle the multi-part concurrent-write scenarios; handling all the corner cases is getting too messy, @scovich! I will turn it around soon!


scovich commented Sep 10, 2024

I'm going for a quick implementation of a group iterator: https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaLogGroupingIterator.scala. It makes it much easier to handle the multipart concurrent write scenarios; handling all the corners is getting too messy! @scovich. I will turn around soon!

Fair enough. I was hoping we could just work in a simple fix for the immediate problem before tackling the full solution, but at some point it's no longer "simple." Hopefully the grouping iterator makes life easier all around.

This commit introduces the DeltaLogGroupingIterator, a crucial component for
processing Delta Lake log files. The iterator groups log files into checkpoint
nodes, handling various scenarios including single-part checkpoints, multi-part
checkpoints, and commits without checkpoints.

Key features and improvements:

1. Efficient sorting and processing of log files:
   - Files are sorted by version and type (checkpoints before commits)
   - Handles version gaps and ensures proper sequencing of files

2. Flexible checkpoint handling:
   - Supports both single-part and multi-part checkpoints
   - Correctly groups multi-part checkpoint files
   - Detects and reports incomplete multi-part checkpoints

3. Robust error handling:
   - Detects and reports version gaps in the log
   - Ensures the log starts from version 0 when required
   - Reports incomplete multi-part checkpoints

4. Memory-efficient linked list structure:
   - Uses Rc<RefCell<>> for shared ownership and interior mutability
   - Allows for easy traversal of the log structure

5. Iterator implementation:
   - Provides a standard Rust iterator interface for easy consumption of log data
This commit enhances the LogPath struct with new functionality to handle
multi-part checkpoint files in Delta Lake log processing. Two new methods
have been added to improve the identification and parsing of multi-part
checkpoint files:

1. is_multi_part_checkpoint():
   - Determines if a file is a multi-part checkpoint
   - Handles both single-part and multi-part checkpoint file formats
   - Returns a boolean indicating if the file is a multi-part checkpoint

2. get_checkpoint_part_numbers():
   - Extracts part number and total parts for multi-part checkpoints
   - Returns Option<(u64, u64)> representing (part_number, total_parts)
   - Returns None for single-part checkpoints or non-checkpoint files

Key improvements:
- Robust parsing of checkpoint filenames
- Clear distinction between single-part and multi-part checkpoints
- Efficient extraction of part information from multi-part checkpoints
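For illustration, extracting the part numbers from a multi-part checkpoint name of the form <version>.checkpoint.<part>.<parts>.parquet (the naming scheme the Delta protocol uses) might look like this — a standalone sketch, not the PR's LogPath method:

```rust
// Hedged sketch: returns (part_number, total_parts) for a multi-part
// checkpoint file name, and None for single-part checkpoints, commit
// files, and anything else.
fn checkpoint_part_numbers(name: &str) -> Option<(u64, u64)> {
    match name.split('.').collect::<Vec<_>>().as_slice() {
        [_version, "checkpoint", part, total, "parquet"] => {
            Some((part.parse().ok()?, total.parse().ok()?))
        }
        _ => None,
    }
}

fn main() {
    let multi = "00000000000000000010.checkpoint.0000000001.0000000003.parquet";
    assert_eq!(checkpoint_part_numbers(multi), Some((1, 3)));
    // Single-part checkpoint and commit files are not multi-part.
    assert_eq!(
        checkpoint_part_numbers("00000000000000000010.checkpoint.parquet"),
        None
    );
    assert_eq!(checkpoint_part_numbers("00000000000000000010.json"), None);
}
```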
- Introduce new Error variant for invalid Delta Log structures
- Improve error reporting for log processing issues
- Supports recent changes in DeltaLogGroupingIterator and LogPath
- Replace manual file processing with DeltaLogGroupingIterator
- Improve handling of multi-part checkpoints and version requests
- Enhance error handling for invalid Delta log structures
- Optimize file filtering and sorting for different scenarios
- Update comments to explain complex logic and edge cases
- Maintain backwards compatibility with existing test cases
@zachschuermann

Hey @hackintoshrao, we are prioritizing #344, which should unblock this work. Sorry for the delay!


nicklan commented Oct 18, 2024

@hackintoshrao, #344 finally merged, which hopefully unblocks you here.

@hackintoshrao

Thanks @nicklan. @scovich, I'm back on the PR; I couldn't get to it this week.

@zachschuermann

Resolved #323?

@hackintoshrao

I'll need another week, most likely; actively on it.

@hackintoshrao

Closing in favour of #515.
