Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing snapshot creation for earlier versions than the latest checkpoint #322

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 270 additions & 4 deletions kernel/src/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,32 @@ impl Snapshot {
(Some(cp), None) => {
list_log_files_with_checkpoint(&cp, fs_client.as_ref(), &log_url)?
}
(Some(cp), Some(version)) if cp.version >= version => {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

(Some(cp), Some(version)) if cp.version <= version => {
list_log_files_with_checkpoint(&cp, fs_client.as_ref(), &log_url)?
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

(Some(cp), Some(version)) if cp.version > version => {
hackintoshrao marked this conversation as resolved.
Show resolved Hide resolved
list_log_files_with_checkpoint(&cp, fs_client.as_ref(), &log_url)?
}
// case where the last_checkpoint file is not found
_ => list_log_files(fs_client.as_ref(), &log_url)?,
};

debug!(
"Commit files: {:?}",
commit_files
.iter()
.map(|f| f.location.clone())
.collect::<Vec<_>>()
);
debug!(
"Checkpoint files: {:?}",
checkpoint_files
.iter()
.map(|f| f.location.clone())
.collect::<Vec<_>>()
);
// remove all files above requested version
if let Some(version) = version {
commit_files.retain(|meta| {
Expand Down Expand Up @@ -279,6 +299,8 @@ struct CheckpointMetadata {
/// The number of fragments if the last checkpoint was written in multiple parts.
pub(crate) parts: Option<i32>,
/// The number of bytes of the checkpoint.
/// TODO: Temporary fix, checkout this issue for full details: https://github.com/delta-incubator/delta-kernel-rs/issues/326
#[serde(alias = "size_in_bytes")]
pub(crate) size_in_bytes: Option<i64>,
/// The number of AddFile actions in the checkpoint.
pub(crate) num_of_add_files: Option<i64>,
Expand All @@ -301,13 +323,18 @@ fn read_last_checkpoint(
log_root: &Url,
) -> DeltaResult<Option<CheckpointMetadata>> {
let file_path = LogPath::new(log_root).child(LAST_CHECKPOINT_FILE_NAME)?;
debug!("Reading last checkpoint from: {}", file_path);
match fs_client
.read_files(vec![(file_path, None)])
.and_then(|mut data| data.next().expect("read_files should return one file"))
{
Ok(data) => Ok(serde_json::from_slice(&data)
.inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
.ok()),
Ok(data) => {
// print the data in bytes as a string
debug!("Data: {:?}", std::str::from_utf8(&data).unwrap());
Ok(serde_json::from_slice(&data)
.inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
.ok())
}
Err(Error::FileNotFound(_)) => Ok(None),
Err(err) => Err(err),
}
Expand Down Expand Up @@ -546,4 +573,243 @@ mod tests {
Some(3)
);
}

#[test]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aside: @nicklan -- This file is getting pretty large. I wonder what the rust best practice is for where unit tests should live?

My experience from non-rust projects is that bulky unit tests should generally live in separate (test-only) source files -- especially tests that only use public interfaces. In-file testing would make more sense for exercising private/internal code that isn't accessible outside the source file.

fn test_snapshot_version_0_with_checkpoint_at_version_1() {
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/app-txn-checkpoint/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();

let engine = SyncEngine::new();

// First, let's verify the content of the _last_checkpoint file
let fs_client = engine.get_file_system_client();
let log_url = LogPath::new(&url).child("_delta_log/").unwrap();
let last_checkpoint = read_last_checkpoint(fs_client.as_ref(), &log_url).unwrap();

assert!(
last_checkpoint.is_some(),
"_last_checkpoint file should exist"
);
let checkpoint_meta = last_checkpoint.unwrap();
debug!("Checkpoint metadata: {:#?}", checkpoint_meta);
assert_eq!(
checkpoint_meta.version, 1,
"Last checkpoint should be at version 1"
);
assert_eq!(checkpoint_meta.size, 8, "Checkpoint size should be 8");
assert_eq!(
checkpoint_meta.size_in_bytes,
Some(21857),
"Checkpoint size in bytes should be 21857"
);

// Now, request snapshot at version 0
let snapshot = Snapshot::try_new(url.clone(), &engine, Some(0));

match snapshot {
Ok(snap) => {
assert_eq!(snap.version(), 0, "Snapshot version should be 0");

// Verify that the snapshot contains the correct files
assert_eq!(
snap.log_segment.commit_files.len(),
1,
"There should be one commit file"
);
assert_eq!(
LogPath::new(&snap.log_segment.commit_files[0].location).version,
Some(0),
"The commit file should be version 0"
);

assert!(
snap.log_segment.checkpoint_files.is_empty(),
"Snapshot for version 0 should not contain checkpoint files"
);
}
Err(e) => {
panic!("Failed to create snapshot for version 0: {:?}", e);
}
}

// Verify the snapshot at version 1 (the checkpoint version)
let snapshot_1 = Snapshot::try_new(url, &engine, Some(1)).unwrap();
assert_eq!(snapshot_1.version(), 1, "Snapshot version should be 1");
assert_eq!(
snapshot_1.log_segment.checkpoint_files.len(),
1,
"There should be one checkpoint file for version 1"
);
assert_eq!(
LogPath::new(&snapshot_1.log_segment.checkpoint_files[0].location).version,
Some(1),
"The checkpoint file should be version 1"
);
}

#[test]
fn test_snapshot_with_version_less_than_latest_checkpoint() {
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/multiple-checkpoint/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();

let engine = SyncEngine::new();

// Attempt to create a snapshot at version 10
let result = Snapshot::try_new(url, &engine, Some(10));

// Check if the operation succeeded
assert!(
result.is_ok(),
"Expected snapshot creation to succeed for version 10"
);

let snapshot = result.unwrap();

// Verify the snapshot properties
assert_eq!(snapshot.version(), 10, "Snapshot version should be 10");
}

#[test]
fn test_snapshot_with_version_after_last_checkpoint() {
// Setup
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/multiple-checkpoint/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();

// Attempt to create a snapshot at version 26
let result = Snapshot::try_new(url.clone(), &engine, Some(26));

// Check if the operation succeeded
assert!(
result.is_ok(),
"Expected snapshot creation to succeed for version 26"
);

let snapshot = result.unwrap();

// Verify the snapshot properties
assert_eq!(snapshot.version(), 26, "Snapshot version should be 26");

// Verify that the commit files are correct
let commit_versions: Vec<_> = snapshot
.log_segment
.commit_files
.iter()
.filter_map(|f| LogPath::new(&f.location).version)
.collect();

assert!(
commit_versions.contains(&26),
"Snapshot should include commit file for version 26"
);
assert!(
commit_versions.contains(&25),
"Snapshot should include commit file for version 25"
);
assert!(
!commit_versions.contains(&27),
"Snapshot should not include commit file for version 27"
);
assert!(
!commit_versions.contains(&28),
"Snapshot should not include commit file for version 28"
);

// Verify that the checkpoint file is correct
assert_eq!(
snapshot.log_segment.checkpoint_files.len(),
1,
"Snapshot should include one checkpoint file"
);
assert_eq!(
LogPath::new(&snapshot.log_segment.checkpoint_files[0].location).version,
Some(24),
"Snapshot should use the checkpoint file for version 24"
);

// Verify that the log segment contains the correct range of files
let min_version = commit_versions.iter().min().unwrap();
let max_version = commit_versions.iter().max().unwrap();
assert!(
min_version >= &25,
"Minimum commit version should be at least 25"
);
assert_eq!(max_version, &26, "Maximum commit version should be 26");

// Verify that the effective version is correct
assert_eq!(snapshot.version(), 26, "Effective version should be 26");
}

#[test]
fn test_snapshot_at_latest_checkpoint_version() {
// Setup
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/multiple-checkpoint/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let engine = SyncEngine::new();

// Read the last checkpoint to get its version
let fs_client = engine.get_file_system_client();
let log_url = LogPath::new(&url).child("_delta_log/").unwrap();
let last_checkpoint = read_last_checkpoint(fs_client.as_ref(), &log_url)
.expect("Failed to read last checkpoint")
.expect("No checkpoint found");

let checkpoint_version = last_checkpoint.version;

// Attempt to create a snapshot at the checkpoint version
let result = Snapshot::try_new(url.clone(), &engine, Some(checkpoint_version));

// Check if the operation succeeded
assert!(
result.is_ok(),
"Expected snapshot creation to succeed for checkpoint version {}",
checkpoint_version
);

let snapshot = result.unwrap();

// Verify the snapshot properties
assert_eq!(
snapshot.version(),
checkpoint_version,
"Snapshot version should match checkpoint version"
);

// Verify that the checkpoint file is used
assert_eq!(
snapshot.log_segment.checkpoint_files.len(),
1,
"Snapshot should include one checkpoint file"
);
assert_eq!(
LogPath::new(&snapshot.log_segment.checkpoint_files[0].location).version,
Some(checkpoint_version),
"Snapshot should use the checkpoint file for version {}",
checkpoint_version
);

// Verify that no commit files after the checkpoint version are included
let commit_versions: Vec<_> = snapshot
.log_segment
.commit_files
.iter()
.filter_map(|f| LogPath::new(&f.location).version)
.collect();

assert!(
commit_versions.is_empty() || commit_versions.iter().all(|&v| v <= checkpoint_version),
"Snapshot should not include commit files after checkpoint version"
);

// Verify that the effective version is correct
assert_eq!(
snapshot.version(),
checkpoint_version,
"Effective version should match checkpoint version"
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"1ca33e8d-ad5c-4e43-bab4-0d58d74e049e","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1725607399077,"configuration":{}}}
{"add":{"path":"part-00001-1f750a9d-670b-4e10-b70e-f4b57f4d1dcc-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399089,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"7B0GBMq\",\"id\":14},\"maxValues\":{\"id\":67,\"value\":\"rMjmkJp\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399089,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00001-546f378e-c381-44fa-8546-0b5e0dec5d7a-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399102,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":15,\"value\":\"9ZGuiBM\"},\"maxValues\":{\"id\":80,\"value\":\"pi2j55u\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399102,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00001-6be399aa-c687-4f71-819e-efef39379674-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399110,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":0,\"value\":\"HzmPdRD\"},\"maxValues\":{\"id\":73,\"value\":\"st1DuKb\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399110,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00001-ef4482fe-3530-4db6-bf2f-2958f61e7fc8-c000.snappy.parquet","partitionValues":{},"size":865,"modificationTime":1725607399118,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":22,\"value\":\"dfU3zNV\"},\"maxValues\":{\"value\":\"vrbg0qC\",\"id\":96},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399118,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00001-2335a318-4612-4de7-b77e-b170ad690ed0-c000.snappy.parquet","partitionValues":{},"size":858,"modificationTime":1725607399125,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"2a0k0lm\",\"id\":19},\"maxValues\":{\"id\":47,\"value\":\"ow8MJEi\"},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399125,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00001-a3bdb273-8a93-45fc-84de-166ee32deb38-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399144,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"8MHGDDV\",\"id\":22},\"maxValues\":{\"id\":62,\"value\":\"x2gaSJY\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399144,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00001-504cf885-af24-4efd-9f32-5a1f4d2d5002-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399162,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"Dv5OQ0Z\",\"id\":32},\"maxValues\":{\"value\":\"n0fvhUM\",\"id\":80},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399162,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00001-615c5244-1b55-41f0-a26a-ef2477135e17-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399174,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":32,\"value\":\"JEBjgwD\"},\"maxValues\":{\"value\":\"p0yBYnX\",\"id\":80},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399174,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00001-4efd4d1f-f11d-4c16-95ce-b033f70d2313-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399187,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"7wkcLTu\",\"id\":48},\"maxValues\":{\"id\":77,\"value\":\"Mm3Gaui\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399187,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00001-f0b51ee5-50c0-482a-8487-ab2cf0801bd1-c000.snappy.parquet","partitionValues":{},"size":863,"modificationTime":1725607399205,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"PJAk83J\",\"id\":27},\"maxValues\":{\"id\":73,\"value\":\"dRnjkHw\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399205,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00001-13dad15a-6ba3-484e-afce-245da13e6e08-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399239,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"Go5YxhK\",\"id\":0},\"maxValues\":{\"value\":\"w5lZac0\",\"id\":92},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399239,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00001-2a1049b1-5a9b-4aa0-837c-745e039f2344-c000.snappy.parquet","partitionValues":{},"size":863,"modificationTime":1725607399252,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":45,\"value\":\"JXMAsip\"},\"maxValues\":{\"id\":95,\"value\":\"rqGpUxp\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399252,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"add":{"path":"part-00001-058ac7dd-4fac-4425-aafd-9640a3c3a0c1-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399265,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":49,\"value\":\"1M1iEwj\"},\"maxValues\":{\"id\":99,\"value\":\"s8Hyptg\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}}
{"commitInfo":{"timestamp":1725607399265,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}}
Loading
Loading