From 84399e8d67ddcb1e9379584fbf20b7b32d66bde5 Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Sat, 16 Nov 2024 09:01:18 +0530 Subject: [PATCH 1/9] Version grouping iterator initialized. --- kernel/src/grouping_iterator.rs | 22 ++++++++++++++++++++++ kernel/src/lib.rs | 1 + 2 files changed, 23 insertions(+) create mode 100644 kernel/src/grouping_iterator.rs diff --git a/kernel/src/grouping_iterator.rs b/kernel/src/grouping_iterator.rs new file mode 100644 index 000000000..e277365a8 --- /dev/null +++ b/kernel/src/grouping_iterator.rs @@ -0,0 +1,22 @@ +use crate::path::ParsedLogPath; +use std::iter::Peekable; + +pub struct VersionGroup { + version: u64, + files: Vec, +} + +pub struct VersionGroupingIterator { + // Peekable iterator that yields (Version, ParsedLogPath) pairs + // Peekable is needed to look ahead for grouping files with same version + files: Peekable>>, +} + +impl + 'static> From for VersionGroupingIterator { + fn from(value: T) -> Self { + // No need for map_while since version is already parsed + let files: Box> = Box::new(value); + VersionGroupingIterator { files: files.peekable() } + } +} + diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 0dd6f7d37..e0a4fe34c 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -65,6 +65,7 @@ pub mod error; pub mod expressions; pub(crate) mod predicates; pub mod table_features; +pub mod grouping_iterator; #[cfg(feature = "developer-visibility")] pub mod path; From cb6effa53feb27156e5fead9d0a6fadd98b97514 Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Wed, 20 Nov 2024 18:35:37 +0530 Subject: [PATCH 2/9] Add VersionGroupingIterator for Delta log files Implements a streaming iterator to group Delta log files by version number. 
Key features: - Groups commit logs and checkpoints by version - Assumes input is pre-sorted by version - Supports streaming iteration for memory efficiency - Includes comprehensive test suite The implementation handles various Delta log file types including: - Regular commit logs (.json) - Single checkpoints (.parquet) - Multi-part checkpoints --- kernel/src/grouping_iterator.rs | 61 ++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 9 deletions(-) diff --git a/kernel/src/grouping_iterator.rs b/kernel/src/grouping_iterator.rs index e277365a8..31c3aa2d8 100644 --- a/kernel/src/grouping_iterator.rs +++ b/kernel/src/grouping_iterator.rs @@ -1,22 +1,65 @@ use crate::path::ParsedLogPath; use std::iter::Peekable; +use crate::path::AsUrl; +use crate::FileMeta; -pub struct VersionGroup { +// type to group the delta log files by version +// example delta log files and their grouping by version: +// 1. 0000000001.json +// 2. 0000000001.checkpoint.parquet +// 3. 0000000002.json +// 4. 0000000002.checkpoint.0000000001.0000000002.parquet +// 5. 0000000003.json +// +// The version groups are: +// 1. [1, 2] +// 2. [3, 4] +// 3. 
[5] +pub struct VersionGroup { version: u64, - files: Vec, + files: Vec>, } -pub struct VersionGroupingIterator { - // Peekable iterator that yields (Version, ParsedLogPath) pairs - // Peekable is needed to look ahead for grouping files with same version - files: Peekable>>, +// Files are implementation of an Iterator that yields ParsedLogPath +// So it can be any type that implements Iterator> +// Hence their size is not known at compile time, so we use a Box>> +pub struct VersionGroupingIterator { + files: Peekable>>>, } -impl + 'static> From for VersionGroupingIterator { +// We use a type conversion to allow the caller to pass any iterator that yields ParsedLogPath +// This gives an advantage to group files by version in a streaming fashion if we can assume that +// the input iterator is already sorted by version, like an S3 listing of delta log files. +impl From for VersionGroupingIterator +where + L: AsUrl + 'static, + T: Iterator> + 'static, +{ fn from(value: T) -> Self { - // No need for map_while since version is already parsed - let files: Box> = Box::new(value); + let files: Box>> = Box::new(value); VersionGroupingIterator { files: files.peekable() } } } +// By assuming that the input iterator is already sorted by version, we can group the files by version in a streaming fashion +// This assuming is very important, if the input is not sorted by version, the grouping will not be correct +impl Iterator for VersionGroupingIterator { + type Item = VersionGroup; + + fn next(&mut self) -> Option> { + while let Some(logpath) = self.files.next() { + let version: u64 = logpath.version; + let mut files = vec![logpath]; + // this is where we look ahead for the next file and check if it has the same version + // if it does, we add it to the current group + // if it doesn't, we return the current group and start a new one + // this is why we need to assume that the input iterator is already sorted by version, because we only check the next file + while let 
Some(parsed_logpath) = self.files.next_if(|v| v.version == version) { + files.push(parsed_logpath) + } + return Some(VersionGroup { version, files }); + } + None + } +} + From 52889964b35ea7223c5381f876809b3ee4ae0a1a Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Wed, 20 Nov 2024 18:44:21 +0530 Subject: [PATCH 3/9] Add tests for VersionGroupingIterator Adds initial test infrastructure for the version grouping functionality: - Helper functions for test data setup - Basic test case for single commit file grouping - Documentation for test utilities --- kernel/src/grouping_iterator.rs | 74 +++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/kernel/src/grouping_iterator.rs b/kernel/src/grouping_iterator.rs index 31c3aa2d8..488a04097 100644 --- a/kernel/src/grouping_iterator.rs +++ b/kernel/src/grouping_iterator.rs @@ -63,3 +63,77 @@ impl Iterator for VersionGroupingIterator { } } + +#[cfg(test)] +mod tests { + use super::*; + use std::path::PathBuf; + use url::Url; + /// Returns a URL pointing to the test data directory containing Delta table log files. + /// The path is relative to the project root and points to a small test Delta table. + /// + /// # Returns + /// A URL object representing the canonicalized path to the test Delta log directory + fn table_log_dir_url() -> Url { + let path = PathBuf::from("./tests/data/table-with-dv-small/_delta_log/"); + let path = std::fs::canonicalize(path).unwrap(); + Url::from_directory_path(path).unwrap() + } + + /// Creates a ParsedLogPath for testing by constructing a Delta log file path with the given version and type. + /// + /// # Arguments + /// * `version` - The version number to use in the filename (will be zero-padded to 20 digits) + /// * `file_type` - The type of log file to create. 
Valid options are: + /// * "commit" - Creates a commit file (.json) + /// * "checkpoint" - Creates a single checkpoint file (.checkpoint.parquet) + /// * "multipart1" - Creates part 1 of a 2-part checkpoint + /// * "multipart2" - Creates part 2 of a 2-part checkpoint + /// + /// # Returns + /// A ParsedLogPath containing the constructed URL and parsed metadata + /// + /// # Panics + /// Panics if an invalid file_type is provided + fn create_log_path(version: u64, file_type: &str) -> ParsedLogPath { + let base_url = table_log_dir_url(); + let filename = match file_type { + "commit" => format!("{:020}.json", version), + "checkpoint" => format!("{:020}.checkpoint.parquet", version), + "multipart1" => format!("{:020}.checkpoint.0000000001.0000000002.parquet", version), + "multipart2" => format!("{:020}.checkpoint.0000000002.0000000002.parquet", version), + _ => panic!("Unknown file type"), + }; + let url = base_url.join(&filename).unwrap(); + ParsedLogPath::try_from(url).unwrap().unwrap() + } + + #[test] + /// Tests the basic functionality of VersionGroupingIterator with a single commit file + /// + /// This test verifies that: + /// 1. The iterator correctly processes a single commit file + /// 2. The version group contains the expected version number (1) + /// 3. The group contains exactly one file + /// 4. The file is correctly identified as a commit file + /// 5. After consuming the single group, the iterator is exhausted + /// + /// This represents the simplest possible case for the iterator - a single file + /// that needs to be grouped by version. 
+ #[test] + fn test_single_commit() { + let paths = vec![create_log_path(1, "commit")]; + let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); + + if let Some(group) = iter.next() { + assert_eq!(group.version, 1); + assert_eq!(group.files.len(), 1); + assert!(group.files[0].is_commit()); + } else { + panic!("Expected a group"); + } + + assert!(iter.next().is_none()); + } +} + From 539617a74e72cc88c6ddf422e40fe55179ceae7f Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Wed, 20 Nov 2024 18:49:35 +0530 Subject: [PATCH 4/9] Add test for sequential version grouping Tests the core functionality of VersionGroupingIterator with multiple sequential versions. Verifies that commit files are correctly grouped and ordered from version 1->2->3. The test's doc comment explains why the Delta protocol requires log files to be processed in version order. --- kernel/src/grouping_iterator.rs | 43 +++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/kernel/src/grouping_iterator.rs b/kernel/src/grouping_iterator.rs index 488a04097..11a82ed93 100644 --- a/kernel/src/grouping_iterator.rs +++ b/kernel/src/grouping_iterator.rs @@ -135,5 +135,48 @@ mod tests { assert!(iter.next().is_none()); } + + #[test] + /// Tests that VersionGroupingIterator correctly handles multiple sequential versions + /// + /// This test verifies several critical aspects of version grouping: + /// 1. The iterator can process multiple commit files with different versions + /// 2. Files are grouped correctly by their version numbers + /// 3. The groups are returned in sequential version order (1, 2, 3) + /// 4. Each group contains exactly one file when there is one file per version + /// 5. The files in each group are correctly identified as commit files + /// 6. The iterator is exhausted after processing all versions + /// + /// This test is important because it validates the core functionality of streaming + /// version-based grouping when processing a Delta table's log directory. 
The sequential + /// version ordering is especially critical since the Delta protocol relies on processing + /// log files in version order to reconstruct table state. + /// + /// Example Delta Log Directory: + /// ```text + /// _delta_log/ + /// 00000000000000000001.json -> Group 1: [00000000000000000001.json] + /// 00000000000000000002.json -> Group 2: [00000000000000000002.json] + /// 00000000000000000003.json -> Group 3: [00000000000000000003.json] + /// ``` + /// + /// The test verifies that the iterator yields three groups, each containing + /// exactly one commit file, in version order 1->2->3. + fn test_multiple_versions() { + let paths = vec![ + create_log_path(1, "commit"), + create_log_path(2, "commit"), + create_log_path(3, "commit"), + ]; + let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); + + for expected_version in 1..=3 { + let group = iter.next().expect("Should have a group"); + assert_eq!(group.version, expected_version); + assert_eq!(group.files.len(), 1); + assert!(group.files[0].is_commit()); + } + assert!(iter.next().is_none()); + } } From 084e4d58bc45e290d36bf4f648d1c30650712463 Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Wed, 20 Nov 2024 18:59:46 +0530 Subject: [PATCH 5/9] Adds tests covering various Delta log file scenarios: - Single/multi-part checkpoint handling - Mixed version and file type combinations - Empty iterator case --- kernel/src/grouping_iterator.rs | 220 ++++++++++++++++++++++++++++++++ 1 file changed, 220 insertions(+) diff --git a/kernel/src/grouping_iterator.rs b/kernel/src/grouping_iterator.rs index 11a82ed93..a74ea0c16 100644 --- a/kernel/src/grouping_iterator.rs +++ b/kernel/src/grouping_iterator.rs @@ -178,5 +178,225 @@ mod tests { } assert!(iter.next().is_none()); } + + + #[test] + /// Tests that VersionGroupingIterator correctly groups a commit file with its checkpoint file + /// + /// This test verifies that: + /// 1. 
Files with the same version are grouped together + /// 2. Both commit and checkpoint files are included in the group + /// 3. The group has the correct version number + /// 4. The group contains exactly 2 files (1 commit + 1 checkpoint) + /// 5. The files are correctly identified as commit and checkpoint types + /// 6. The iterator is exhausted after the single group + /// + /// Example Delta Log Directory: + /// _delta_log/ + /// 00000000000000000001.json + /// 00000000000000000001.checkpoint.parquet + /// + /// VersionGroup { + /// version: 1, + /// files: [ + /// 00000000000000000001.json, + /// 00000000000000000001.checkpoint.parquet + /// ] + /// } + fn test_version_with_checkpoint() { + let paths = vec![ + create_log_path(1, "commit"), + create_log_path(1, "checkpoint"), + ]; + let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); + + let group = iter.next().expect("Should have a group"); + assert_eq!(group.version, 1); + assert_eq!(group.files.len(), 2); + assert!(group.files.iter().any(|f| f.is_commit())); + assert!(group.files.iter().any(|f| f.is_checkpoint())); + + assert!(iter.next().is_none()); + } + + #[test] + /// Tests that VersionGroupingIterator correctly handles multi-part checkpoint files + /// + /// This test verifies that: + /// 1. All files with the same version are grouped together + /// 2. The group includes both parts of a multi-part checkpoint + /// 3. The commit file is included in the same group + /// 4. The group has the correct version number + /// 5. The group contains exactly 3 files (1 commit + 2 checkpoint parts) + /// 6. Files are correctly identified as commit vs checkpoint types + /// 7. 
The iterator is exhausted after processing the single group + /// + /// Example Delta Log Directory: + /// _delta_log/ + /// 00000000000000000001.json + /// 00000000000000000001.checkpoint.0000000001.0000000002.parquet + /// 00000000000000000001.checkpoint.0000000002.0000000002.parquet + /// + /// VersionGroup { + /// version: 1, + /// files: [ + /// 00000000000000000001.json, + /// 00000000000000000001.checkpoint.0000000001.0000000002.parquet, + /// 00000000000000000001.checkpoint.0000000002.0000000002.parquet + /// ] + /// } + fn test_multipart_checkpoint() { + let paths = vec![ + create_log_path(1, "commit"), + create_log_path(1, "multipart1"), + create_log_path(1, "multipart2"), + ]; + let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); + + let group = iter.next().expect("Should have a group"); + assert_eq!(group.version, 1); + assert_eq!(group.files.len(), 3); + + let (commits, checkpoints): (Vec<_>, Vec<_>) = group.files + .iter() + .partition(|f| f.is_commit()); + + assert_eq!(commits.len(), 1, "Should have one commit"); + assert_eq!(checkpoints.len(), 2, "Should have two checkpoint parts"); + + assert!(iter.next().is_none()); + } + + #[test] + /// Tests that VersionGroupingIterator correctly handles a mix of versions and file types + /// + /// This test verifies that: + /// 1. Files are correctly grouped by version number + /// 2. Each group contains the right number and types of files + /// 3. Groups are returned in version order + /// 4. 
The iterator processes all groups and terminates properly + /// + /// Test Data Structure: + /// Version 1: + /// - One commit file + /// - Two parts of a multi-part checkpoint + /// Version 2: + /// - One commit file + /// - One single checkpoint file + /// Version 3: + /// - One commit file only + /// + /// Example Delta Log Directory: + /// _delta_log/ + /// 00000000000000000001.json + /// 00000000000000000001.checkpoint.0000000001.0000000002.parquet + /// 00000000000000000001.checkpoint.0000000002.0000000002.parquet + /// 00000000000000000002.json + /// 00000000000000000002.checkpoint.parquet + /// 00000000000000000003.json + /// + /// Expected Version Groups: + /// VersionGroup { + /// version: 1, + /// files: [ + /// 00000000000000000001.json, + /// 00000000000000000001.checkpoint.0000000001.0000000002.parquet, + /// 00000000000000000001.checkpoint.0000000002.0000000002.parquet + /// ] + /// } + /// VersionGroup { + /// version: 2, + /// files: [ + /// 00000000000000000002.json, + /// 00000000000000000002.checkpoint.parquet + /// ] + /// } + /// VersionGroup { + /// version: 3, + /// files: [ + /// 00000000000000000003.json + /// ] + /// } + fn test_mixed_versions_and_types() { + let paths = vec![ + create_log_path(1, "commit"), + create_log_path(1, "multipart1"), + create_log_path(1, "multipart2"), + create_log_path(2, "commit"), + create_log_path(2, "checkpoint"), + create_log_path(3, "commit"), + ]; + let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); + + // Version 1 group + let group = iter.next().expect("Should have version 1"); + assert_eq!(group.version, 1); + assert_eq!(group.files.len(), 3); + assert_eq!( + group.files.iter().filter(|f| f.is_commit()).count(), + 1, + "Should have one commit" + ); + assert_eq!( + group.files.iter().filter(|f| f.is_checkpoint()).count(), + 2, + "Should have two checkpoint parts" + ); + + // Version 2 group + let group = iter.next().expect("Should have version 2"); + 
assert_eq!(group.version, 2); + assert_eq!(group.files.len(), 2); + assert_eq!( + group.files.iter().filter(|f| f.is_commit()).count(), + 1, + "Should have one commit" + ); + assert_eq!( + group.files.iter().filter(|f| f.is_checkpoint()).count(), + 1, + "Should have one checkpoint" + ); + + // Version 3 group + let group = iter.next().expect("Should have version 3"); + assert_eq!(group.version, 3); + assert_eq!(group.files.len(), 1); + assert!(group.files[0].is_commit(), "Should be a commit file"); + + assert!(iter.next().is_none()); + } + + #[test] + fn test_empty_iterator() { + // Test that an empty input iterator returns no groups + let paths: Vec> = vec![]; + let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); + // Verify that next() returns None when there are no items + assert!(iter.next().is_none()); + } + // We expect the caller to sort the input before passing it to the iterator + // hence the test is not needed, if uncommented and run, it will fail + // #[test] + // fn test_unsorted_input() { + // let paths = vec![ + // create_log_path(2, "commit"), + // create_log_path(1, "commit"), + // create_log_path(1, "checkpoint"), + // create_log_path(2, "checkpoint"), + // ]; + // let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); + + // // Should still group by version regardless of input order + // for version in 1..=2 { + // let group = iter.next().expect("Should have a group"); + // assert_eq!(group.version, version); + // assert_eq!(group.files.len(), 2); + // assert!(group.files.iter().any(|f| f.is_commit())); + // assert!(group.files.iter().any(|f| f.is_checkpoint())); + // } + + // assert!(iter.next().is_none()); + // } } From 152697c9316d381f38404ef48c60965bbb17d4bd Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Tue, 26 Nov 2024 12:40:12 +0530 Subject: [PATCH 6/9] Minor code cleanup in VersionGroupingIterator Changes: - Remove redundant type annotation for version variable - 
Fix duplicate #[test] attribute in test_single_commit --- kernel/src/grouping_iterator.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kernel/src/grouping_iterator.rs b/kernel/src/grouping_iterator.rs index a74ea0c16..3cd34312d 100644 --- a/kernel/src/grouping_iterator.rs +++ b/kernel/src/grouping_iterator.rs @@ -48,7 +48,7 @@ impl Iterator for VersionGroupingIterator { fn next(&mut self) -> Option> { while let Some(logpath) = self.files.next() { - let version: u64 = logpath.version; + let version = logpath.version; let mut files = vec![logpath]; // this is where we look ahead for the next file and check if it has the same version // if it does, we add it to the current group @@ -108,7 +108,6 @@ mod tests { ParsedLogPath::try_from(url).unwrap().unwrap() } - #[test] /// Tests the basic functionality of VersionGroupingIterator with a single commit file /// /// This test verifies that: From 71734e9d8dd45f5cb67a007c65e2151584cff290 Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Wed, 27 Nov 2024 12:05:48 +0530 Subject: [PATCH 7/9] Restructuring the code, adding the contains checkpoint helper function --- .../version_grouping_iterator.rs} | 7 +++++++ kernel/src/lib.rs | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) rename kernel/src/{grouping_iterator.rs => grouping_iterators/version_grouping_iterator.rs} (98%) diff --git a/kernel/src/grouping_iterator.rs b/kernel/src/grouping_iterators/version_grouping_iterator.rs similarity index 98% rename from kernel/src/grouping_iterator.rs rename to kernel/src/grouping_iterators/version_grouping_iterator.rs index 3cd34312d..70829089d 100644 --- a/kernel/src/grouping_iterator.rs +++ b/kernel/src/grouping_iterators/version_grouping_iterator.rs @@ -20,6 +20,13 @@ pub struct VersionGroup { files: Vec>, } +// check if the given version of the snapshot contains checkpoint +impl VersionGroup { + pub fn contains_checkpoint(&self) -> bool { + self.files.iter().any(|f| f.is_checkpoint()) + } +} + // Files are 
implementation of an Iterator that yields ParsedLogPath // So it can be any type that implements Iterator> // Hence their size is not known at compile time, so we use a Box>> diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index e0a4fe34c..874b15745 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -65,7 +65,7 @@ pub mod error; pub mod expressions; pub(crate) mod predicates; pub mod table_features; -pub mod grouping_iterator; +pub mod grouping_iterators; #[cfg(feature = "developer-visibility")] pub mod path; From 353da58771af7a704bbb755a43c3eabb0172a237 Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Wed, 27 Nov 2024 13:16:41 +0530 Subject: [PATCH 8/9] refactor: Use generic type parameters instead of Box Changes: - Replace Box with generic type parameter I in VersionGroupingIterator - Add type parameters I and L with appropriate trait bounds using where clauses - Remove 'static bounds as they're no longer needed without Box - Simplify From implementation to use concrete iterator type directly This change: 1. Improves performance by avoiding dynamic dispatch 2. Makes the code more idiomatic by using generics 3. Maintains flexibility while being more efficient 4. 
Removes unnecessary boxing of iterators --- .../version_grouping_iterator.rs | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/kernel/src/grouping_iterators/version_grouping_iterator.rs b/kernel/src/grouping_iterators/version_grouping_iterator.rs index 70829089d..54ac05ad1 100644 --- a/kernel/src/grouping_iterators/version_grouping_iterator.rs +++ b/kernel/src/grouping_iterators/version_grouping_iterator.rs @@ -20,40 +20,48 @@ pub struct VersionGroup { files: Vec>, } -// check if the given version of the snapshot contains checkpoint +// check if the given version of the snapshot contains checkpoint file impl VersionGroup { pub fn contains_checkpoint(&self) -> bool { self.files.iter().any(|f| f.is_checkpoint()) } } - -// Files are implementation of an Iterator that yields ParsedLogPath -// So it can be any type that implements Iterator> -// Hence their size is not known at compile time, so we use a Box>> -pub struct VersionGroupingIterator { - files: Peekable>>>, +// VersionGroupingIterator takes two type parameters: +// I: The concrete iterator type that yields ParsedLogPath +// L: The type implementing AsUrl that represents the underlying file location +// This allows for flexible iteration over log files while maintaining type safety +pub struct VersionGroupingIterator +where + L: AsUrl, + I: Iterator>, +{ + files: Peekable, } // We use a type conversion to allow the caller to pass any iterator that yields ParsedLogPath // This gives an advantage to group files by version in a streaming fashion if we can assume that // the input iterator is already sorted by version, like an S3 listing of delta log files. 
-impl From for VersionGroupingIterator +impl From for VersionGroupingIterator where - L: AsUrl + 'static, - T: Iterator> + 'static, + L: AsUrl, + I: Iterator>, { - fn from(value: T) -> Self { - let files: Box>> = Box::new(value); + fn from(value: I) -> Self { + let files = value; VersionGroupingIterator { files: files.peekable() } } } // By assuming that the input iterator is already sorted by version, we can group the files by version in a streaming fashion // This assuming is very important, if the input is not sorted by version, the grouping will not be correct -impl Iterator for VersionGroupingIterator { +impl Iterator for VersionGroupingIterator +where + L: AsUrl, + I: Iterator>, +{ type Item = VersionGroup; - fn next(&mut self) -> Option> { + fn next(&mut self) -> Option { while let Some(logpath) = self.files.next() { let version = logpath.version; let mut files = vec![logpath]; From f7d73aff376d030122372d73c060fffe0700ff4d Mon Sep 17 00:00:00 2001 From: Karthic Rao Date: Wed, 27 Nov 2024 19:32:44 +0530 Subject: [PATCH 9/9] Format code and improve type annotations Changes: 1. Remove redundant type annotations for VersionGroupingIterator 2. Reorder imports alphabetically 3. Fix formatting and whitespace consistency 4. 
Add test for checkpoint detection across versions --- .../version_grouping_iterator.rs | 131 +++++++++++------- 1 file changed, 82 insertions(+), 49 deletions(-) diff --git a/kernel/src/grouping_iterators/version_grouping_iterator.rs b/kernel/src/grouping_iterators/version_grouping_iterator.rs index 54ac05ad1..6a10a3999 100644 --- a/kernel/src/grouping_iterators/version_grouping_iterator.rs +++ b/kernel/src/grouping_iterators/version_grouping_iterator.rs @@ -1,7 +1,7 @@ -use crate::path::ParsedLogPath; -use std::iter::Peekable; use crate::path::AsUrl; +use crate::path::ParsedLogPath; use crate::FileMeta; +use std::iter::Peekable; // type to group the delta log files by version // example delta log files and their grouping by version: @@ -30,8 +30,8 @@ impl VersionGroup { // I: The concrete iterator type that yields ParsedLogPath // L: The type implementing AsUrl that represents the underlying file location // This allows for flexible iteration over log files while maintaining type safety -pub struct VersionGroupingIterator -where +pub struct VersionGroupingIterator +where L: AsUrl, I: Iterator>, { @@ -39,24 +39,26 @@ where } // We use a type conversion to allow the caller to pass any iterator that yields ParsedLogPath -// This gives an advantage to group files by version in a streaming fashion if we can assume that +// This gives an advantage to group files by version in a streaming fashion if we can assume that // the input iterator is already sorted by version, like an S3 listing of delta log files. 
-impl From for VersionGroupingIterator +impl From for VersionGroupingIterator where L: AsUrl, I: Iterator>, { fn from(value: I) -> Self { - let files = value; - VersionGroupingIterator { files: files.peekable() } + let files = value; + VersionGroupingIterator { + files: files.peekable(), + } } } // By assuming that the input iterator is already sorted by version, we can group the files by version in a streaming fashion // This assuming is very important, if the input is not sorted by version, the grouping will not be correct -impl Iterator for VersionGroupingIterator -where - L: AsUrl, +impl Iterator for VersionGroupingIterator +where + L: AsUrl, I: Iterator>, { type Item = VersionGroup; @@ -78,7 +80,6 @@ where } } - #[cfg(test)] mod tests { use super::*; @@ -86,7 +87,7 @@ mod tests { use url::Url; /// Returns a URL pointing to the test data directory containing Delta table log files. /// The path is relative to the project root and points to a small test Delta table. - /// + /// /// # Returns /// A URL object representing the canonicalized path to the test Delta log directory fn table_log_dir_url() -> Url { @@ -96,7 +97,7 @@ mod tests { } /// Creates a ParsedLogPath for testing by constructing a Delta log file path with the given version and type. - /// + /// /// # Arguments /// * `version` - The version number to use in the filename (will be zero-padded to 20 digits) /// * `file_type` - The type of log file to create. Valid options are: @@ -124,21 +125,21 @@ mod tests { } /// Tests the basic functionality of VersionGroupingIterator with a single commit file - /// + /// /// This test verifies that: /// 1. The iterator correctly processes a single commit file /// 2. The version group contains the expected version number (1) /// 3. The group contains exactly one file /// 4. The file is correctly identified as a commit file /// 5. 
After consuming the single group, the iterator is exhausted - /// + /// /// This represents the simplest possible case for the iterator - a single file /// that needs to be grouped by version. #[test] fn test_single_commit() { let paths = vec![create_log_path(1, "commit")]; - let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); - + let mut iter = VersionGroupingIterator::from(paths.into_iter()); + if let Some(group) = iter.next() { assert_eq!(group.version, 1); assert_eq!(group.files.len(), 1); @@ -146,13 +147,13 @@ mod tests { } else { panic!("Expected a group"); } - + assert!(iter.next().is_none()); } #[test] /// Tests that VersionGroupingIterator correctly handles multiple sequential versions - /// + /// /// This test verifies several critical aspects of version grouping: /// 1. The iterator can process multiple commit files with different versions /// 2. Files are grouped correctly by their version numbers @@ -160,7 +161,7 @@ mod tests { /// 4. Each group contains exactly one file when there is one file per version /// 5. The files in each group are correctly identified as commit files /// 6. The iterator is exhausted after processing all versions - /// + /// /// This test is important because it validates the core functionality of streaming /// version-based grouping when processing a Delta table's log directory. The sequential /// version ordering is especially critical since the Delta protocol relies on processing @@ -173,7 +174,7 @@ mod tests { /// 00000000000000000002.json -> Group 2: [00000000000000000002.json] /// 00000000000000000003.json -> Group 3: [00000000000000000003.json] /// ``` - /// + /// /// The test verifies that the iterator yields three groups, each containing /// exactly one commit file, in version order 1->2->3. 
fn test_multiple_versions() { @@ -182,8 +183,8 @@ mod tests { create_log_path(2, "commit"), create_log_path(3, "commit"), ]; - let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); - + let mut iter = VersionGroupingIterator::from(paths.into_iter()); + for expected_version in 1..=3 { let group = iter.next().expect("Should have a group"); assert_eq!(group.version, expected_version); @@ -193,10 +194,9 @@ mod tests { assert!(iter.next().is_none()); } - #[test] /// Tests that VersionGroupingIterator correctly groups a commit file with its checkpoint file - /// + /// /// This test verifies that: /// 1. Files with the same version are grouped together /// 2. Both commit and checkpoint files are included in the group @@ -222,20 +222,20 @@ mod tests { create_log_path(1, "commit"), create_log_path(1, "checkpoint"), ]; - let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); - + let mut iter = VersionGroupingIterator::from(paths.into_iter()); + let group = iter.next().expect("Should have a group"); assert_eq!(group.version, 1); assert_eq!(group.files.len(), 2); assert!(group.files.iter().any(|f| f.is_commit())); assert!(group.files.iter().any(|f| f.is_checkpoint())); - + assert!(iter.next().is_none()); } #[test] /// Tests that VersionGroupingIterator correctly handles multi-part checkpoint files - /// + /// /// This test verifies that: /// 1. All files with the same version are grouped together /// 2. 
The group includes both parts of a multi-part checkpoint @@ -265,25 +265,24 @@ mod tests { create_log_path(1, "multipart1"), create_log_path(1, "multipart2"), ]; - let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); - + let mut iter = VersionGroupingIterator::from(paths.into_iter()); + let group = iter.next().expect("Should have a group"); assert_eq!(group.version, 1); assert_eq!(group.files.len(), 3); - - let (commits, checkpoints): (Vec<_>, Vec<_>) = group.files - .iter() - .partition(|f| f.is_commit()); - + + let (commits, checkpoints): (Vec<_>, Vec<_>) = + group.files.iter().partition(|f| f.is_commit()); + assert_eq!(commits.len(), 1, "Should have one commit"); assert_eq!(checkpoints.len(), 2, "Should have two checkpoint parts"); - + assert!(iter.next().is_none()); } #[test] /// Tests that VersionGroupingIterator correctly handles a mix of versions and file types - /// + /// /// This test verifies that: /// 1. Files are correctly grouped by version number /// 2. 
Each group contains the right number and types of files @@ -340,8 +339,7 @@ mod tests { create_log_path(2, "checkpoint"), create_log_path(3, "commit"), ]; - let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); - + let mut iter = VersionGroupingIterator::from(paths.into_iter()); // Version 1 group let group = iter.next().expect("Should have version 1"); assert_eq!(group.version, 1); @@ -356,7 +354,7 @@ mod tests { 2, "Should have two checkpoint parts" ); - + // Version 2 group let group = iter.next().expect("Should have version 2"); assert_eq!(group.version, 2); @@ -371,13 +369,13 @@ mod tests { 1, "Should have one checkpoint" ); - + // Version 3 group let group = iter.next().expect("Should have version 3"); assert_eq!(group.version, 3); assert_eq!(group.files.len(), 1); assert!(group.files[0].is_commit(), "Should be a commit file"); - + assert!(iter.next().is_none()); } @@ -385,10 +383,46 @@ mod tests { fn test_empty_iterator() { // Test that an empty input iterator returns no groups let paths: Vec> = vec![]; - let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); + let mut iter = VersionGroupingIterator::from(paths.into_iter()); // Verify that next() returns None when there are no items assert!(iter.next().is_none()); } + + // Tests that the iterator correctly detects checkpoint files + // In the example data, the version 1 has no checkpoint file, while the version 2 has a checkpoint file + // In the test we verify that the iterator correctly identifies the checkpoint file in the version 2 group + #[test] + fn test_multiple_versions_checkpoint_detection() { + // Create test data: + // Version 1: Only commit file + // Version 2: Commit + checkpoint + let paths = vec![ + create_log_path(1, "commit"), // Version 1 + create_log_path(2, "commit"), // Version 2 + create_log_path(2, "checkpoint"), // Version 2 + ]; + + let mut iter = VersionGroupingIterator::from(paths.into_iter()); + + // Check 
Version 1 + let group1 = iter.next().expect("Should have version 1"); + assert_eq!(group1.version, 1); + assert!( + !group1.contains_checkpoint(), + "Version 1 should not have checkpoint" + ); + + // Check Version 2 + let group2 = iter.next().expect("Should have version 2"); + assert_eq!(group2.version, 2); + assert!( + group2.contains_checkpoint(), + "Version 2 should have checkpoint" + ); + + // Verify iterator is exhausted + assert!(iter.next().is_none(), "Should not have more versions"); + } // We expect the caller to sort the input before passing it to the iterator // hence the test is not needed, if uncommented and run, it will fail // #[test] @@ -399,8 +433,8 @@ mod tests { // create_log_path(1, "checkpoint"), // create_log_path(2, "checkpoint"), // ]; - // let mut iter: VersionGroupingIterator = VersionGroupingIterator::from(paths.into_iter()); - + // let mut iter = VersionGroupingIterator::from(paths.into_iter()); + // // Should still group by version regardless of input order // for version in 1..=2 { // let group = iter.next().expect("Should have a group"); @@ -409,8 +443,7 @@ mod tests { // assert!(group.files.iter().any(|f| f.is_commit())); // assert!(group.files.iter().any(|f| f.is_checkpoint())); // } - + // assert!(iter.next().is_none()); // } } -