diff --git a/kernel/src/error.rs b/kernel/src/error.rs
index 70c0697aa..b8f82b58c 100644
--- a/kernel/src/error.rs
+++ b/kernel/src/error.rs
@@ -155,6 +155,10 @@ pub enum Error {
     /// Expressions did not parse or evaluate correctly
     #[error("Invalid expression evaluation: {0}")]
     InvalidExpressionEvaluation(String),
+
+    /// An error indicating an invalid Delta Log structure
+    #[error("Invalid Delta Log: {0}")]
+    InvalidDeltaLog(String),
 }
 
 // Convenience constructors for Error types that take a String argument
diff --git a/kernel/src/group_iterator.rs b/kernel/src/group_iterator.rs
new file mode 100644
index 000000000..24964ffe3
--- /dev/null
+++ b/kernel/src/group_iterator.rs
@@ -0,0 +1,260 @@
+use std::rc::Rc;
+use std::cell::RefCell;
+use crate::FileMeta;
+use crate::path::LogPath;
+use crate::{DeltaResult, Error};
+
+/// This iterator groups Delta log files into checkpoint nodes. It handles various scenarios, including:
+/// - Single-part checkpoints
+/// - Multi-part checkpoints
+/// - Multiple checkpoints for the same version
+/// - Commits without checkpoints
+///
+/// The iterator creates a linked list of CheckpointNodes, where each node represents a checkpoint
+/// (if present) and all commits up to the next checkpoint.
+///
+/// Sample Delta log structures and resulting node structures:
+///
+/// 1. Simple scenario with single-part checkpoints:
+///    Files:
+///      00000000000000000000.json
+///      00000000000000000001.json
+///      00000000000000000002.checkpoint.parquet
+///      00000000000000000003.json
+///      00000000000000000004.checkpoint.parquet
+///      00000000000000000005.json
+///
+///    Resulting nodes:
+///      Node 1: {checkpoint_version: None, checkpoint_files: None, commits: [0.json, 1.json]}
+///      Node 2: {checkpoint_version: 2, checkpoint_files: [2.checkpoint.parquet], commits: [3.json]}
+///      Node 3: {checkpoint_version: 4, checkpoint_files: [4.checkpoint.parquet], commits: [5.json]}
+///
+/// 2. Scenario with multi-part checkpoints:
+///    Files:
+///      00000000000000000000.json
+///      00000000000000000001.json
+///      00000000000000000002.checkpoint.00000000001.00000000002.parquet
+///      00000000000000000002.checkpoint.00000000002.00000000002.parquet
+///      00000000000000000003.json
+///      00000000000000000004.checkpoint.parquet
+///      00000000000000000005.json
+///
+///    Resulting nodes:
+///      Node 1: {checkpoint_version: None, checkpoint_files: None, commits: [0.json, 1.json]}
+///      Node 2: {checkpoint_version: 2, checkpoint_files: [2.checkpoint.*.parquet], multi_part: true, commits: [3.json]}
+///      Node 3: {checkpoint_version: 4, checkpoint_files: [4.checkpoint.parquet], commits: [5.json]}
+///
+/// 3. Scenario with multiple checkpoints for the same version:
+///    Files:
+///      00000000000000000000.json
+///      00000000000000000001.checkpoint.00000000001.00000000002.parquet
+///      00000000000000000001.checkpoint.00000000002.00000000002.parquet
+///      00000000000000000001.checkpoint.00000000001.00000000003.parquet
+///      00000000000000000001.checkpoint.00000000002.00000000003.parquet
+///      00000000000000000001.checkpoint.00000000003.00000000003.parquet
+///      00000000000000000002.json
+///
+///    Resulting node:
+///      Node 1: {
+///        checkpoint_version: 1,
+///        checkpoint_files: [1.checkpoint.00000000001.00000000003.parquet, 1.checkpoint.00000000002.00000000003.parquet, 1.checkpoint.00000000003.00000000003.parquet],
+///        other_multipart_checkpoints: [[1.checkpoint.00000000001.00000000002.parquet, 1.checkpoint.00000000002.00000000002.parquet]],
+///        multi_part: true,
+///        commits: [2.json]
+///      }
+
+#[derive(Clone)]
+pub struct CheckpointNode {
+    pub checkpoint_version: Option<u64>,
+    pub checkpoint_files: Option<Vec<FileMeta>>,
+    pub other_multipart_checkpoints: Vec<Vec<FileMeta>>,
+    pub multi_part: bool,
+    pub commits: Vec<FileMeta>,
+    pub next: Option<Rc<RefCell<CheckpointNode>>>,
+}
+
+pub struct DeltaLogGroupingIterator {
+    pub head: Option<Rc<RefCell<CheckpointNode>>>,
+    pub current: Option<Rc<RefCell<CheckpointNode>>>,
+}
+
+impl DeltaLogGroupingIterator {
+    pub fn new(files: Vec<FileMeta>, beginning: Option<bool>) -> DeltaResult<Self> {
+        // Sort files by version and type (checkpoints before commits)
+        let mut versioned_files: Vec<(u64, FileMeta)> = files
+            .into_iter()
+            .filter_map(|file| {
+                let log_path = LogPath::new(&file.location);
+                log_path.version.map(|v| (v, file))
+            })
+            .collect();
+
+        // Sort files: first by version, then by type (checkpoints before commits)
+        // This ensures that for each version, we process checkpoints before commits
+        versioned_files.sort_unstable_by(|(v1, f1), (v2, f2)| {
+            v1.cmp(v2).then_with(|| {
+                let is_checkpoint1 = LogPath::new(&f1.location).is_checkpoint;
+                let is_checkpoint2 = LogPath::new(&f2.location).is_checkpoint;
+                is_checkpoint2.cmp(&is_checkpoint1) // Checkpoints before commits
+            })
+        });
+
+        // Initialize variables for building the linked list of nodes
+        let mut current_version: Option<u64> = None;
+        let mut head_node: Option<Rc<RefCell<CheckpointNode>>> = None;
+        let mut last_node: Option<Rc<RefCell<CheckpointNode>>> = None;
+
+        // Temporary storage for building the current node
+        let mut current_node = CheckpointNode {
+            checkpoint_version: None,
+            checkpoint_files: None,
+            other_multipart_checkpoints: Vec::new(),
+            multi_part: false,
+            commits: Vec::new(),
+            next: None,
+        };
+
+        let mut iter = versioned_files.into_iter().peekable();
+
+        while let Some((file_version, file_meta)) = iter.next() {
+            let log_path = LogPath::new(&file_meta.location);
+
+            // Handle version gaps and ensure we start from version 0 if beginning is true
+            match current_version {
+                Some(v) if v + 1 < file_version => {
+                    return Err(Error::InvalidDeltaLog(format!(
+                        "Version gap detected between versions {} and {}",
+                        v, file_version
+                    )));
+                }
+                None if file_version > 0 && beginning == Some(true) => {
+                    return Err(Error::InvalidDeltaLog(format!(
+                        "Missing commits before version {}",
+                        file_version
+                    )));
+                }
+                _ => (),
+            }
+
+            current_version = Some(file_version);
+
+            if log_path.is_checkpoint {
+                // Finalize the current node if it contains data and start a new one
+                // This happens when we encounter a new checkpoint
+                if current_node.checkpoint_version.is_some() || !current_node.commits.is_empty() {
+                    let new_node = Rc::new(RefCell::new(CheckpointNode {
+                        checkpoint_version: current_node.checkpoint_version,
+                        checkpoint_files: current_node.checkpoint_files,
+                        other_multipart_checkpoints: current_node.other_multipart_checkpoints,
+                        multi_part: current_node.multi_part,
+                        commits: current_node.commits,
+                        next: None,
+                    }));
+
+                    // Link the new node to the previous one
+                    if let Some(ref last) = last_node {
+                        last.borrow_mut().next = Some(Rc::clone(&new_node));
+                    } else {
+                        head_node = Some(Rc::clone(&new_node));
+                    }
+
+                    last_node = Some(new_node);
+
+                    // Reset current node for the new checkpoint
+                    current_node = CheckpointNode {
+                        checkpoint_version: None,
+                        checkpoint_files: None,
+                        other_multipart_checkpoints: Vec::new(),
+                        multi_part: false,
+                        commits: Vec::new(),
+                        next: None,
+                    };
+                }
+
+                // Start a new node with this checkpoint
+                current_node.checkpoint_version = Some(file_version);
+
+                if log_path.is_multi_part_checkpoint() {
+                    // Handle multi-part checkpoints
+                    current_node.multi_part = true;
+                    let mut parts = vec![file_meta.clone()];
+
+                    // Extract the expected number of parts
+                    let (_, num_parts) = log_path.get_checkpoint_part_numbers().unwrap();
+
+                    // Collect the remaining parts of the multi-part checkpoint
+                    for _ in 1..num_parts {
+                        if let Some((next_version, next_file_meta)) = iter.peek() {
+                            let next_log_path = LogPath::new(&next_file_meta.location);
+                            if *next_version == file_version && next_log_path.is_checkpoint {
+                                parts.push(next_file_meta.clone());
+                                iter.next(); // Consume this part from the iterator
+                            } else {
+                                return Err(Error::InvalidDeltaLog(format!(
+                                    "Incomplete multi-part checkpoint at version {}",
+                                    file_version
+                                )));
+                            }
+                        } else {
+                            return Err(Error::InvalidDeltaLog(format!(
+                                "Incomplete multi-part checkpoint at version {}",
+                                file_version
+                            )));
+                        }
+                    }
+
+                    current_node.checkpoint_files = Some(parts);
+                } else {
+                    // Handle a single-part checkpoint
+                    current_node.checkpoint_files = Some(vec![file_meta.clone()]);
+                    current_node.multi_part = false;
+                }
+            } else if log_path.is_commit {
+                // Add the commit file to the current node
+                current_node.commits.push(file_meta.clone());
+            } else {
+                // Skip unknown file types
+                continue;
+            }
+        }
+
+        // Finalize the last node
+        if current_node.checkpoint_version.is_some() || !current_node.commits.is_empty() {
+            let new_node = Rc::new(RefCell::new(CheckpointNode {
+                checkpoint_version: current_node.checkpoint_version,
+                checkpoint_files: current_node.checkpoint_files,
+                other_multipart_checkpoints: current_node.other_multipart_checkpoints,
+                multi_part: current_node.multi_part,
+                commits: current_node.commits,
+                next: None,
+            }));
+
+            if let Some(ref last) = last_node {
+                last.borrow_mut().next = Some(Rc::clone(&new_node));
+            } else {
+                head_node = Some(Rc::clone(&new_node));
+            }
+        }
+
+        Ok(DeltaLogGroupingIterator {
+            head: head_node,
+            current: None,
+        })
+    }
+}
+
+// Implement Iterator for DeltaLogGroupingIterator
+impl Iterator for DeltaLogGroupingIterator {
+    type Item = Rc<RefCell<CheckpointNode>>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.current.is_none() {
+            self.current = self.head.clone();
+        } else {
+            let next_node = self.current.as_ref().and_then(|node| node.borrow().next.clone());
+            self.current = next_node;
+        }
+
+        self.current.clone()
+    }
+}
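For orientation, here is a minimal sketch of how the grouping iterator could be driven. It assumes the crate is consumed as `delta_kernel` and that `FileMeta` keeps the `location`/`last_modified`/`size` layout from `kernel/src/lib.rs`; the toy log and the expected grouping follow the module docs above.

```rust
use delta_kernel::group_iterator::DeltaLogGroupingIterator;
use delta_kernel::FileMeta;
use url::Url;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A toy log: two commits, a single-part checkpoint, one more commit.
    let names = [
        "00000000000000000000.json",
        "00000000000000000001.json",
        "00000000000000000002.checkpoint.parquet",
        "00000000000000000003.json",
    ];
    let files: Vec<FileMeta> = names
        .iter()
        .map(|name| FileMeta {
            location: Url::parse(&format!("file:///tmp/table/_delta_log/{name}")).unwrap(),
            last_modified: 0, // field layout assumed from kernel/src/lib.rs
            size: 0,
        })
        .collect();

    // `Some(true)` asks the iterator to verify the log really starts at version 0.
    let iterator = DeltaLogGroupingIterator::new(files, Some(true))?;
    for node in iterator {
        let node = node.borrow();
        println!(
            "checkpoint_version: {:?}, commits: {}",
            node.checkpoint_version,
            node.commits.len()
        );
    }
    // Expected grouping, per the module docs:
    //   checkpoint_version: None,    commits: 2   (0.json, 1.json)
    //   checkpoint_version: Some(2), commits: 1   (3.json)
    Ok(())
}
```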
diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs
index 276efd32d..4d51a7276 100644
--- a/kernel/src/lib.rs
+++ b/kernel/src/lib.rs
@@ -67,6 +67,7 @@ pub(crate) mod path;
 pub mod scan;
 pub mod schema;
 pub mod snapshot;
+pub mod group_iterator;
 pub mod table;
 pub mod transaction;
 pub(crate) mod utils;
diff --git a/kernel/src/path.rs b/kernel/src/path.rs
index 5754617a3..d9ce41588 100644
--- a/kernel/src/path.rs
+++ b/kernel/src/path.rs
@@ -120,6 +120,70 @@ impl<'a> LogPath<'a> {
             }
         })
     }
+
+    /// Determines if the file is a multi-part checkpoint.
+    ///
+    /// Sample checkpoint files:
+    /// - Single-part: "00000000000000000010.checkpoint.parquet"
+    /// - Multi-part: "00000000000000000010.checkpoint.0000000001.0000000003.parquet"
+    ///
+    /// For the multi-part example:
+    /// - "00000000000000000010" is the version
+    /// - "checkpoint" indicates it's a checkpoint file
+    /// - "0000000001" is the part number
+    /// - "0000000003" is the total number of parts
+    /// - "parquet" is the file extension
+    pub(crate) fn is_multi_part_checkpoint(&self) -> bool {
+        // If it's not a checkpoint at all, it can't be a multi-part checkpoint
+        if !self.is_checkpoint {
+            return false;
+        }
+
+        self.filename
+            .and_then(|f| f.split_once(".checkpoint."))
+            // After splitting at ".checkpoint.", we focus on the part after it
+            .map(|(_, rest)| {
+                // Count the number of parts after ".checkpoint."
+                // A multi-part checkpoint should have 3 parts:
+                // 1. Part number
+                // 2. Total number of parts
+                // 3. File extension (e.g., "parquet")
+                rest.split('.').count() == 3
+            })
+            .unwrap_or(false)
+    }
+
+    /// Extracts the part number and total number of parts for a multi-part checkpoint.
+    ///
+    /// For a multi-part checkpoint file like "00000000000000000010.checkpoint.0000000001.0000000003.parquet":
+    /// - This method would return `Some((1, 3))`, indicating it's part 1 of 3.
+    ///
+    /// Returns `None` for single-part checkpoints or non-checkpoint files.
+    pub(crate) fn get_checkpoint_part_numbers(&self) -> Option<(u64, u64)> {
+        // First, check if it's a multi-part checkpoint
+        if !self.is_multi_part_checkpoint() {
+            return None;
+        }
+
+        // Split the filename into parts
+        let parts: Vec<&str> = self.filename?.split('.').collect();
+
+        // A valid multi-part checkpoint filename should have 5 parts:
+        // 1. Version
+        // 2. "checkpoint"
+        // 3. Part number
+        // 4. Total number of parts
+        // 5. File extension
+        if parts.len() != 5 {
+            return None;
+        }
+
+        // Parse the part number (index 2) and total number (index 3) into u64 integers
+        let part = parts[2].parse().ok()?;
+        let total = parts[3].parse().ok()?;
+
+        Some((part, total))
+    }
 }
 
 impl<'a> AsRef<Url> for LogPath<'a> {
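The two predicates above are easy to pin down with a small table-style test. A sketch of what such a test could look like (the module and test names are ours, not part of the patch; it relies only on `LogPath::new`, the `is_checkpoint` field, and the two new methods):

```rust
#[cfg(test)]
mod multi_part_checkpoint_tests {
    use super::*;
    use url::Url;

    #[test]
    fn parses_single_and_multi_part_checkpoint_names() {
        let log = Url::parse("file:///tmp/table/_delta_log/").unwrap();
        let single = log.join("00000000000000000010.checkpoint.parquet").unwrap();
        let multi = log
            .join("00000000000000000010.checkpoint.0000000001.0000000003.parquet")
            .unwrap();

        // Single-part: a checkpoint, but not multi-part, so no part numbers.
        let path = LogPath::new(&single);
        assert!(path.is_checkpoint);
        assert!(!path.is_multi_part_checkpoint());
        assert_eq!(path.get_checkpoint_part_numbers(), None);

        // Multi-part: part 1 of 3.
        let path = LogPath::new(&multi);
        assert!(path.is_multi_part_checkpoint());
        assert_eq!(path.get_checkpoint_part_numbers(), Some((1, 3)));
    }
}
```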
diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs
index ed225ae70..57bfb391d 100644
--- a/kernel/src/snapshot.rs
+++ b/kernel/src/snapshot.rs
@@ -1,8 +1,6 @@
 //! In-memory representation of snapshots of tables (snapshot is a table at given point in time, it
 //! has schema etc.)
 //!
-
-use std::cmp::Ordering;
 use std::sync::Arc;
 
 use itertools::Itertools;
@@ -12,7 +10,8 @@ use url::Url;
 
 use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME};
 use crate::features::{ColumnMappingMode, COLUMN_MAPPING_MODE_KEY};
-use crate::path::{version_from_location, LogPath};
+use crate::group_iterator::DeltaLogGroupingIterator;
+use crate::path::LogPath;
 use crate::scan::ScanBuilder;
 use crate::schema::{Schema, SchemaRef};
 use crate::utils::require;
@@ -130,6 +129,48 @@ impl std::fmt::Debug for Snapshot {
     }
 }
 
+/*
+Reasoning and Example:
+
+Consider a Delta table with the following log structure:
+Version | File
+--------|---------------------
+0       | 00000.json
+1       | 00001.json
+2       | 00002.json
+3       | 00003.json
+4       | 00004.checkpoint.parquet
+4       | 00004.json
+5       | 00005.json
+6       | 00006.json
+7       | 00007.json
+8       | 00008.json
+9       | 00009.checkpoint.parquet
+9       | 00009.json
+10      | 00010.json
+11      | 00011.json
+12      | 00012.json
+
+1. If requested_version is None (latest version):
+   - We include all commit files after the last checkpoint (9).
+   - Result: [12, 11, 10; 9]
+
+2. If requested_version is 11:
+   - We include commit files after the last checkpoint (9) up to and including 11.
+   - Result: [11, 10; 9]
+
+3. If requested_version is 9 (same as checkpoint):
+   - We include both the checkpoint file and the commit file at version 9.
+   - Result: [9; 9]
+
+4. If requested_version is 7 (before the last checkpoint):
+   - We include commit files after the previous checkpoint (4) up to and including 7.
+   - Result: [7, 6, 5; 4]
+
+5. If requested_version is 3 (before any checkpoint):
+   - We include all commit files up to and including 3.
+   - Result: [3, 2, 1, 0; ]
+*/
 impl Snapshot {
     /// Create a new [`Snapshot`] instance for the given version.
     ///
@@ -147,27 +188,12 @@ impl Snapshot {
         let log_url = LogPath::new(&table_root).child("_delta_log/").unwrap();
 
         // List relevant files from log
-        let (mut commit_files, checkpoint_files) =
-            match (read_last_checkpoint(fs_client.as_ref(), &log_url)?, version) {
-                (Some(cp), None) => {
-                    list_log_files_with_checkpoint(&cp, fs_client.as_ref(), &log_url)?
-                }
-                (Some(cp), Some(version)) if cp.version >= version => {
-                    list_log_files_with_checkpoint(&cp, fs_client.as_ref(), &log_url)?
-                }
-                _ => list_log_files(fs_client.as_ref(), &log_url)?,
-            };
+        let (commit_files, checkpoint_files) =
+            list_log_files(fs_client.as_ref(), &log_url, version)?;
 
-        // remove all files above requested version
-        if let Some(version) = version {
-            commit_files.retain(|meta| {
-                if let Some(v) = LogPath::new(&meta.location).version {
-                    v <= version
-                } else {
-                    false
-                }
-            });
-        }
+        // Log the chosen commit_files and checkpoint_files
+        debug!("\n\ncommit_files_try_new: {:?}", commit_files);
+        debug!("checkpoint_files_try_new: {:?}\n\n", checkpoint_files);
 
         // get the effective version from chosen files
         let version_eff = commit_files
@@ -279,6 +305,8 @@ struct CheckpointMetadata {
     /// The number of fragments if the last checkpoint was written in multiple parts.
     pub(crate) parts: Option<i32>,
     /// The number of bytes of the checkpoint.
+    /// TODO: Temporary fix; check out this issue for full details: https://github.com/delta-incubator/delta-kernel-rs/issues/326
+    #[serde(alias = "size_in_bytes")]
    pub(crate) size_in_bytes: Option<i64>,
     /// The number of AddFile actions in the checkpoint.
     pub(crate) num_of_add_files: Option<i64>,
@@ -301,113 +329,332 @@ fn read_last_checkpoint(
     log_root: &Url,
 ) -> DeltaResult<Option<CheckpointMetadata>> {
     let file_path = LogPath::new(log_root).child(LAST_CHECKPOINT_FILE_NAME)?;
+    debug!("Reading last checkpoint from: {}", file_path);
     match fs_client
         .read_files(vec![(file_path, None)])
         .and_then(|mut data| data.next().expect("read_files should return one file"))
     {
-        Ok(data) => Ok(serde_json::from_slice(&data)
-            .inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
-            .ok()),
+        Ok(data) => {
+            // Log the raw bytes as a (lossy) string for debugging
+            debug!("Data: {:?}", String::from_utf8_lossy(&data));
+            Ok(serde_json::from_slice(&data)
+                .inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
+                .ok())
+        }
         Err(Error::FileNotFound(_)) => Ok(None),
         Err(err) => Err(err),
     }
 }
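The `#[serde(alias = "size_in_bytes")]` attribute is the substance of the `_last_checkpoint` change: both the spec's camelCase key and the snake_case key described in issue #326 must deserialize to the same field. A self-contained sketch of that behavior, using a pared-down stand-in for the private `CheckpointMetadata` struct:

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
struct LastCheckpointStandIn {
    version: u64,
    size: u64,
    // Accepts "sizeInBytes" (via rename_all) and "size_in_bytes" (via alias).
    #[serde(alias = "size_in_bytes")]
    size_in_bytes: Option<u64>,
}

fn main() {
    let camel: LastCheckpointStandIn =
        serde_json::from_str(r#"{"version":1,"size":8,"sizeInBytes":21857}"#).unwrap();
    let snake: LastCheckpointStandIn =
        serde_json::from_str(r#"{"version":1,"size":8,"size_in_bytes":21857}"#).unwrap();
    // Both spellings land in the same field.
    assert_eq!(camel, snake);
    assert_eq!(camel.size_in_bytes, Some(21857));
}
```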
-/// List all log files after a given checkpoint.
-fn list_log_files_with_checkpoint(
-    cp: &CheckpointMetadata,
+fn list_log_files(
     fs_client: &dyn FileSystemClient,
     log_root: &Url,
+    requested_version: Option<Version>,
 ) -> DeltaResult<(Vec<FileMeta>, Vec<FileMeta>)> {
-    let version_prefix = format!("{:020}", cp.version);
-    let start_from = log_root.join(&version_prefix)?;
-
-    let files = fs_client
-        .list_from(&start_from)?
-        .collect::<Result<Vec<_>, Error>>()?
-        .into_iter()
-        // TODO this filters out .crc files etc which start with "." - how do we want to use these kind of files?
-        .filter(|f| version_from_location(&f.location).is_some())
-        .collect::<Vec<_>>();
-
-    let mut commit_files = files
-        .iter()
-        .filter_map(|f| {
-            if LogPath::new(&f.location).is_commit {
-                Some(f.clone())
-            } else {
-                None
+    // Read the last checkpoint metadata
+    let last_checkpoint = read_last_checkpoint(fs_client, log_root)?;
+
+    // Determine the start version and whether to start from the beginning
+    let (start_version, from_beginning) = match (last_checkpoint, requested_version) {
+        // Case 1: Requested version is greater than the last checkpoint version
+        // Example: Last checkpoint version is 10, requested version is 15
+        // Delta log: ..., 10.checkpoint.parquet, 11.json, 12.json, 13.json, 14.json, 15.json
+        // We start from the last checkpoint (version 10) and read up to version 15
+        (Some(lc), Some(rv)) if rv > lc.version => (Some(lc.version), None),
+
+        // Case 2: No specific version requested, use the latest version
+        // Example: Last checkpoint version is 20
+        // Delta log: ..., 20.checkpoint.parquet, 21.json, 22.json, 23.json
+        // We start from the last checkpoint (version 20) and read up to the latest version
+        (Some(lc), None) => (Some(lc.version), None),
+
+        // Case 3: Requested version is less than or equal to the last checkpoint version,
+        // or there is no last checkpoint
+        // Example 1: Last checkpoint version is 30, requested version is 25
+        // Delta log: ..., 24.json, 25.json, 26.json, ..., 30.checkpoint.parquet, ...
+        // We start from the beginning (version 0) to ensure we capture all necessary data
+        // Example 2: No last checkpoint, requested version is 5
+        // Delta log: 0.json, 1.json, 2.json, 3.json, 4.json, 5.json
+        // We start from the beginning (version 0)
+        _ => (Some(0), Some(true)),
+    };
+
+    let files: Vec<FileMeta> = fs_client
+        .list_from(log_root)?
+        .filter_map(|f| f.ok())
+        .filter(|f| LogPath::new(&f.location).version.map_or(true, |v| v >= start_version.unwrap()))
+        .collect();
+
+    let mut iterator = DeltaLogGroupingIterator::new(files, from_beginning)?;
+    let first_node = iterator.next();
+
+    // Sample structure of the first node:
+    // Note: The first node never contains a checkpoint file or checkpoint version.
+    // It only contains commits up to the first checkpoint.
+    // CheckpointNode {
+    //     checkpoint_version: None,
+    //     checkpoint_files: None,
+    //     multi_part: false,
+    //     commits: [
+    //         FileMeta { location: "00000000000000000000.json", ... },
+    //         FileMeta { location: "00000000000000000001.json", ... },
+    //         FileMeta { location: "00000000000000000002.json", ... }
+    //     ],
+    //     next: Some(Rc<RefCell<CheckpointNode>>)
+    // }
+
+    let (checkpoints, commits, latest_checkpoint_version) = match (first_node, requested_version) {
+        (None, _) => {
+            // Case: No nodes found in the iterator
+            // This could happen if the log is empty or if there's an issue with file listing
+            // We return an error as we can't create a valid snapshot without any version information
+            return Err(Error::MissingVersion);
+        }
+        (Some(node), None) => {
+            // Case: Latest version requested
+            // This case handles when we want to retrieve the most recent version of the table.
+            // We iterate through all nodes to find the last one, which represents the latest state.
+            //
+            // Sample data:
+            // Assume we have the following log structure:
+            //   00000000000000000000.json
+            //   00000000000000000001.json
+            //   00000000000000000002.json
+            //   00000000000000000003.checkpoint.parquet
+            //   00000000000000000004.json
+            //   00000000000000000005.json
+            //   00000000000000000006.checkpoint.parquet
+            //   00000000000000000007.json
+            //   00000000000000000008.json
+            //
+            // The iterator would produce nodes like this:
+            //   Node 1: checkpoint_version: None, commits: [0.json, 1.json, 2.json]
+            //   Node 2: checkpoint_version: 3, checkpoint_files: [3.checkpoint.parquet], commits: [4.json, 5.json]
+            //   Node 3: checkpoint_version: 6, checkpoint_files: [6.checkpoint.parquet], commits: [7.json, 8.json]
+            //
+            // We iterate to the last node (Node 3 in this example) and use its data.
+
+            let mut last_node = node;
+            while let Some(next_node) = iterator.next() {
+                last_node = next_node;
             }
-        })
-        .collect_vec();
-    // NOTE this will sort in reverse order
-    commit_files.sort_unstable_by(|a, b| b.location.cmp(&a.location));
-
-    let checkpoint_files = files
-        .iter()
-        .filter_map(|f| {
-            if LogPath::new(&f.location).is_checkpoint {
-                Some(f.clone())
+            let node = last_node.borrow();
+            (
+                node.checkpoint_files
+                    .as_ref()
+                    .map_or_else(Vec::new, |files| files.clone()),
+                node.commits.clone(),
+                node.checkpoint_version,
+            )
+        }
+        (Some(node), Some(req_version)) => {
+            // Case: Specific version requested
+            // This case handles when we want to retrieve a specific version of the table.
+            // We iterate through nodes until we find the appropriate checkpoint and commits.
+            //
+            // Sample Delta log structure:
+            //   00000000000000000000.json
+            //   00000000000000000001.json
+            //   00000000000000000002.json
+            //   00000000000000000003.checkpoint.parquet
+            //   00000000000000000004.json
+            //   00000000000000000005.json
+            //   00000000000000000006.checkpoint.parquet
+            //   00000000000000000007.json
+            //   00000000000000000008.json
+            //
+            // The iterator would produce nodes like this:
+            // If requested version > last checkpoint's commit version from _last_checkpoint:
+            //   Node 1: checkpoint_version: 3, checkpoint_files: [3.checkpoint.parquet], commits: [4.json, 5.json]
+            //   Node 2: checkpoint_version: 6, checkpoint_files: [6.checkpoint.parquet], commits: [7.json, 8.json]
+            //
+            // If requested version <= last checkpoint's commit version from _last_checkpoint:
+            //   Node 1: checkpoint_version: None, checkpoint_files: None, commits: [0.json, 1.json, 2.json]
+            //   Node 2: checkpoint_version: 3, checkpoint_files: [3.checkpoint.parquet], commits: [4.json, 5.json]
+            //   Node 3: checkpoint_version: 6, checkpoint_files: [6.checkpoint.parquet], commits: [7.json, 8.json]
+
+            // Initialize variables with the first node's data
+            let mut last_checkpoints = if req_version >= node.borrow().checkpoint_version.unwrap_or(0) {
+                // If the requested version is >= the first checkpoint version, use its checkpoint files
+                node.borrow().checkpoint_files.clone().unwrap_or_default()
+            } else {
-                None
-            }
-        })
-        .collect_vec();
-
-    // TODO raise a proper error
-    assert_eq!(checkpoint_files.len(), cp.parts.unwrap_or(1) as usize);
-
-    Ok((commit_files, checkpoint_files))
-}
-
-/// List relevant log files.
-///
-/// Relevant files are the max checkpoint found and all subsequent commits.
-fn list_log_files(
-    fs_client: &dyn FileSystemClient,
-    log_root: &Url,
-) -> DeltaResult<(Vec<FileMeta>, Vec<FileMeta>)> {
-    let version_prefix = format!("{:020}", 0);
-    let start_from = log_root.join(&version_prefix)?;
-
-    let mut max_checkpoint_version = -1_i64;
-    let mut commit_files = Vec::new();
-    let mut checkpoint_files = Vec::with_capacity(10);
-
-    for maybe_meta in fs_client.list_from(&start_from)? {
-        let meta = maybe_meta?;
-        let log_path = LogPath::new(&meta.location);
-        if log_path.is_checkpoint {
-            let version = log_path.version.unwrap_or(0) as i64;
-            match version.cmp(&max_checkpoint_version) {
-                Ordering::Greater => {
-                    max_checkpoint_version = version;
-                    checkpoint_files.clear();
-                    checkpoint_files.push(meta);
-                }
-                Ordering::Equal => {
-                    checkpoint_files.push(meta);
+                Vec::new()
+            };
+            let mut last_commits = node.borrow().commits.clone();
+            let mut checkpoint_version = node.borrow().checkpoint_version;
+
+            // Sample scenario:
+            // Delta log:
+            //   00000000000000000000.json
+            //   00000000000000000001.json
+            //   00000000000000000002.checkpoint.parquet
+            //   00000000000000000003.json
+            //   00000000000000000004.json
+            //   00000000000000000005.checkpoint.parquet
+            //   00000000000000000006.json
+            //   00000000000000000007.json
+            //
+            // If req_version is 4:
+            //   1st iteration: node.checkpoint_version = 2, update last_checkpoints, last_commits, checkpoint_version
+            //   2nd iteration: node.checkpoint_version = 5, break the loop (5 > 4)
+            //   Result: last_checkpoints = [2.checkpoint.parquet], last_commits = [3.json, 4.json], checkpoint_version = 2
+            while let Some(next_node) = iterator.next() {
+                let node = next_node.borrow();
+                if node.checkpoint_version.is_some() && node.checkpoint_version.unwrap() > req_version {
+                    break;
-                }
-                _ => {}
+                }
+                last_checkpoints = node.checkpoint_files.clone().unwrap_or_default();
+                last_commits = node.commits.clone();
+                checkpoint_version = node.checkpoint_version;
             }
-        } else if log_path.is_commit {
-            commit_files.push(meta);
+            (last_checkpoints, last_commits, checkpoint_version)
         }
-    }
-
-    commit_files.retain(|f| {
-        version_from_location(&f.location).unwrap_or(0) as i64 > max_checkpoint_version
-    });
-    // NOTE this will sort in reverse order
-    commit_files.sort_unstable_by(|a, b| b.location.cmp(&a.location));
+    };
+
+    let (commit_files, selected_checkpoint_files) = match (latest_checkpoint_version, requested_version) {
+        (None, None) | (None, Some(_)) => {
+            // This case handles scenarios where there's no checkpoint or the checkpoint version is unknown
+            // Example scenario: As you can see below, there's no checkpoint in the Delta log.
+            // Delta log:
+            //   00000000000000000000.json
+            //   00000000000000000001.json
+            //   00000000000000000002.json
+            //   00000000000000000003.json
+
+            // Even if checkpoint_version is None, we still need to have at least one commit.
+            if commits.is_empty() {
+                return Err(Error::MissingVersion);
+            }
+            let mut filtered_commits = commits;
+            // Sort commits in descending order by location (which corresponds to version)
+            filtered_commits.sort_unstable_by(|a, b| b.location.cmp(&a.location));
+
+            if let Some(req_version) = requested_version {
+                // If a specific version is requested, filter commits
+                // Example: If req_version is 2, we'd keep 00000000000000000000.json,
+                // 00000000000000000001.json, and 00000000000000000002.json
+                filtered_commits.retain(|commit| {
+                    LogPath::new(&commit.location)
+                        .version
+                        .map_or(false, |version| version <= req_version)
+                });
+            }
+            // Return the filtered commits and an empty vector for checkpoints
+            (filtered_commits, Vec::new())
+        }
+        (Some(_), None) => {
+            // Sort commits in descending order (newest first)
+            let mut sorted_commits = commits;
+            sorted_commits.sort_unstable_by(|a, b| b.location.cmp(&a.location));
+
+            // Note: The node commits also contain the commit which is at the same version as the checkpoint.
+            // We need to remove this duplicate commit unless the last commit is equal to the last checkpoint version.
+            //
+            // Sample log:
+            //   00000000000000000000.json
+            //   00000000000000000001.json
+            //   00000000000000000002.json
+            //   00000000000000000003.checkpoint.parquet
+            //   00000000000000000003.json  <-- This commit is at the same version as the checkpoint
+            //   00000000000000000004.json
+            //   00000000000000000005.json
+            //
+            // In this case, we would remove 00000000000000000003.json because:
+            // 1. It's redundant with the checkpoint file at version 3
+            // 2. It's not the last commit (version 5 is the last)
+            //
+            // However, if the log ended at version 3:
+            //   00000000000000000003.checkpoint.parquet
+            //   00000000000000000003.json
+            //
+            // We would keep 00000000000000000003.json because it's the last commit,
+            // ensuring we don't lose any potential information not captured in the checkpoint.
+
+            if sorted_commits.len() > 1 {
+                sorted_commits.pop();
+            }
+
+            // Return the sorted commits (excluding the oldest) and all checkpoints
+            (sorted_commits, checkpoints)
+        }
+        (Some(checkpoint_version), Some(req_version)) => {
+            // Sample Delta log:
+            //   00000000000000000000.json
+            //   00000000000000000001.json
+            //   00000000000000000002.json
+            //   00000000000000000003.checkpoint.parquet
+            //   00000000000000000004.json
+            //   00000000000000000005.json
+            //   00000000000000000006.checkpoint.parquet
+            //   00000000000000000007.json
+            //   00000000000000000008.json
+
+            // Filter commits up to and including the requested version
+            // Scenarios based on the sample log:
+            // 1. If req_version is 2:
+            //    - Before filter: [2.json, 1.json, 0.json]
+            //    - After filter: [2.json, 1.json, 0.json]
+            //    - (no checkpoint version yet)
+            // 2. If req_version is 5:
+            //    - Before filter: [5.json, 4.json, 3.json]
+            //    - After filter: [5.json, 4.json]
+            //    - (checkpoint version 3)
+            // 3. If req_version is 8:
+            //    - Before filter: [8.json, 7.json, 6.json]
+            //    - After filter: [8.json, 7.json]
+            //    - (checkpoint version 6)
+            // Note: Each scenario contains commits from one node.
+            // The commit equal to the checkpoint version is included in the node
+            // until it's popped off later in the process.
+            let mut filtered_commits: Vec<FileMeta> = commits
+                .into_iter()
+                .filter(|commit| {
+                    LogPath::new(&commit.location)
+                        .version
+                        .map_or(false, |version| version <= req_version)
+                })
+                .collect();
+
+            // Sort commits in descending order (newest first)
+            filtered_commits.sort_unstable_by(|a, b| b.location.cmp(&a.location));
+
+            // Remove the oldest commit if it's redundant with the checkpoint
+            // (unless it's the last commit in the requested range)
+            //
+            // Sample log:
+            //   00000000000000000000.json
+            //   00000000000000000001.json
+            //   00000000000000000002.json
+            //   00000000000000000003.checkpoint.parquet
+            //   00000000000000000004.json
+            //   00000000000000000005.json
+            //   00000000000000000006.checkpoint.parquet
+            //
+            // In the above log, let's say the requested version is 6, which happens to be the last
+            // checkpoint version. The commits then contain only 00000000000000000006.json, so we
+            // don't remove it. But if the requested version is 5, the commits contain
+            // [00000000000000000005.json, 00000000000000000004.json, 00000000000000000003.json];
+            // after the reverse sort we pop off 00000000000000000003.json because it's redundant
+            // with the checkpoint at version 3, so the filtered commits become
+            // [00000000000000000005.json, 00000000000000000004.json].
+
+            if checkpoint_version < req_version {
+                filtered_commits.pop();
+            }
+
+            // Return the filtered commits and checkpoints
+            // If the requested version is less than the first checkpoint, return no checkpoint files
+            // Example: If req_version is 2 and checkpoint_version is 3, checkpoints will be empty
+            (filtered_commits, if checkpoint_version <= req_version { checkpoints } else { Vec::new() })
+        }
+    };
 
-    Ok((commit_files, checkpoint_files))
+    Ok((commit_files, selected_checkpoint_files))
 }
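The commit-selection rule in the `(Some(checkpoint_version), Some(req_version))` arm is subtle enough to state on its own. Below is a standalone sketch using bare version numbers instead of `FileMeta` (a simplification; the real code compares `location` URLs, which sort the same way for zero-padded names):

```rust
/// Mirror of the filter + reverse sort + conditional pop above, on plain numbers.
fn select_commit_versions(commits: &[u64], checkpoint_version: u64, req_version: u64) -> Vec<u64> {
    let mut filtered: Vec<u64> = commits.iter().copied().filter(|&v| v <= req_version).collect();
    filtered.sort_unstable_by(|a, b| b.cmp(a)); // newest first

    // The oldest retained commit sits at the checkpoint version; it is redundant
    // with the checkpoint unless the checkpoint version *is* the requested version.
    if checkpoint_version < req_version {
        filtered.pop();
    }
    filtered
}

fn main() {
    // Node holds commits 3, 4, 5 with a checkpoint at 3 (see the sample log above).
    assert_eq!(select_commit_versions(&[3, 4, 5], 3, 5), vec![5, 4]);
    // Requested version equals the checkpoint version: keep its commit.
    assert_eq!(select_commit_versions(&[6], 6, 6), vec![6]);
    // Requested version between checkpoints: keep 4, drop the redundant 3.
    assert_eq!(select_commit_versions(&[3, 4, 5], 3, 4), vec![4]);
}
```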
 
 #[cfg(test)]
 mod tests {
     use super::*;
 
+    use lazy_static::lazy_static;
     use std::path::PathBuf;
     use std::sync::Arc;
@@ -421,6 +668,60 @@ mod tests {
     use crate::engine::sync::SyncEngine;
     use crate::schema::StructType;
 
+    // Define a type for the test case elements
+    type TestCase = (
+        Version,         // requested version
+        Version,         // expected effective version
+        Option<Version>, // expected checkpoint version
+        Vec<Version>,    // expected commit file versions (newest first)
+        Vec<Version>,    // expected checkpoint file versions
+    );
+    lazy_static! {
+        static ref TEST_CASES: Vec<TestCase> = vec![
+            // Version 0: No checkpoint, only the initial commit
+            (0, 0, None, vec![0], vec![]),
+            // Versions 1-3: No checkpoint yet, accumulating commits
+            (1, 1, None, vec![1, 0], vec![]),
+            (2, 2, None, vec![2, 1, 0], vec![]),
+            (3, 3, None, vec![3, 2, 1, 0], vec![]),
+            // Version 4: First checkpoint, only includes its own commit
+            (4, 4, Some(4), vec![4], vec![4]),
+            // Versions 5-8: After the first checkpoint, accumulating new commits
+            (5, 5, Some(4), vec![5], vec![4]),
+            (6, 6, Some(4), vec![6, 5], vec![4]),
+            (7, 7, Some(4), vec![7, 6, 5], vec![4]),
+            (8, 8, Some(4), vec![8, 7, 6, 5], vec![4]),
+            // Version 9: Second checkpoint, only includes its own commit
+            (9, 9, Some(9), vec![9], vec![9]),
+            // Versions 10-13: After the second checkpoint, accumulating new commits
+            (10, 10, Some(9), vec![10], vec![9]),
+            (11, 11, Some(9), vec![11, 10], vec![9]),
+            (12, 12, Some(9), vec![12, 11, 10], vec![9]),
+            (13, 13, Some(9), vec![13, 12, 11, 10], vec![9]),
+            // Version 14: Third checkpoint, only includes its own commit
+            (14, 14, Some(14), vec![14], vec![14]),
+            // Versions 15-18: After the third checkpoint, accumulating new commits
+            (15, 15, Some(14), vec![15], vec![14]),
+            (16, 16, Some(14), vec![16, 15], vec![14]),
+            (17, 17, Some(14), vec![17, 16, 15], vec![14]),
+            (18, 18, Some(14), vec![18, 17, 16, 15], vec![14]),
+            // Version 19: Fourth checkpoint, only includes its own commit
+            (19, 19, Some(19), vec![19], vec![19]),
+            // Versions 20-23: After the fourth checkpoint, accumulating new commits
+            (20, 20, Some(19), vec![20], vec![19]),
+            (21, 21, Some(19), vec![21, 20], vec![19]),
+            (22, 22, Some(19), vec![22, 21, 20], vec![19]),
+            (23, 23, Some(19), vec![23, 22, 21, 20], vec![19]),
+            // Version 24: Fifth checkpoint, only includes its own commit
+            (24, 24, Some(24), vec![24], vec![24]),
+            // Versions 25-28: After the fifth checkpoint, accumulating new commits
+            (25, 25, Some(24), vec![25], vec![24]),
+            (26, 26, Some(24), vec![26, 25], vec![24]),
+            (27, 27, Some(24), vec![27, 26, 25], vec![24]),
+            (28, 28, Some(24), vec![28, 27, 26, 25], vec![24]),
+        ];
+    }
+
     #[test]
     fn test_snapshot_read_metadata() {
         let path =
@@ -540,10 +841,661 @@ mod tests {
             LogPath::new(&snapshot.log_segment.checkpoint_files[0].location).version,
             Some(2)
         );
+        debug!(
+            "\n\ncheckpoint_files new fail: {:?}\n\n",
+            snapshot.log_segment.checkpoint_files
+        );
+        debug!(
+            "\n\ncommit_files new fail: {:?}\n\n",
+            snapshot.log_segment.commit_files
+        );
         assert_eq!(snapshot.log_segment.commit_files.len(), 1);
         assert_eq!(
LogPath::new(&snapshot.log_segment.commit_files[0].location).version, Some(3) ); } + + #[test] + fn test_snapshot_version_0_with_checkpoint_at_version_1() { + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/app-txn-checkpoint/")).unwrap(); + let url = url::Url::from_directory_path(path).unwrap(); + + let engine = SyncEngine::new(); + + // First, let's verify the content of the _last_checkpoint file + let fs_client = engine.get_file_system_client(); + let log_url = LogPath::new(&url).child("_delta_log/").unwrap(); + let last_checkpoint = read_last_checkpoint(fs_client.as_ref(), &log_url).unwrap(); + + assert!( + last_checkpoint.is_some(), + "_last_checkpoint file should exist" + ); + let checkpoint_meta = last_checkpoint.unwrap(); + debug!("Checkpoint metadata: {:#?}", checkpoint_meta); + assert_eq!( + checkpoint_meta.version, 1, + "Last checkpoint should be at version 1" + ); + assert_eq!(checkpoint_meta.size, 8, "Checkpoint size should be 8"); + assert_eq!( + checkpoint_meta.size_in_bytes, + Some(21857), + "Checkpoint size in bytes should be 21857" + ); + + // Now, request snapshot at version 0 + let snapshot = Snapshot::try_new(url.clone(), &engine, Some(0)); + + match snapshot { + Ok(snap) => { + assert_eq!(snap.version(), 0, "Snapshot version should be 0"); + + // Verify that the snapshot contains the correct files + assert_eq!( + snap.log_segment.commit_files.len(), + 1, + "There should be one commit file" + ); + assert_eq!( + LogPath::new(&snap.log_segment.commit_files[0].location).version, + Some(0), + "The commit file should be version 0" + ); + + assert!( + snap.log_segment.checkpoint_files.is_empty(), + "Snapshot for version 0 should not contain checkpoint files" + ); + } + Err(e) => { + panic!("Failed to create snapshot for version 0: {:?}", e); + } + } + + // Verify the snapshot at version 1 (the checkpoint version) + let snapshot_1 = Snapshot::try_new(url, &engine, Some(1)).unwrap(); + assert_eq!(snapshot_1.version(), 1, "Snapshot version should be 1"); + assert_eq!( + snapshot_1.log_segment.checkpoint_files.len(), + 1, + "There should be one checkpoint file for version 1" + ); + assert_eq!( + LogPath::new(&snapshot_1.log_segment.checkpoint_files[0].location).version, + Some(1), + "The checkpoint file should be version 1" + ); + } + + #[test] + fn test_snapshot_with_version_less_than_latest_checkpoint() { + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/multiple-checkpoint/")).unwrap(); + let url = url::Url::from_directory_path(path).unwrap(); + + let engine = SyncEngine::new(); + + // Attempt to create a snapshot at version 10 + let result = Snapshot::try_new(url, &engine, Some(10)); + + // Check if the operation succeeded + assert!( + result.is_ok(), + "Expected snapshot creation to succeed for version 10" + ); + + let snapshot = result.unwrap(); + + // Verify the snapshot properties + assert_eq!(snapshot.version(), 10, "Snapshot version should be 10"); + + // Verify the checkpoint files + let checkpoint_files = &snapshot.log_segment.checkpoint_files; + assert_eq!(checkpoint_files.len(), 1, "Should have one checkpoint file"); + assert_eq!( + LogPath::new(&checkpoint_files[0].location).version, + Some(9), + "Checkpoint should be version 9" + ); + + // Verify the commit files + let commit_files = &snapshot.log_segment.commit_files; + assert_eq!(commit_files.len(), 1, "Should have one commit file"); + + let commit_version = LogPath::new(&commit_files[0].location).version.unwrap(); + assert_eq!(commit_version, 10, "Commit file should be version 
10"); + + // Verify that specific files are present + let file_names: Vec = checkpoint_files + .iter() + .chain(commit_files.iter()) + .map(|f| { + Path::from(f.location.path()) + .filename() + .unwrap() + .to_string() + }) + .collect(); + + assert!( + file_names.contains(&"00000000000000000009.checkpoint.parquet".to_string()), + "Checkpoint file 9 should be present" + ); + assert!( + file_names.contains(&"00000000000000000010.json".to_string()), + "Commit file 10 should be present" + ); + + // Verify that specific files are not present + assert!( + !file_names.contains(&"00000000000000000009.json".to_string()), + "Commit file 9 should not be present" + ); + assert!( + !file_names.contains(&"00000000000000000008.json".to_string()), + "Commit file 8 should not be present" + ); + assert!( + !file_names.contains(&"00000000000000000011.json".to_string()), + "Commit file 11 should not be present" + ); + assert!( + !file_names.contains(&"00000000000000000014.checkpoint.parquet".to_string()), + "Checkpoint file 14 should not be present" + ); + } + + #[test] + fn test_snapshot_with_version_after_last_checkpoint() { + // Setup + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/multiple-checkpoint/")).unwrap(); + let url = url::Url::from_directory_path(path).unwrap(); + let engine = SyncEngine::new(); + + // Attempt to create a snapshot at version 26 + let result = Snapshot::try_new(url.clone(), &engine, Some(26)); + + // Check if the operation succeeded + assert!( + result.is_ok(), + "Expected snapshot creation to succeed for version 26" + ); + + let snapshot = result.unwrap(); + + // Verify the snapshot properties + assert_eq!(snapshot.version(), 26, "Snapshot version should be 26"); + + // Verify that the commit files are correct + let commit_versions: Vec<_> = snapshot + .log_segment + .commit_files + .iter() + .filter_map(|f| LogPath::new(&f.location).version) + .collect(); + + assert!( + commit_versions.contains(&26), + "Snapshot should include commit file for version 26" + ); + assert!( + commit_versions.contains(&25), + "Snapshot should include commit file for version 25" + ); + assert!( + !commit_versions.contains(&27), + "Snapshot should not include commit file for version 27" + ); + assert!( + !commit_versions.contains(&28), + "Snapshot should not include commit file for version 28" + ); + + // Verify that the checkpoint file is correct + assert_eq!( + snapshot.log_segment.checkpoint_files.len(), + 1, + "Snapshot should include one checkpoint file" + ); + assert_eq!( + LogPath::new(&snapshot.log_segment.checkpoint_files[0].location).version, + Some(24), + "Snapshot should use the checkpoint file for version 24" + ); + + // Verify that the log segment contains the correct range of files + let min_version = commit_versions.iter().min().unwrap(); + let max_version = commit_versions.iter().max().unwrap(); + assert!( + min_version >= &25, + "Minimum commit version should be at least 25" + ); + assert_eq!(max_version, &26, "Maximum commit version should be 26"); + + // Verify that the effective version is correct + assert_eq!(snapshot.version(), 26, "Effective version should be 26"); + } + + #[test] + fn test_snapshot_at_latest_checkpoint_version() { + // Setup + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/multiple-checkpoint/")).unwrap(); + let url = url::Url::from_directory_path(path).unwrap(); + let engine = SyncEngine::new(); + + // Read the last checkpoint to get its version + let fs_client = engine.get_file_system_client(); + let log_url = 
LogPath::new(&url).child("_delta_log/").unwrap(); + let last_checkpoint = read_last_checkpoint(fs_client.as_ref(), &log_url) + .expect("Failed to read last checkpoint") + .expect("No checkpoint found"); + + let checkpoint_version = last_checkpoint.version; + + // Attempt to create a snapshot at the checkpoint version + let result = Snapshot::try_new(url.clone(), &engine, Some(checkpoint_version)); + + // Check if the operation succeeded + assert!( + result.is_ok(), + "Expected snapshot creation to succeed for checkpoint version {}", + checkpoint_version + ); + + let snapshot = result.unwrap(); + + // Verify the snapshot properties + assert_eq!( + snapshot.version(), + checkpoint_version, + "Snapshot version should match checkpoint version" + ); + + // Verify that the checkpoint file is used + assert_eq!( + snapshot.log_segment.checkpoint_files.len(), + 1, + "Snapshot should include one checkpoint file" + ); + assert_eq!( + LogPath::new(&snapshot.log_segment.checkpoint_files[0].location).version, + Some(checkpoint_version), + "Snapshot should use the checkpoint file for version {}", + checkpoint_version + ); + + // Verify that no commit files after the checkpoint version are included + let commit_versions: Vec<_> = snapshot + .log_segment + .commit_files + .iter() + .filter_map(|f| LogPath::new(&f.location).version) + .collect(); + + assert!( + commit_versions.is_empty() || commit_versions.iter().all(|&v| v <= checkpoint_version), + "Snapshot should not include commit files after checkpoint version" + ); + + // Verify that the effective version is correct + assert_eq!( + snapshot.version(), + checkpoint_version, + "Effective version should match checkpoint version" + ); + } + + /* + Reasoning and Example: + + Consider a Delta table with the following log structure: + Version | File + --------|--------------------- + 0 | 00000.json + 1 | 00001.json + 2 | 00002.json + 3 | 00003.json + 4 | 00004.checkpoint.parquet + 4 | 00004.json + 5 | 00005.json + 6 | 00006.json + 7 | 00007.json + 8 | 00008.json + 9 | 00009.checkpoint.parquet + 9 | 00009.json + 10 | 00010.json + 11 | 00011.json + 12 | 00012.json + + 1. If requested_version is None (latest version): + - We include all commit files after the last checkpoint (9). + - Result: [12, 11, 10; 9] + + 2. If requested_version is 11: + - We include commit files after the last checkpoint (9) up to and including 11. + - Result: [11, 10; 9] + + 3. If requested_version is 9 (same as checkpoint): + - We include both the checkpoint file and the commit file at version 9. + - Result: [9; 9] + + 4. If requested_version is 7 (before the last checkpoint): + - We include commit files after the previous checkpoint (4) up to and including 7. + - Result: [7, 6, 5; 4] + + 5. If requested_version is 3 (before any checkpoint): + - We include all commit files up to and including 3. 
+ - Result: [3, 2, 1, 0; ] + */ + #[test] + fn test_snapshot_versions() { + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/multiple-checkpoint/")).unwrap(); + let url = url::Url::from_directory_path(path).unwrap(); + let engine = SyncEngine::new(); + + for ( + requested_version, + expected_version, + expected_checkpoint, + expected_commits, + expected_checkpoints, + ) in TEST_CASES.iter() + { + let snapshot = Snapshot::try_new(url.clone(), &engine, Some(*requested_version)); + + assert!( + snapshot.is_ok(), + "Failed to create snapshot for version {}: {:?}", + requested_version, + snapshot.err() + ); + let snapshot = snapshot.unwrap(); + + assert_eq!( + snapshot.version(), + *expected_version, + "For requested version {}, expected version {}, but got {}", + requested_version, + expected_version, + snapshot.version() + ); + + // Check checkpoint version + let actual_checkpoint = snapshot + .log_segment + .checkpoint_files + .first() + .and_then(|f| LogPath::new(&f.location).version); + assert_eq!( + actual_checkpoint, *expected_checkpoint, + "For version {}, expected checkpoint {:?}, but got {:?}", + requested_version, expected_checkpoint, actual_checkpoint + ); + + // Check commit files + let commit_versions: Vec<_> = snapshot + .log_segment + .commit_files + .iter() + .filter_map(|f| LogPath::new(&f.location).version) + .collect(); + assert_eq!( + commit_versions, *expected_commits, + "For version {}, expected commit files {:?}, but got {:?}", + requested_version, expected_commits, commit_versions + ); + + // Check checkpoint files + let checkpoint_versions: Vec<_> = snapshot + .log_segment + .checkpoint_files + .iter() + .filter_map(|f| LogPath::new(&f.location).version) + .collect(); + assert_eq!( + checkpoint_versions, *expected_checkpoints, + "For version {}, expected checkpoint files {:?}, but got {:?}", + requested_version, expected_checkpoints, checkpoint_versions + ); + } + } + + #[test] + fn test_list_log_files() { + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/multiple-checkpoint/")).unwrap(); + let url = url::Url::from_directory_path(path).unwrap(); + let engine = SyncEngine::new(); + let fs_client = engine.get_file_system_client(); + let log_url = LogPath::new(&url).child("_delta_log/").unwrap(); + + for ( + requested_version, + _expected_version, + _expected_checkpoint, + expected_commits, + expected_checkpoints, + ) in TEST_CASES.iter() + { + debug!( + "Testing list_log_files with requested_version: {:?}", + requested_version + ); + + let (commit_files, checkpoint_files) = + list_log_files(fs_client.as_ref(), &log_url, Some(*requested_version)).unwrap(); + + let commit_versions: Vec<_> = commit_files + .iter() + .filter_map(|f| LogPath::new(&f.location).version) + .collect(); + let checkpoint_versions: Vec<_> = checkpoint_files + .iter() + .filter_map(|f| LogPath::new(&f.location).version) + .collect(); + + assert_eq!( + commit_versions, *expected_commits, + "For requested version {:?}, expected commit versions {:?}, but got {:?}", + requested_version, expected_commits, commit_versions + ); + assert_eq!( + checkpoint_versions, *expected_checkpoints, + "For requested version {:?}, expected checkpoint versions {:?}, but got {:?}", + requested_version, expected_checkpoints, checkpoint_versions + ); + } + } + + #[test] + fn test_snapshot_latest_version() { + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/multiple-checkpoint/")).unwrap(); + let url = url::Url::from_directory_path(path).unwrap(); + let engine = SyncEngine::new(); + 
+ // Create snapshot with None (latest version) + let snapshot = Snapshot::try_new(url.clone(), &engine, None).unwrap(); + + // Expected values + let expected_version = 28; + let expected_checkpoint_version = Some(24); + let expected_commits = vec![28, 27, 26, 25]; + let expected_checkpoints = vec![24]; + + // Assertions + assert_eq!( + snapshot.version(), + expected_version, + "Expected latest version to be {}, but got {}", + expected_version, + snapshot.version() + ); + + // Check checkpoint version + let actual_checkpoint = snapshot + .log_segment + .checkpoint_files + .first() + .and_then(|f| LogPath::new(&f.location).version); + assert_eq!( + actual_checkpoint, expected_checkpoint_version, + "Expected checkpoint version {:?}, but got {:?}", + expected_checkpoint_version, actual_checkpoint + ); + + // Check commit files + let commit_versions: Vec<_> = snapshot + .log_segment + .commit_files + .iter() + .filter_map(|f| LogPath::new(&f.location).version) + .collect(); + assert_eq!( + commit_versions, expected_commits, + "Expected commit files {:?}, but got {:?}", + expected_commits, commit_versions + ); + + // Check checkpoint files + let checkpoint_versions: Vec<_> = snapshot + .log_segment + .checkpoint_files + .iter() + .filter_map(|f| LogPath::new(&f.location).version) + .collect(); + assert_eq!( + checkpoint_versions, expected_checkpoints, + "Expected checkpoint files {:?}, but got {:?}", + expected_checkpoints, checkpoint_versions + ); + } + + #[test] + fn test_list_log_files_scenarios() { + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/multiple-checkpoint/")).unwrap(); + let url = url::Url::from_directory_path(path).unwrap(); + let log_url = LogPath::new(&url).child("_delta_log/").unwrap(); + let fs_client = SyncEngine::new().get_file_system_client(); + + // Scenario 1: requested_version is None + let (commit_files, checkpoint_files) = + list_log_files(fs_client.as_ref(), &log_url, None).unwrap(); + + debug!("Scenario 1 - commit_files: {:?}", commit_files); + debug!("Scenario 1 - checkpoint_files: {:?}", checkpoint_files); + + assert_eq!( + commit_files.len(), + 4, + "Expected 4 commit files when requested_version is None" + ); + assert_eq!( + checkpoint_files.len(), + 1, + "Expected 1 checkpoint file when requested_version is None" + ); + let commit_versions: Vec<_> = commit_files + .iter() + .filter_map(|f| LogPath::new(&f.location).version) + .collect(); + assert_eq!( + commit_versions, + vec![28, 27, 26, 25], + "Expected commit versions to be [28, 27, 26, 25]" + ); + assert_eq!( + LogPath::new(&checkpoint_files[0].location).version, + Some(24), + "Expected latest checkpoint version to be 24" + ); + + // Scenario 2: requested_version is Some(Version) + let (commit_files, checkpoint_files) = + list_log_files(fs_client.as_ref(), &log_url, Some(15)).unwrap(); + + debug!("Scenario 2 - commit_files: {:?}", commit_files); + debug!("Scenario 2 - checkpoint_files: {:?}", checkpoint_files); + + assert_eq!( + commit_files.len(), + 1, + "Expected 1 commit file for version 15" + ); + assert_eq!( + checkpoint_files.len(), + 1, + "Expected 1 checkpoint file for version 15" + ); + let commit_versions: Vec<_> = commit_files + .iter() + .filter_map(|f| LogPath::new(&f.location).version) + .collect(); + assert_eq!( + commit_versions, + vec![15], + "Expected commit version to be [15]" + ); + assert_eq!( + LogPath::new(&checkpoint_files[0].location).version, + Some(14), + "Expected checkpoint version to be 14" + ); + + // Additional test for Scenario 2 to ensure we don't include 
files after the requested version + let (commit_files, checkpoint_files) = + list_log_files(fs_client.as_ref(), &log_url, Some(22)).unwrap(); + let commit_versions: Vec<_> = commit_files + .iter() + .filter_map(|f| LogPath::new(&f.location).version) + .collect(); + assert_eq!( + commit_versions, + vec![22, 21, 20], + "Expected commit versions to be [22, 21, 20] for requested version 22" + ); + assert_eq!( + LogPath::new(&checkpoint_files[0].location).version, + Some(19), + "Expected checkpoint version to be 19 for requested version 22" + ); + assert!( + !commit_versions.contains(&23), + "Should not include commit files after the requested version (22)" + ); + + // Scenario 3: requested_version is the same as a checkpoint version + let (commit_files, checkpoint_files) = + list_log_files(fs_client.as_ref(), &log_url, Some(24)).unwrap(); + + debug!("Scenario 3 - commit_files: {:?}", commit_files); + debug!("Scenario 3 - checkpoint_files: {:?}", checkpoint_files); + + let commit_versions: Vec<_> = commit_files + .iter() + .filter_map(|f| LogPath::new(&f.location).version) + .collect(); + assert_eq!( + commit_versions, + vec![24], + "Expected only commit version 24 when requested version is 24" + ); + assert_eq!( + checkpoint_files.len(), + 1, + "Expected 1 checkpoint file when requested version is 24" + ); + assert_eq!( + LogPath::new(&checkpoint_files[0].location).version, + Some(24), + "Expected checkpoint version to be 24 when requested version is 24" + ); + } } diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000000.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000000.json new file mode 100644 index 000000000..2df9e66c2 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"1ca33e8d-ad5c-4e43-bab4-0d58d74e049e","name":null,"description":null,"format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},{\"name\":\"value\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}","partitionColumns":[],"createdTime":1725607399077,"configuration":{}}} +{"add":{"path":"part-00001-1f750a9d-670b-4e10-b70e-f4b57f4d1dcc-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399089,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"7B0GBMq\",\"id\":14},\"maxValues\":{\"id\":67,\"value\":\"rMjmkJp\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399089,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000001.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000001.json new file mode 100644 index 000000000..0e40e5c93 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ 
+{"add":{"path":"part-00001-546f378e-c381-44fa-8546-0b5e0dec5d7a-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399102,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":15,\"value\":\"9ZGuiBM\"},\"maxValues\":{\"id\":80,\"value\":\"pi2j55u\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399102,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000002.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000002.json new file mode 100644 index 000000000..32f18868c --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000002.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-6be399aa-c687-4f71-819e-efef39379674-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399110,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":0,\"value\":\"HzmPdRD\"},\"maxValues\":{\"id\":73,\"value\":\"st1DuKb\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399110,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000003.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000003.json new file mode 100644 index 000000000..a6ea3cba5 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000003.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-ef4482fe-3530-4db6-bf2f-2958f61e7fc8-c000.snappy.parquet","partitionValues":{},"size":865,"modificationTime":1725607399118,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":22,\"value\":\"dfU3zNV\"},\"maxValues\":{\"value\":\"vrbg0qC\",\"id\":96},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399118,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000004.checkpoint.parquet b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000004.checkpoint.parquet new file mode 100644 index 000000000..7e4a13eb5 Binary files /dev/null and b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000004.checkpoint.parquet differ diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000004.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000004.json new file mode 100644 index 000000000..1179c23e7 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000004.json @@ -0,0 +1,2 @@ 
+{"add":{"path":"part-00001-2335a318-4612-4de7-b77e-b170ad690ed0-c000.snappy.parquet","partitionValues":{},"size":858,"modificationTime":1725607399125,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"2a0k0lm\",\"id\":19},\"maxValues\":{\"id\":47,\"value\":\"ow8MJEi\"},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399125,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000005.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000005.json new file mode 100644 index 000000000..d0aeb295e --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000005.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-a3bdb273-8a93-45fc-84de-166ee32deb38-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399144,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"8MHGDDV\",\"id\":22},\"maxValues\":{\"id\":62,\"value\":\"x2gaSJY\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399144,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000006.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000006.json new file mode 100644 index 000000000..7fd9279ec --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000006.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-504cf885-af24-4efd-9f32-5a1f4d2d5002-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399162,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"Dv5OQ0Z\",\"id\":32},\"maxValues\":{\"value\":\"n0fvhUM\",\"id\":80},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399162,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000007.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000007.json new file mode 100644 index 000000000..2cbedf768 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000007.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-615c5244-1b55-41f0-a26a-ef2477135e17-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399174,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":32,\"value\":\"JEBjgwD\"},\"maxValues\":{\"value\":\"p0yBYnX\",\"id\":80},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399174,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000008.json 
b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000008.json new file mode 100644 index 000000000..b2bada7d5 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000008.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-4efd4d1f-f11d-4c16-95ce-b033f70d2313-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399187,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"7wkcLTu\",\"id\":48},\"maxValues\":{\"id\":77,\"value\":\"Mm3Gaui\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399187,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000009.checkpoint.parquet b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000009.checkpoint.parquet new file mode 100644 index 000000000..66c42e178 Binary files /dev/null and b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000009.checkpoint.parquet differ diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000009.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000009.json new file mode 100644 index 000000000..40118d3ac --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000009.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-f0b51ee5-50c0-482a-8487-ab2cf0801bd1-c000.snappy.parquet","partitionValues":{},"size":863,"modificationTime":1725607399205,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"PJAk83J\",\"id\":27},\"maxValues\":{\"id\":73,\"value\":\"dRnjkHw\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399205,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000010.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000010.json new file mode 100644 index 000000000..b949b1a7b --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000010.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-13dad15a-6ba3-484e-afce-245da13e6e08-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399239,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"Go5YxhK\",\"id\":0},\"maxValues\":{\"value\":\"w5lZac0\",\"id\":92},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399239,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000011.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000011.json new file mode 100644 index 000000000..7863b0196 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000011.json @@ -0,0 +1,2 @@ 
+{"add":{"path":"part-00001-2a1049b1-5a9b-4aa0-837c-745e039f2344-c000.snappy.parquet","partitionValues":{},"size":863,"modificationTime":1725607399252,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":45,\"value\":\"JXMAsip\"},\"maxValues\":{\"id\":95,\"value\":\"rqGpUxp\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399252,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000012.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000012.json new file mode 100644 index 000000000..9584294d1 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000012.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-058ac7dd-4fac-4425-aafd-9640a3c3a0c1-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399265,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":49,\"value\":\"1M1iEwj\"},\"maxValues\":{\"id\":99,\"value\":\"s8Hyptg\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399265,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000013.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000013.json new file mode 100644 index 000000000..5cc7fa5a1 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000013.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-1201fbb6-f27f-42dd-bdbf-abfee4462030-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399278,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":17,\"value\":\"4XvK5x4\"},\"maxValues\":{\"id\":97,\"value\":\"utr4WF7\"},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399278,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000014.checkpoint.parquet b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000014.checkpoint.parquet new file mode 100644 index 000000000..db1556fe6 Binary files /dev/null and b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000014.checkpoint.parquet differ diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000014.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000014.json new file mode 100644 index 000000000..1bc9e8655 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000014.json @@ -0,0 +1,2 @@ 
+{"add":{"path":"part-00001-29a56f24-fd7b-4c68-af81-bbafa094a2a2-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399292,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"2jvC282\",\"id\":25},\"maxValues\":{\"id\":81,\"value\":\"eNZsxuV\"},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399292,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000015.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000015.json new file mode 100644 index 000000000..4ddc7a5ae --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000015.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-f13d2ab7-14b2-4db9-be61-933b5271d6ba-c000.snappy.parquet","partitionValues":{},"size":857,"modificationTime":1725607399316,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":28,\"value\":\"0v1XxX0\"},\"maxValues\":{\"id\":80,\"value\":\"AFSw4js\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399316,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000016.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000016.json new file mode 100644 index 000000000..706872547 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000016.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-a202c90c-afc4-4d29-a881-97b6653dadab-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399331,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"1nL0gwb\",\"id\":5},\"maxValues\":{\"id\":52,\"value\":\"wrjH3el\"},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399331,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000017.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000017.json new file mode 100644 index 000000000..ceedac96a --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000017.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-0437a982-4a21-4a30-927e-40173075a094-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399345,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":7,\"value\":\"AJA1U8a\"},\"maxValues\":{\"id\":80,\"value\":\"raZc9TO\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399345,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000018.json 
b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000018.json new file mode 100644 index 000000000..4be199452 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000018.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-1b27a29e-b5ee-4a40-9064-97e9df0a44b4-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399359,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":61,\"value\":\"QCCtHNe\"},\"maxValues\":{\"id\":79,\"value\":\"tWjDUs8\"},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399359,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000019.checkpoint.parquet b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000019.checkpoint.parquet new file mode 100644 index 000000000..929a45242 Binary files /dev/null and b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000019.checkpoint.parquet differ diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000019.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000019.json new file mode 100644 index 000000000..4a5bcad17 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000019.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-f6faf8c9-cd66-4948-a724-f6f4176ba789-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399374,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"CRJ6s62\",\"id\":9},\"maxValues\":{\"value\":\"XRHalQO\",\"id\":87},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399375,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000020.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000020.json new file mode 100644 index 000000000..d2b4f66d3 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000020.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-726427fb-de25-4d38-93e5-dfd155466a2b-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399403,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":1,\"value\":\"0ZhgTS5\"},\"maxValues\":{\"id\":64,\"value\":\"nu4zxE3\"},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399403,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000021.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000021.json new file mode 100644 index 000000000..44a4e8ec6 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000021.json @@ -0,0 +1,2 @@ 
+{"add":{"path":"part-00001-e3fae9af-7a59-4e55-aab8-362763691241-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399418,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":14,\"value\":\"GBpgZp7\"},\"maxValues\":{\"value\":\"xxmcmvD\",\"id\":40},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399418,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000022.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000022.json new file mode 100644 index 000000000..e51d6fb51 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000022.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-365dc1b2-11d4-4e93-90a0-7106c28ea99a-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399432,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":13,\"value\":\"BFkRayL\"},\"maxValues\":{\"value\":\"wLx8O7D\",\"id\":82},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399432,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000023.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000023.json new file mode 100644 index 000000000..8fb7e64b9 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000023.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-8512d16a-f2b2-4dc3-bc57-32cdbe7fcc2b-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399446,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"LB1fR3b\",\"id\":23},\"maxValues\":{\"value\":\"tAMGxHJ\",\"id\":57},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399446,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000024.checkpoint.parquet b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000024.checkpoint.parquet new file mode 100644 index 000000000..2b3f30b8b Binary files /dev/null and b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000024.checkpoint.parquet differ diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000024.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000024.json new file mode 100644 index 000000000..19e116923 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000024.json @@ -0,0 +1,2 @@ 
+{"add":{"path":"part-00001-78bcb862-b023-4c57-8179-deabab296794-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399460,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"value\":\"0FI0b6R\",\"id\":6},\"maxValues\":{\"id\":44,\"value\":\"UtWe2Qf\"},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399460,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000025.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000025.json new file mode 100644 index 000000000..d4142530f --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000025.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-53bab6ed-4d68-496f-bfdf-8dc2139aa8ca-c000.snappy.parquet","partitionValues":{},"size":858,"modificationTime":1725607399484,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":17,\"value\":\"9hH3bKD\"},\"maxValues\":{\"value\":\"xdeMV6T\",\"id\":53},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399484,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000026.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000026.json new file mode 100644 index 000000000..e382c6c5b --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000026.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-2e2f534b-df31-4d91-a09a-1db90326b17e-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399499,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":67,\"value\":\"Is0rgSl\"},\"maxValues\":{\"value\":\"yn3arQB\",\"id\":98},\"nullCount\":{\"id\":0,\"value\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399499,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000027.json b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000027.json new file mode 100644 index 000000000..0c7bed8c3 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000027.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-e3f6f95a-f708-458e-ad96-a6b882b90463-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399518,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":22,\"value\":\"3sW2zew\"},\"maxValues\":{\"id\":83,\"value\":\"QHUyuHt\"},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399518,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000028.json 
b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000028.json new file mode 100644 index 000000000..108953f61 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/00000000000000000028.json @@ -0,0 +1,2 @@ +{"add":{"path":"part-00001-70de81f0-892b-4aca-b5da-aff2b97a93f5-c000.snappy.parquet","partitionValues":{},"size":864,"modificationTime":1725607399535,"dataChange":true,"stats":"{\"numRecords\":4,\"minValues\":{\"id\":46,\"value\":\"2161Bho\"},\"maxValues\":{\"value\":\"ay9xwjj\",\"id\":77},\"nullCount\":{\"value\":0,\"id\":0}}","tags":null,"deletionVector":null,"baseRowId":null,"defaultRowCommitVersion":null,"clusteringProvider":null}} +{"commitInfo":{"timestamp":1725607399535,"operation":"WRITE","operationParameters":{"mode":"Append"},"clientVersion":"delta-rs.0.19.0"}} \ No newline at end of file diff --git a/kernel/tests/data/multiple-checkpoint/_delta_log/_last_checkpoint b/kernel/tests/data/multiple-checkpoint/_delta_log/_last_checkpoint new file mode 100644 index 000000000..a66d17469 --- /dev/null +++ b/kernel/tests/data/multiple-checkpoint/_delta_log/_last_checkpoint @@ -0,0 +1 @@ +{"size":27,"size_in_bytes":23180,"version":24} \ No newline at end of file
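
Note (not part of the diff above): the new `_last_checkpoint` fixture is a one-line JSON hint that lets a reader skip straight to the most recent checkpoint instead of listing the log from version 0. Below is a minimal, illustrative sketch of parsing that hint; the `LastCheckpointHint` struct name and the use of `serde`/`serde_json` are assumptions for illustration, not the kernel's actual API.

    use serde::Deserialize;

    /// Hypothetical mirror of the `_last_checkpoint` fields in the fixture;
    /// field names match the JSON written above, the struct name is illustrative.
    #[derive(Debug, Deserialize)]
    struct LastCheckpointHint {
        /// Number of actions stored in the checkpoint.
        size: u64,
        /// On-disk size of the checkpoint file(s), when recorded.
        size_in_bytes: Option<u64>,
        /// Table version the checkpoint was taken at.
        version: u64,
    }

    fn main() -> Result<(), serde_json::Error> {
        // Contents of kernel/tests/data/multiple-checkpoint/_delta_log/_last_checkpoint
        let raw = r#"{"size":27,"size_in_bytes":23180,"version":24}"#;
        let hint: LastCheckpointHint = serde_json::from_str(raw)?;
        assert_eq!(hint.version, 24);
        // A reader can begin log listing at the hinted checkpoint version
        // rather than scanning every commit file from version 0.
        println!("resume log replay from checkpoint version {}", hint.version);
        Ok(())
    }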