-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Streaming iterator to fix the snapshot creation issue #515
base: main
Are you sure you want to change the base?
Changes from 3 commits
84399e8
cb6effa
5288996
539617a
084e4d5
152697c
71734e9
353da58
f7d73af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -20,35 +20,50 @@ pub struct VersionGroup<L: AsUrl = FileMeta> { | |||||||||||||||
files: Vec<ParsedLogPath<L>>, | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// `files` is an iterator that yields ParsedLogPath values, | ||||||||||||||||
// so it can be any type that implements Iterator<Item = ParsedLogPath<L>>. | ||||||||||||||||
// Its concrete size is therefore not known at compile time, so we use a Box<dyn Iterator<Item = ParsedLogPath<L>>>. | ||||||||||||||||
pub struct VersionGroupingIterator<L: AsUrl = FileMeta> { | ||||||||||||||||
files: Peekable<Box<dyn Iterator<Item = ParsedLogPath<L>>>>, | ||||||||||||||||
// Checks whether the given version of the snapshot contains a checkpoint file | ||||||||||||||||
impl<L: AsUrl> VersionGroup<L> { | ||||||||||||||||
pub fn contains_checkpoint(&self) -> bool { | ||||||||||||||||
self.files.iter().any(|f| f.is_checkpoint()) | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
// VersionGroupingIterator takes two type parameters: | ||||||||||||||||
// I: The concrete iterator type that yields ParsedLogPath<L> | ||||||||||||||||
// L: The type implementing AsUrl that represents the underlying file location | ||||||||||||||||
// This allows for flexible iteration over log files while maintaining type safety | ||||||||||||||||
pub struct VersionGroupingIterator<I, L> | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Depending on usage, it may make sense to do There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting... rust seems able to infer generic parameters regardless of the order they're declared in? The unit tests already do e.g. let mut iter: VersionGroupingIterator<Url> = VersionGroupingIterator::from(paths.into_iter()); |
||||||||||||||||
where | ||||||||||||||||
L: AsUrl, | ||||||||||||||||
I: Iterator<Item = ParsedLogPath<L>>, | ||||||||||||||||
{ | ||||||||||||||||
files: Peekable<I>, | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// We use a type conversion so that the caller can pass any iterator that yields ParsedLogPath. | ||||||||||||||||
// This lets us group files by version in a streaming fashion, provided we can assume that | ||||||||||||||||
// the input iterator is already sorted by version, like an S3 listing of delta log files. | ||||||||||||||||
impl<T, L> From<T> for VersionGroupingIterator<L> | ||||||||||||||||
impl<I, L> From<I> for VersionGroupingIterator<I,L> | ||||||||||||||||
where | ||||||||||||||||
L: AsUrl + 'static, | ||||||||||||||||
T: Iterator<Item = ParsedLogPath<L>> + 'static, | ||||||||||||||||
L: AsUrl, | ||||||||||||||||
I: Iterator<Item = ParsedLogPath<L>>, | ||||||||||||||||
{ | ||||||||||||||||
fn from(value: T) -> Self { | ||||||||||||||||
let files: Box<dyn Iterator<Item = ParsedLogPath<L>>> = Box::new(value); | ||||||||||||||||
fn from(value: I) -> Self { | ||||||||||||||||
let files = value; | ||||||||||||||||
VersionGroupingIterator { files: files.peekable() } | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
or
Suggested change
|
||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
// By assuming that the input iterator is already sorted by version, we can group the files by version in a streaming fashion. | ||||||||||||||||
// This assumption is very important: if the input is not sorted by version, the grouping will not be correct. | ||||||||||||||||
impl<L: AsUrl> Iterator for VersionGroupingIterator<L> { | ||||||||||||||||
impl<I, L> Iterator for VersionGroupingIterator<I, L> | ||||||||||||||||
where | ||||||||||||||||
L: AsUrl, | ||||||||||||||||
I: Iterator<Item = ParsedLogPath<L>>, | ||||||||||||||||
{ | ||||||||||||||||
type Item = VersionGroup<L>; | ||||||||||||||||
|
||||||||||||||||
fn next(&mut self) -> Option<VersionGroup<L>> { | ||||||||||||||||
fn next(&mut self) -> Option<Self::Item> { | ||||||||||||||||
while let Some(logpath) = self.files.next() { | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't actually a loop; it executes at most once, which means we can simplify (un-indent) the code with: let logpath = self.files.next()?;
let version = logpath.version;
...
Some(VersionGroup { version, files }) |
||||||||||||||||
let version: u64 = logpath.version; | ||||||||||||||||
let version = logpath.version; | ||||||||||||||||
let mut files = vec![logpath]; | ||||||||||||||||
// this is where we look ahead for the next file and check if it has the same version | ||||||||||||||||
// if it does, we add it to the current group | ||||||||||||||||
|
@@ -108,7 +123,6 @@ mod tests { | |||||||||||||||
ParsedLogPath::try_from(url).unwrap().unwrap() | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
#[test] | ||||||||||||||||
/// Tests the basic functionality of VersionGroupingIterator with a single commit file | ||||||||||||||||
/// | ||||||||||||||||
/// This test verifies that: | ||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aside: It's a little brittle if the set of recognized/kept files changes, but the checkpoint is always the first out of the known file types. So we can do:
(but the iterator will anyway terminate on the first entry so this is probably over-optimizing)