diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index b9036d04e..ff335ce11 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use bytes::Bytes; use futures::stream::StreamExt; +use itertools::Itertools; use object_store::path::Path; -use object_store::DynObjectStore; +use object_store::{DynObjectStore, ObjectStore}; use url::Url; use crate::engine::default::executor::TaskExecutor; @@ -12,15 +13,22 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient}; #[derive(Debug)] pub struct ObjectStoreFileSystemClient { inner: Arc, + has_ordered_listing: bool, table_root: Path, task_executor: Arc, readahead: usize, } impl ObjectStoreFileSystemClient { - pub fn new(store: Arc, table_root: Path, task_executor: Arc) -> Self { + pub(crate) fn new( + store: Arc, + has_ordered_listing: bool, + table_root: Path, + task_executor: Arc, + ) -> Self { Self { inner: store, + has_ordered_listing, table_root, task_executor, readahead: 10, @@ -72,7 +80,14 @@ impl FileSystemClient for ObjectStoreFileSystemClient { } }); - Ok(Box::new(receiver.into_iter())) + if !self.has_ordered_listing { + // This FS doesn't return things in the order we require + let mut fms: Vec = receiver.into_iter().try_collect()?; + fms.sort_unstable(); + Ok(Box::new(fms.into_iter().map(Ok))) + } else { + Ok(Box::new(receiver.into_iter())) + } } /// Read data specified by the start and end offset from the file. @@ -144,6 +159,8 @@ mod tests { use object_store::{local::LocalFileSystem, ObjectStore}; use crate::engine::default::executor::tokio::TokioBackgroundExecutor; + use crate::engine::default::DefaultEngine; + use crate::Engine; use itertools::Itertools; @@ -174,6 +191,7 @@ mod tests { let prefix = Path::from(url.path()); let client = ObjectStoreFileSystemClient::new( store, + false, // don't have ordered listing prefix, Arc::new(TokioBackgroundExecutor::new()), ); @@ -195,4 +213,42 @@ mod tests { assert_eq!(data[1], Bytes::from("data")); assert_eq!(data[2], Bytes::from("el-da")); } + + #[tokio::test] + async fn test_default_engine_listing() { + let tmp = tempfile::tempdir().unwrap(); + let tmp_store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap(); + let data = Bytes::from("kernel-data"); + + let expected_names: Vec = (0..10) + .map(|i| format!("_delta_log/{:0>20}.json", i)) + .collect(); + + // put them in in reverse order + for name in expected_names.iter().rev() { + tmp_store + .put(&Path::from(name.as_str()), data.clone().into()) + .await + .unwrap(); + } + + let url = Url::from_directory_path(tmp.path()).unwrap(); + let store = Arc::new(LocalFileSystem::new()); + let prefix = Path::from_url_path(url.path()).expect("Couldn't get path"); + let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new())); + let client = engine.get_file_system_client(); + + let files = client.list_from(&Url::parse("file://").unwrap()).unwrap(); + let mut len = 0; + for (file, expected) in files.zip(expected_names.iter()) { + assert!( + file.as_ref().unwrap().location.path().ends_with(expected), + "{} does not end with {}", + file.unwrap().location.path(), + expected + ); + len += 1; + } + assert_eq!(len, 10, "list_from should have returned 10 files"); + } } diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 56d15ca86..9fa1bdb0c 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -67,9 +67,31 @@ impl DefaultEngine { /// - `table_root_path`: The root path of the table within storage. /// - `task_executor`: Used to spawn async IO tasks. See [executor::TaskExecutor]. pub fn new(store: Arc, table_root: Path, task_executor: Arc) -> Self { + // HACK to check if we're using a LocalFileSystem from ObjectStore. We need this because + // local filesystem doesn't return a sorted list by default. Although the `object_store` + // crate explicitly says it _does not_ return a sorted listing, in practice all the cloud + // implementations actually do: + // - AWS: + // [`ListObjectsV2`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) + // states: "For general purpose buckets, ListObjectsV2 returns objects in lexicographical + // order based on their key names." (Directory buckets are out of scope for now) + // - Azure: Docs state + // [here](https://learn.microsoft.com/en-us/rest/api/storageservices/enumerating-blob-resources): + // "A listing operation returns an XML response that contains all or part of the requested + // list. The operation returns entities in alphabetical order." + // - GCP: The [main](https://cloud.google.com/storage/docs/xml-api/get-bucket-list) doc + // doesn't indicate order, but [this + // page](https://cloud.google.com/storage/docs/xml-api/get-bucket-list) does say: "This page + // shows you how to list the [objects](https://cloud.google.com/storage/docs/objects) stored + // in your Cloud Storage buckets, which are ordered in the list lexicographically by name." + // So we just need to know if we're local and then if so, we sort the returned file list in + // `filesystem.rs` + let store_str = format!("{}", store); + let is_local = store_str.starts_with("LocalFileSystem"); Self { file_system: Arc::new(ObjectStoreFileSystemClient::new( store.clone(), + !is_local, table_root, task_executor.clone(), )), diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 827d53db5..6829c8eca 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -50,8 +50,8 @@ rust_2021_compatibility )] -use std::ops::Range; use std::sync::Arc; +use std::{cmp::Ordering, ops::Range}; use bytes::Bytes; use url::Url; @@ -111,6 +111,18 @@ pub struct FileMeta { pub size: usize, } +impl Ord for FileMeta { + fn cmp(&self, other: &Self) -> Ordering { + self.location.cmp(&other.location) + } +} + +impl PartialOrd for FileMeta { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// Trait for implementing an Expression evaluator. /// /// It contains one Expression which can be evaluated on multiple ColumnarBatches. diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 649468148..859ee8921 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -399,8 +399,15 @@ fn list_log_files_with_checkpoint( ))); } - // NOTE this will sort in reverse order - commit_files.sort_unstable_by(|a, b| b.version.cmp(&a.version)); + debug_assert!( + commit_files + .windows(2) + .all(|cfs| cfs[0].version <= cfs[1].version), + "fs_client.list_from() didn't return a sorted listing! {:?}", + commit_files + ); + // We assume listing returned ordered, we want reverse order + let commit_files = commit_files.into_iter().rev().collect(); Ok((commit_files, checkpoint_files)) } @@ -443,8 +450,16 @@ fn list_log_files( } commit_files.retain(|f| f.version as i64 > max_checkpoint_version); - // NOTE this will sort in reverse order - commit_files.sort_unstable_by(|a, b| b.version.cmp(&a.version)); + + debug_assert!( + commit_files + .windows(2) + .all(|cfs| cfs[0].version <= cfs[1].version), + "fs_client.list_from() didn't return a sorted listing! {:?}", + commit_files + ); + // We assume listing returned ordered, we want reverse order + let commit_files = commit_files.into_iter().rev().collect(); Ok((commit_files, checkpoint_files)) } @@ -523,6 +538,7 @@ mod tests { let prefix = Path::from(url.path()); let client = ObjectStoreFileSystemClient::new( store, + false, // don't have ordered listing prefix, Arc::new(TokioBackgroundExecutor::new()), ); @@ -582,6 +598,7 @@ mod tests { let client = ObjectStoreFileSystemClient::new( store, + false, // don't have ordered listing Path::from("/"), Arc::new(TokioBackgroundExecutor::new()), ); @@ -626,6 +643,7 @@ mod tests { let client = ObjectStoreFileSystemClient::new( store, + false, // don't have ordered listing Path::from("/"), Arc::new(TokioBackgroundExecutor::new()), );