[WIP] Streaming iterator to fix the snapshot creation issue #515

Open
wants to merge 9 commits into
base: main
@@ -20,35 +20,50 @@ pub struct VersionGroup<L: AsUrl = FileMeta> {
files: Vec<ParsedLogPath<L>>,
}

// Files are implementation of an Iterator that yields ParsedLogPath
// So it can be any type that implements Iterator<Item = ParsedLogPath<L>>
// Hence their size is not known at compile time, so we use a Box<dyn Iterator<Item = ParsedLogPath<L>>>
pub struct VersionGroupingIterator<L: AsUrl = FileMeta> {
files: Peekable<Box<dyn Iterator<Item = ParsedLogPath<L>>>>,
// check if the given version of the snapshot contains checkpoint file
impl<L: AsUrl> VersionGroup<L> {
pub fn contains_checkpoint(&self) -> bool {
self.files.iter().any(|f| f.is_checkpoint())
Collaborator

aside: It's a little brittle if the set of recognized/kept files changes, but the checkpoint is always the first out of the known file types. So we can do:

Suggested change
- self.files.iter().any(|f| f.is_checkpoint())
+ self.files.first().is_some_and(|f| f.is_checkpoint())

(but `any` will terminate on the first entry anyway, so this is probably over-optimizing)
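
A minimal sketch of the two forms discussed in this comment, assuming a hypothetical `LogFile` type in place of `ParsedLogPath` (the `Kind` enum is invented for illustration). It shows where the brittleness comes from: the two forms agree only while a checkpoint, if present, is the first file in the group.

```rust
/// Hypothetical file kinds; the real set of recognized file types may differ.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Checkpoint,
    Commit,
}

/// Hypothetical stand-in for `ParsedLogPath`, reduced to what the example needs.
struct LogFile {
    kind: Kind,
}

impl LogFile {
    fn is_checkpoint(&self) -> bool {
        self.kind == Kind::Checkpoint
    }
}

/// Scans the group and stops at the first checkpoint it finds.
fn contains_checkpoint_any(files: &[LogFile]) -> bool {
    files.iter().any(|f| f.is_checkpoint())
}

/// Looks only at the first entry; correct only if checkpoints sort first.
fn contains_checkpoint_first(files: &[LogFile]) -> bool {
    files.first().is_some_and(|f| f.is_checkpoint())
}
```

If a group ever arrives as commit-then-checkpoint, `contains_checkpoint_any` still returns `true` while `contains_checkpoint_first` returns `false`.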

}
}
// VersionGroupingIterator takes two type parameters:
// I: The concrete iterator type that yields ParsedLogPath<L>
// L: The type implementing AsUrl that represents the underlying file location
// This allows for flexible iteration over log files while maintaining type safety
pub struct VersionGroupingIterator<I, L>
Collaborator

Depending on usage, it may make sense to declare the parameters as <L, I> in case L ever needs to be specified directly. We can always change it later if the need arises, though.

Collaborator

Interesting... Rust seems able to infer generic parameters regardless of the order they're declared in? The unit tests already do, e.g.:

let mut iter: VersionGroupingIterator<Url> = VersionGroupingIterator::from(paths.into_iter());
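
For reference, when a struct declares `<I, L>` with no default type parameters, one way to name only `L` in an annotation is the `_` placeholder, which leaves `I` for the compiler to infer from the argument. The sketch below uses hypothetical `Parsed` and `Grouping` types rather than the PR's own; the `PhantomData<L>` field is an assumption of the sketch so that `L` appears in a field.

```rust
use std::iter::Peekable;
use std::marker::PhantomData;

/// Hypothetical stand-in for `ParsedLogPath<L>`.
struct Parsed<L> {
    version: u64,
    location: L,
}

/// Parameters declared in the order `<I, L>`, with no defaults.
/// `PhantomData<L>` is an assumption of this sketch, not taken from the PR.
struct Grouping<I, L>
where
    I: Iterator<Item = Parsed<L>>,
{
    files: Peekable<I>,
    _location: PhantomData<L>,
}

impl<I, L> From<I> for Grouping<I, L>
where
    I: Iterator<Item = Parsed<L>>,
{
    fn from(value: I) -> Self {
        Grouping { files: value.peekable(), _location: PhantomData }
    }
}

impl<I, L> Grouping<I, L>
where
    I: Iterator<Item = Parsed<L>>,
{
    /// Version of the next file, without consuming it.
    fn peek_version(&mut self) -> Option<u64> {
        self.files.peek().map(|p| p.version)
    }
}

fn first_version(paths: Vec<Parsed<String>>) -> Option<u64> {
    // `_` leaves `I` to be inferred from `paths.into_iter()`; only `L` is named.
    let mut iter: Grouping<_, String> = Grouping::from(paths.into_iter());
    iter.peek_version()
}
```

With the order `<L, I>` the same annotation would read `Grouping<String, _>`, so the declaration order mainly affects how such annotations and turbofish calls are written.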

where
L: AsUrl,
I: Iterator<Item = ParsedLogPath<L>>,
{
files: Peekable<I>,
}

// We use a type conversion to allow the caller to pass any iterator that yields ParsedLogPath
// This gives an advantage to group files by version in a streaming fashion if we can assume that
// the input iterator is already sorted by version, like an S3 listing of delta log files.
impl<T, L> From<T> for VersionGroupingIterator<L>
impl<I, L> From<I> for VersionGroupingIterator<I,L>
where
L: AsUrl + 'static,
T: Iterator<Item = ParsedLogPath<L>> + 'static,
L: AsUrl,
I: Iterator<Item = ParsedLogPath<L>>,
{
fn from(value: T) -> Self {
let files: Box<dyn Iterator<Item = ParsedLogPath<L>>> = Box::new(value);
fn from(value: I) -> Self {
let files = value;
VersionGroupingIterator { files: files.peekable() }
Collaborator

Suggested change
- let files = value;
- VersionGroupingIterator { files: files.peekable() }
+ VersionGroupingIterator { files: value.peekable() }

or

Suggested change
- let files = value;
- VersionGroupingIterator { files: files.peekable() }
+ let files = value.peekable();
+ VersionGroupingIterator { files }

}
}
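
A minimal sketch of one practical effect of replacing `Box<dyn Iterator<...>>` with a generic `I`: the `'static` bounds can go, so the wrapper can hold an iterator that borrows local data. The types below are hypothetical and use `u64` in place of `ParsedLogPath<L>` to keep the example short.

```rust
use std::iter::Peekable;

/// Boxed form: a `Box<dyn Iterator>` in a struct field defaults to `'static`.
struct BoxedFiles {
    files: Peekable<Box<dyn Iterator<Item = u64>>>,
}

impl BoxedFiles {
    fn new<T: Iterator<Item = u64> + 'static>(value: T) -> Self {
        let boxed: Box<dyn Iterator<Item = u64>> = Box::new(value);
        BoxedFiles { files: boxed.peekable() }
    }
}

/// Generic form: no allocation, no dynamic dispatch, no `'static` bound.
struct GenericFiles<I: Iterator<Item = u64>> {
    files: Peekable<I>,
}

impl<I: Iterator<Item = u64>> From<I> for GenericFiles<I> {
    fn from(value: I) -> Self {
        GenericFiles { files: value.peekable() }
    }
}

/// The boxed form needs an owning (`'static`) iterator.
fn sum_owned(versions: Vec<u64>) -> u64 {
    let boxed = BoxedFiles::new(versions.into_iter());
    boxed.files.sum()
}

/// The generic form also accepts an iterator borrowing from a slice.
fn sum_borrowed(versions: &[u64]) -> u64 {
    let generic = GenericFiles::from(versions.iter().copied());
    generic.files.sum()
}
```

Passing `versions.iter().copied()` to `BoxedFiles::new` inside `sum_borrowed` would be rejected, because the borrowed iterator does not satisfy `'static`.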

// By assuming that the input iterator is already sorted by version, we can group the files by version in a streaming fashion
// This assumption is very important: if the input is not sorted by version, the grouping will not be correct
impl<L: AsUrl> Iterator for VersionGroupingIterator<L> {
impl<I, L> Iterator for VersionGroupingIterator<I, L>
where
L: AsUrl,
I: Iterator<Item = ParsedLogPath<L>>,
{
type Item = VersionGroup<L>;

fn next(&mut self) -> Option<VersionGroup<L>> {
fn next(&mut self) -> Option<Self::Item> {
while let Some(logpath) = self.files.next() {
Collaborator

This isn't actually a loop; it executes at most once, which means we can simplify (un-indent) the code:

        let logpath = self.files.next()?;
        let version = logpath.version;
          ...
        Some(VersionGroup { version, files })
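
A self-contained sketch of that shape, assuming hypothetical `LogFile` and `Group` types in place of `ParsedLogPath` and `VersionGroup`. The look-ahead uses `Peekable::next_if`, which is a choice made for this sketch and not necessarily what the PR does in the elided part.

```rust
use std::iter::Peekable;

/// Hypothetical stand-in for `ParsedLogPath`.
#[derive(Debug, PartialEq)]
struct LogFile {
    version: u64,
    name: &'static str,
}

/// Hypothetical stand-in for `VersionGroup`.
#[derive(Debug, PartialEq)]
struct Group {
    version: u64,
    files: Vec<LogFile>,
}

struct GroupingIter<I: Iterator<Item = LogFile>> {
    files: Peekable<I>,
}

impl<I: Iterator<Item = LogFile>> Iterator for GroupingIter<I> {
    type Item = Group;

    fn next(&mut self) -> Option<Self::Item> {
        // `?` returns `None` once the input is exhausted, so no outer loop is needed.
        let first = self.files.next()?;
        let version = first.version;
        let mut files = vec![first];
        // Look ahead: consume following files only while they share the version.
        while let Some(same) = self.files.next_if(|f| f.version == version) {
            files.push(same);
        }
        Some(Group { version, files })
    }
}

/// Convenience wrapper that collects every group from an input iterator.
fn group_by_version<I: Iterator<Item = LogFile>>(input: I) -> Vec<Group> {
    let iter = GroupingIter { files: input.peekable() };
    iter.collect()
}
```

Only adjacent files are merged, so input with versions 1, 1, 2 yields two groups, while unsorted input with versions 1, 2, 1 yields three groups.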

let version: u64 = logpath.version;
let version = logpath.version;
let mut files = vec![logpath];
// this is where we look ahead for the next file and check if it has the same version
// if it does, we add it to the current group
@@ -108,7 +123,6 @@ mod tests {
ParsedLogPath::try_from(url).unwrap().unwrap()
}

#[test]
/// Tests the basic functionality of VersionGroupingIterator with a single commit file
///
/// This test verifies that:
2 changes: 1 addition & 1 deletion kernel/src/lib.rs
@@ -65,7 +65,7 @@ pub mod error;
pub mod expressions;
pub(crate) mod predicates;
pub mod table_features;
- pub mod grouping_iterator;
+ pub mod grouping_iterators;

#[cfg(feature = "developer-visibility")]
pub mod path;