From 50ef8620ee3c7f2eecfb43d37d3bc01f05b5e16c Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Wed, 18 Sep 2024 18:05:02 -0700 Subject: [PATCH 01/11] Ensure we return a sorted listing when using a local client --- kernel/src/engine/default/filesystem.rs | 63 +++++++++++++++++++++++-- kernel/src/engine/default/mod.rs | 19 ++------ kernel/src/lib.rs | 14 +++++- kernel/src/snapshot.rs | 2 + 4 files changed, 80 insertions(+), 18 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index b9036d04e..3370f7eba 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use bytes::Bytes; use futures::stream::StreamExt; +use itertools::Itertools; use object_store::path::Path; -use object_store::DynObjectStore; +use object_store::{DynObjectStore, ObjectStore}; use url::Url; use crate::engine::default::executor::TaskExecutor; @@ -12,15 +13,22 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient}; #[derive(Debug)] pub struct ObjectStoreFileSystemClient { inner: Arc, + is_local: bool, table_root: Path, task_executor: Arc, readahead: usize, } impl ObjectStoreFileSystemClient { - pub fn new(store: Arc, table_root: Path, task_executor: Arc) -> Self { + pub(crate) fn new( + store: Arc, + is_local: bool, + table_root: Path, + task_executor: Arc, + ) -> Self { Self { inner: store, + is_local, table_root, task_executor, readahead: 10, @@ -72,7 +80,14 @@ impl FileSystemClient for ObjectStoreFileSystemClient { } }); - Ok(Box::new(receiver.into_iter())) + if self.is_local { + // LocalFileSystem doesn't return things in the order we require + let mut fms: Vec = receiver.into_iter().try_collect()?; + fms.sort(); + Ok(Box::new(fms.into_iter().map(|fm| Ok(fm)))) + } else { + Ok(Box::new(receiver.into_iter())) + } } /// Read data specified by the start and end offset from the file. @@ -144,6 +159,8 @@ mod tests { use object_store::{local::LocalFileSystem, ObjectStore}; use crate::engine::default::executor::tokio::TokioBackgroundExecutor; + use crate::engine::default::DefaultEngine; + use crate::Engine; use itertools::Itertools; @@ -174,6 +191,7 @@ mod tests { let prefix = Path::from(url.path()); let client = ObjectStoreFileSystemClient::new( store, + true, prefix, Arc::new(TokioBackgroundExecutor::new()), ); @@ -195,4 +213,43 @@ mod tests { assert_eq!(data[1], Bytes::from("data")); assert_eq!(data[2], Bytes::from("el-da")); } + + #[tokio::test] + async fn test_default_engine_listing() { + let tmp = tempfile::tempdir().unwrap(); + let tmp_store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap(); + let data = Bytes::from("kernel-data"); + + let expected_names: Vec = (0..10) + .map(|i| format!("_delta_log/{:0>20}.json", i)) + .collect(); + + // put them in in reverse order + for name in expected_names.iter().rev() { + tmp_store + .put(&Path::from(name.as_str()), data.clone().into()) + .await + .unwrap(); + } + + let url = Url::from_directory_path(tmp.path()).unwrap(); + let store = Arc::new(LocalFileSystem::new()); + let prefix = Path::from(url.path()); + let store: Arc = store; + let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new())); + let client = engine.get_file_system_client(); + + let files = client.list_from(&Url::parse("file://").unwrap()).unwrap(); + let mut len = 0; + for (file, expected) in files.zip(expected_names.iter()) { + assert!( + file.as_ref().unwrap().location.path().ends_with(expected), + "{} does not end with {}", + file.unwrap().location.path(), + expected + ); + len += 1; + } + assert_eq!(len, 10, "list_from should have returned 10 files"); + } } diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 5a8ef7af6..77eae7025 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -51,26 +51,17 @@ impl DefaultEngine { { let (store, prefix) = parse_url_opts(path, options)?; let store = Arc::new(store); - Ok(Self { - file_system: Arc::new(ObjectStoreFileSystemClient::new( - store.clone(), - prefix, - task_executor.clone(), - )), - json: Arc::new(DefaultJsonHandler::new( - store.clone(), - task_executor.clone(), - )), - parquet: Arc::new(DefaultParquetHandler::new(store.clone(), task_executor)), - store, - expression: Arc::new(ArrowExpressionHandler {}), - }) + Ok(Self::new(store, prefix, task_executor)) } pub fn new(store: Arc, prefix: Path, task_executor: Arc) -> Self { + // HACK to check if we're using a LocalFileSystem from ObjectStore + let store_str = format!("{}", store); + let is_local = store_str.starts_with("LocalFileSystem"); Self { file_system: Arc::new(ObjectStoreFileSystemClient::new( store.clone(), + is_local, prefix, task_executor.clone(), )), diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 72725e824..25e9c2490 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -50,8 +50,8 @@ rust_2021_compatibility )] -use std::ops::Range; use std::sync::Arc; +use std::{cmp::Ordering, ops::Range}; use bytes::Bytes; use url::Url; @@ -111,6 +111,18 @@ pub struct FileMeta { pub size: usize, } +impl Ord for FileMeta { + fn cmp(&self, other: &Self) -> Ordering { + self.location.cmp(&other.location) + } +} + +impl PartialOrd for FileMeta { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + /// Trait for implementing an Expression evaluator. /// /// It contains one Expression which can be evaluated on multiple ColumnarBatches. diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 6c3aa2140..5d956b98a 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -521,6 +521,7 @@ mod tests { let prefix = Path::from(url.path()); let client = ObjectStoreFileSystemClient::new( store, + true, prefix, Arc::new(TokioBackgroundExecutor::new()), ); @@ -624,6 +625,7 @@ mod tests { let client = ObjectStoreFileSystemClient::new( store, + true, Path::from("/"), Arc::new(TokioBackgroundExecutor::new()), ); From fefc6ec53ed0c99df79cd467a6d4f71cef711f99 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Wed, 18 Sep 2024 18:13:31 -0700 Subject: [PATCH 02/11] lint --- kernel/src/engine/default/filesystem.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 3370f7eba..9ac4a25b2 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -84,7 +84,7 @@ impl FileSystemClient for ObjectStoreFileSystemClient { // LocalFileSystem doesn't return things in the order we require let mut fms: Vec = receiver.into_iter().try_collect()?; fms.sort(); - Ok(Box::new(fms.into_iter().map(|fm| Ok(fm)))) + Ok(Box::new(fms.into_iter().map(Ok))) } else { Ok(Box::new(receiver.into_iter())) } From 808ffe8357916e5bd038a64469052e7081aa5b00 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 19 Sep 2024 13:50:54 -0700 Subject: [PATCH 03/11] dbg commit to try and fix windows --- kernel/src/engine/default/filesystem.rs | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 9ac4a25b2..3d4c0a09e 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -52,6 +52,8 @@ impl FileSystemClient for ObjectStoreFileSystemClient { // TODO properly handle table prefix let prefix = self.table_root.child("_delta_log"); + println!("Prefix is {:?}", prefix); + let store = self.inner.clone(); // This channel will become the iterator @@ -234,8 +236,15 @@ mod tests { let url = Url::from_directory_path(tmp.path()).unwrap(); let store = Arc::new(LocalFileSystem::new()); - let prefix = Path::from(url.path()); - let store: Arc = store; + let prefix = Path::from_url_path(url.path()).expect("Couldn't get path"); + + // debug code for windows + let s = store.clone(); + let mut l = s.list(Some(&prefix)); + while let Some(meta) = l.next().await { + println!("DBG GOT {:?}", meta.unwrap()); + } + let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new())); let client = engine.get_file_system_client(); From 897cf0a8e8ed0cce7bedb802b681256213d3ad7c Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 19 Sep 2024 13:58:05 -0700 Subject: [PATCH 04/11] remove debugging code --- kernel/src/engine/default/filesystem.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 3d4c0a09e..01be4962f 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -52,8 +52,6 @@ impl FileSystemClient for ObjectStoreFileSystemClient { // TODO properly handle table prefix let prefix = self.table_root.child("_delta_log"); - println!("Prefix is {:?}", prefix); - let store = self.inner.clone(); // This channel will become the iterator @@ -237,14 +235,6 @@ mod tests { let url = Url::from_directory_path(tmp.path()).unwrap(); let store = Arc::new(LocalFileSystem::new()); let prefix = Path::from_url_path(url.path()).expect("Couldn't get path"); - - // debug code for windows - let s = store.clone(); - let mut l = s.list(Some(&prefix)); - while let Some(meta) = l.next().await { - println!("DBG GOT {:?}", meta.unwrap()); - } - let engine = DefaultEngine::new(store, prefix, Arc::new(TokioBackgroundExecutor::new())); let client = engine.get_file_system_client(); From 12aad920f3917d7fa338b01c9c42f6188fea0ae6 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 19 Sep 2024 14:17:15 -0700 Subject: [PATCH 05/11] sort_unstable --- kernel/src/engine/default/filesystem.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index 01be4962f..f37b4bd0f 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -83,7 +83,7 @@ impl FileSystemClient for ObjectStoreFileSystemClient { if self.is_local { // LocalFileSystem doesn't return things in the order we require let mut fms: Vec = receiver.into_iter().try_collect()?; - fms.sort(); + fms.sort_unstable(); Ok(Box::new(fms.into_iter().map(Ok))) } else { Ok(Box::new(receiver.into_iter())) From 27e349ac588c369df925572d1452965c0cb9eef6 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 10 Oct 2024 15:11:53 -0700 Subject: [PATCH 06/11] fix for merge --- kernel/src/snapshot.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 5d956b98a..9e621e47f 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -581,6 +581,7 @@ mod tests { let client = ObjectStoreFileSystemClient::new( store, + true, // is_local Path::from("/"), Arc::new(TokioBackgroundExecutor::new()), ); From 2a84547f4dbb35a601cbaf7ef9c2a3cacd0db0ee Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 10 Oct 2024 15:15:44 -0700 Subject: [PATCH 07/11] Just reverse the vec as we're assuming the listing is sorted now --- kernel/src/snapshot.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 9e621e47f..6080aaeab 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -397,8 +397,8 @@ fn list_log_files_with_checkpoint( ))); } - // NOTE this will sort in reverse order - commit_files.sort_unstable_by(|a, b| b.version.cmp(&a.version)); + // We assume listing returned ordered, we want reverse order + let commit_files = commit_files.into_iter().rev().collect(); Ok((commit_files, checkpoint_files)) } @@ -441,8 +441,9 @@ fn list_log_files( } commit_files.retain(|f| f.version as i64 > max_checkpoint_version); - // NOTE this will sort in reverse order - commit_files.sort_unstable_by(|a, b| b.version.cmp(&a.version)); + + // We assume listing returned ordered, we want reverse order + let commit_files = commit_files.into_iter().rev().collect(); Ok((commit_files, checkpoint_files)) } From 0a223292fa25fe4de31abde0287763fa562f1b2f Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 10 Oct 2024 15:19:53 -0700 Subject: [PATCH 08/11] debug_assert that list_from returns sorted --- kernel/src/snapshot.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 6080aaeab..835cec8e8 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -397,6 +397,10 @@ fn list_log_files_with_checkpoint( ))); } + debug_assert!( + commit_files.windows(2).all(|cfs| cfs[0].version <= cfs[1].version), + "fs_client.list_from() didn't return a sorted listing! {:?}", commit_files + ); // We assume listing returned ordered, we want reverse order let commit_files = commit_files.into_iter().rev().collect(); @@ -442,6 +446,10 @@ fn list_log_files( commit_files.retain(|f| f.version as i64 > max_checkpoint_version); + debug_assert!( + commit_files.windows(2).all(|cfs| cfs[0].version <= cfs[1].version), + "fs_client.list_from() didn't return a sorted listing! {:?}", commit_files + ); // We assume listing returned ordered, we want reverse order let commit_files = commit_files.into_iter().rev().collect(); From 3b80415e17364c6c10d5fd817a9c27cc75112305 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Thu, 10 Oct 2024 15:21:00 -0700 Subject: [PATCH 09/11] fmt --- kernel/src/snapshot.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 835cec8e8..070129d6b 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -398,8 +398,11 @@ fn list_log_files_with_checkpoint( } debug_assert!( - commit_files.windows(2).all(|cfs| cfs[0].version <= cfs[1].version), - "fs_client.list_from() didn't return a sorted listing! {:?}", commit_files + commit_files + .windows(2) + .all(|cfs| cfs[0].version <= cfs[1].version), + "fs_client.list_from() didn't return a sorted listing! {:?}", + commit_files ); // We assume listing returned ordered, we want reverse order let commit_files = commit_files.into_iter().rev().collect(); @@ -447,8 +450,11 @@ fn list_log_files( commit_files.retain(|f| f.version as i64 > max_checkpoint_version); debug_assert!( - commit_files.windows(2).all(|cfs| cfs[0].version <= cfs[1].version), - "fs_client.list_from() didn't return a sorted listing! {:?}", commit_files + commit_files + .windows(2) + .all(|cfs| cfs[0].version <= cfs[1].version), + "fs_client.list_from() didn't return a sorted listing! {:?}", + commit_files ); // We assume listing returned ordered, we want reverse order let commit_files = commit_files.into_iter().rev().collect(); From 5198a3998ad44120eed53601d737f33aedd0e4d8 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Fri, 11 Oct 2024 13:05:56 -0700 Subject: [PATCH 10/11] add big comment --- kernel/src/engine/default/mod.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 77eae7025..576e09acf 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -55,7 +55,25 @@ impl DefaultEngine { } pub fn new(store: Arc, prefix: Path, task_executor: Arc) -> Self { - // HACK to check if we're using a LocalFileSystem from ObjectStore + // HACK to check if we're using a LocalFileSystem from ObjectStore. We need this because + // local filesystem doesn't return a sorted list by default. Although the `object_store` + // crate explicitly says it _does not_ return a sorted listing, in practice all the cloud + // implementations actually do: + // - AWS: + // [`ListObjectsV2`](https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html) + // states: "For general purpose buckets, ListObjectsV2 returns objects in lexicographical + // order based on their key names." (Directory buckets are out of scope for now) + // - Azure: Docs state + // [here](https://learn.microsoft.com/en-us/rest/api/storageservices/enumerating-blob-resources): + // "A listing operation returns an XML response that contains all or part of the requested + // list. The operation returns entities in alphabetical order." + // - GCP: The [main](https://cloud.google.com/storage/docs/xml-api/get-bucket-list) doc + // doesn't indicate order, but [this + // page](https://cloud.google.com/storage/docs/xml-api/get-bucket-list) does say: "This page + // shows you how to list the [objects](https://cloud.google.com/storage/docs/objects) stored + // in your Cloud Storage buckets, which are ordered in the list lexicographically by name." + // So we just need to know if we're local and then if so, we sort the returned file list in + // `filesystem.rs` let store_str = format!("{}", store); let is_local = store_str.starts_with("LocalFileSystem"); Self { From 2e03a28ba55e0eac5785159dc47988b7e7c13a41 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 15 Oct 2024 09:38:56 -0700 Subject: [PATCH 11/11] switch to has_ordered_listing --- kernel/src/engine/default/filesystem.rs | 12 ++++++------ kernel/src/engine/default/mod.rs | 2 +- kernel/src/snapshot.rs | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/kernel/src/engine/default/filesystem.rs b/kernel/src/engine/default/filesystem.rs index f37b4bd0f..ff335ce11 100644 --- a/kernel/src/engine/default/filesystem.rs +++ b/kernel/src/engine/default/filesystem.rs @@ -13,7 +13,7 @@ use crate::{DeltaResult, Error, FileMeta, FileSlice, FileSystemClient}; #[derive(Debug)] pub struct ObjectStoreFileSystemClient { inner: Arc, - is_local: bool, + has_ordered_listing: bool, table_root: Path, task_executor: Arc, readahead: usize, @@ -22,13 +22,13 @@ pub struct ObjectStoreFileSystemClient { impl ObjectStoreFileSystemClient { pub(crate) fn new( store: Arc, - is_local: bool, + has_ordered_listing: bool, table_root: Path, task_executor: Arc, ) -> Self { Self { inner: store, - is_local, + has_ordered_listing, table_root, task_executor, readahead: 10, @@ -80,8 +80,8 @@ impl FileSystemClient for ObjectStoreFileSystemClient { } }); - if self.is_local { - // LocalFileSystem doesn't return things in the order we require + if !self.has_ordered_listing { + // This FS doesn't return things in the order we require let mut fms: Vec = receiver.into_iter().try_collect()?; fms.sort_unstable(); Ok(Box::new(fms.into_iter().map(Ok))) @@ -191,7 +191,7 @@ mod tests { let prefix = Path::from(url.path()); let client = ObjectStoreFileSystemClient::new( store, - true, + false, // don't have ordered listing prefix, Arc::new(TokioBackgroundExecutor::new()), ); diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 576e09acf..f31378b94 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -79,7 +79,7 @@ impl DefaultEngine { Self { file_system: Arc::new(ObjectStoreFileSystemClient::new( store.clone(), - is_local, + !is_local, prefix, task_executor.clone(), )), diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 070129d6b..8f51fcdff 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -536,7 +536,7 @@ mod tests { let prefix = Path::from(url.path()); let client = ObjectStoreFileSystemClient::new( store, - true, + false, // don't have ordered listing prefix, Arc::new(TokioBackgroundExecutor::new()), ); @@ -596,7 +596,7 @@ mod tests { let client = ObjectStoreFileSystemClient::new( store, - true, // is_local + false, // don't have ordered listing Path::from("/"), Arc::new(TokioBackgroundExecutor::new()), ); @@ -641,7 +641,7 @@ mod tests { let client = ObjectStoreFileSystemClient::new( store, - true, + false, // don't have ordered listing Path::from("/"), Arc::new(TokioBackgroundExecutor::new()), );