diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 8b64511598a8..d5bd5d273e91 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -20,6 +20,7 @@ mod gcs; mod oss; mod s3; +use std::sync::Arc; use std::time::Duration; use std::{env, path}; @@ -28,7 +29,7 @@ use common_telemetry::info; use object_store::layers::{LruCacheLayer, RetryLayer}; use object_store::services::Fs; use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; -use object_store::{HttpClient, ObjectStore}; +use object_store::{HttpClient, ObjectStore, ObjectStoreBuilder}; use snafu::prelude::*; use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; @@ -108,11 +109,9 @@ async fn create_object_store_with_cache( clean_temp_dir(&atomic_temp_dir)?; let mut builder = Fs::default(); builder.root(path).atomic_write_dir(&atomic_temp_dir); - let cache_store = ObjectStore::new(builder) - .context(error::InitBackendSnafu)? - .finish(); + let cache_store = builder.build().context(error::InitBackendSnafu)?; - let cache_layer = LruCacheLayer::new(cache_store, cache_capacity.0 as usize) + let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) .await .context(error::InitBackendSnafu)?; diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index 2e0d2659ddb2..4e41ee2b7620 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -12,26 +12,28 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use opendal::raw::oio::ReadDyn; use opendal::raw::{ Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite, }; -use opendal::{Operator, Result}; +use opendal::Result; mod read_cache; use common_telemetry::info; use read_cache::ReadCache; /// An opendal layer with local LRU file cache supporting. #[derive(Clone)] -pub struct LruCacheLayer { +pub struct LruCacheLayer { // The read cache - read_cache: ReadCache, + read_cache: ReadCache, } -impl LruCacheLayer { +impl LruCacheLayer { /// Create a `[LruCacheLayer]` with local file cache and capacity in bytes. - pub async fn new(file_cache: Operator, capacity: usize) -> Result { + pub async fn new(file_cache: Arc, capacity: usize) -> Result { let read_cache = ReadCache::new(file_cache, capacity); let (entries, bytes) = read_cache.recover_cache().await?; @@ -50,12 +52,12 @@ impl LruCacheLayer { /// Returns the read cache statistics info `(EntryCount, SizeInBytes)`. pub async fn read_cache_stat(&self) -> (u64, u64) { - self.read_cache.stat().await + self.read_cache.cache_stat().await } } -impl Layer for LruCacheLayer { - type LayeredAccess = LruCacheAccess; +impl Layer for LruCacheLayer { + type LayeredAccess = LruCacheAccess; fn layer(&self, inner: I) -> Self::LayeredAccess { LruCacheAccess { @@ -66,12 +68,12 @@ impl Layer for LruCacheLayer { } #[derive(Debug)] -pub struct LruCacheAccess { +pub struct LruCacheAccess { inner: I, - read_cache: ReadCache, + read_cache: ReadCache, } -impl LayeredAccess for LruCacheAccess { +impl LayeredAccess for LruCacheAccess { type Inner = I; type Reader = Box; type BlockingReader = I::BlockingReader; @@ -85,7 +87,9 @@ impl LayeredAccess for LruCacheAccess { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - self.read_cache.read(&self.inner, path, args).await + self.read_cache + .read_from_cache(&self.inner, path, args) + .await } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs index bb60c64b47e4..00949cc552ac 100644 --- a/src/object-store/src/layers/lru_cache/read_cache.rs +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -15,12 +15,12 @@ use std::sync::Arc; use common_telemetry::debug; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; use moka::future::Cache; use moka::notification::ListenerFuture; -use opendal::raw::oio::{Read, Reader}; -use opendal::raw::{Access, OpRead, RpRead}; -use opendal::{Buffer, Error as OpendalError, ErrorKind, Operator, Result}; +use opendal::raw::oio::{List, Read, ReadDyn, Reader, Write}; +use opendal::raw::{Access, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead}; +use opendal::{Error as OpendalError, ErrorKind, Result}; use crate::metrics::{ OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT, @@ -51,23 +51,36 @@ fn can_cache(path: &str) -> bool { !path.ends_with("_last_checkpoint") } -/// Generate a unique cache key for the read path and range. -fn read_cache_key(path: &str) -> String { - format!("{:x}.cache", md5::compute(path)) +/// Generate an unique cache key for the read path and range. +fn read_cache_key(path: &str, args: &OpRead) -> String { + format!( + "{:x}.cache-{}", + md5::compute(path), + args.range().to_header() + ) } /// Local read cache for files in object storage -#[derive(Clone, Debug)] -pub(crate) struct ReadCache { +#[derive(Debug)] +pub(crate) struct ReadCache { /// Local file cache backend - file_cache: Operator, + file_cache: Arc, /// Local memory cache to track local cache files mem_cache: Cache, } -impl ReadCache { +impl Clone for ReadCache { + fn clone(&self) -> Self { + Self { + file_cache: self.file_cache.clone(), + mem_cache: self.mem_cache.clone(), + } + } +} + +impl ReadCache { /// Create a [`ReadCache`] with capacity in bytes. - pub(crate) fn new(file_cache: Operator, capacity: usize) -> Self { + pub(crate) fn new(file_cache: Arc, capacity: usize) -> Self { let file_cache_cloned = file_cache.clone(); let eviction_listener = move |read_key: Arc, read_result: ReadResult, cause| -> ListenerFuture { @@ -79,7 +92,7 @@ impl ReadCache { if let ReadResult::Success(size) = read_result { OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64); - let result = file_cache_cloned.delete(&read_key).await; + let result = file_cache_cloned.delete(&read_key, OpDelete::new()).await; debug!( "Deleted local cache file `{}`, result: {:?}, cause: {:?}.", read_key, result, cause @@ -104,7 +117,7 @@ impl ReadCache { } /// Returns the cache's entry count and total approximate entry size in bytes. - pub(crate) async fn stat(&self) -> (u64, u64) { + pub(crate) async fn cache_stat(&self) -> (u64, u64) { self.mem_cache.run_pending_tasks().await; (self.mem_cache.entry_count(), self.mem_cache.weighted_size()) @@ -129,17 +142,17 @@ impl ReadCache { /// Recover existing cache items from `file_cache` to `mem_cache`. /// Return entry count and total approximate entry size in bytes. pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> { - let mut pager = self.file_cache.lister("/").await?; + let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?; - while let Some(entry) = pager.next().await.transpose()? { + while let Some(entry) = pager.next().await? { let read_key = entry.path(); // We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly, // because it's private field. let size = { - let stat = self.file_cache.stat(read_key).await?; + let stat = self.file_cache.stat(read_key, OpStat::default()).await?; - stat.content_length() + stat.into_metadata().content_length() }; OBJECT_STORE_LRU_CACHE_ENTRIES.inc(); @@ -149,26 +162,27 @@ impl ReadCache { .await; } - Ok(self.stat().await) + Ok(self.cache_stat().await) } /// Returns true when the read cache contains the specific file. pub(crate) async fn contains_file(&self, path: &str) -> bool { self.mem_cache.run_pending_tasks().await; - self.mem_cache.contains_key(path) && self.file_cache.stat(path).await.is_ok() + self.mem_cache.contains_key(path) + && self.file_cache.stat(path, OpStat::default()).await.is_ok() } /// Read from a specific path using the OpRead operation. /// It will attempt to retrieve the data from the local cache. /// If the data is not found in the local cache, - /// it will fall back to retrieving it from remote object storage + /// it will fallback to retrieving it from remote object storage /// and cache the result locally. - pub(crate) async fn read( + pub(crate) async fn read_from_cache( &self, inner: &I, path: &str, args: OpRead, - ) -> Result<(RpRead, Reader)> + ) -> Result<(RpRead, Box)> where I: Access, { @@ -176,35 +190,82 @@ impl ReadCache { return inner.read(path, args).await.map(to_output_reader); } - let (rp, reader) = inner.read(path, args).await?; - let reader: ReadCacheReader = ReadCacheReader { - path: Arc::new(path.to_string()), - inner_reader: reader, - file_cache: self.file_cache.clone(), - mem_cache: self.mem_cache.clone(), - }; - Ok((rp, Box::new(reader))) + let read_key = read_cache_key(path, &args); + + let read_result = self + .mem_cache + .try_get_with( + read_key.clone(), + self.read_remote(inner, &read_key, path, args.clone()), + ) + .await + .map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?; + + match read_result { + ReadResult::Success(_) => { + // There is a concurrent issue here, the local cache may be purged + // while reading, we have to fallback to remote read + match self.file_cache.read(&read_key, OpRead::default()).await { + Ok(ret) => { + OBJECT_STORE_LRU_CACHE_HIT + .with_label_values(&["success"]) + .inc(); + Ok(to_output_reader(ret)) + } + Err(_) => { + OBJECT_STORE_LRU_CACHE_MISS.inc(); + inner.read(path, args).await.map(to_output_reader) + } + } + } + ReadResult::NotFound => { + OBJECT_STORE_LRU_CACHE_HIT + .with_label_values(&["not_found"]) + .inc(); + + Err(OpendalError::new( + ErrorKind::NotFound, + &format!("File not found: {path}"), + )) + } + } } -} -pub struct ReadCacheReader { - /// Path of the file - path: Arc, - /// Remote file reader. - inner_reader: I::Reader, - /// Local file cache backend - file_cache: Operator, - /// Local memory cache to track local cache files - mem_cache: Cache, -} + async fn try_write_cache(&self, mut reader: I::Reader, read_key: &str) -> Result + where + I: Access, + { + let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?; + let mut total = 0; + loop { + let bytes = reader.read().await?; + if bytes.is_empty() { + break; + } + + total += bytes.len(); + writer.write(bytes).await?; + } + // Call `close` to ensure data is written. + writer.close().await?; + Ok(total) + } -impl ReadCacheReader { - /// TODO: we can return the Buffer directly to avoid another read from cache. - async fn read_remote(&mut self) -> Result { + /// Read the file from remote storage. If success, write the content into local cache. + async fn read_remote( + &self, + inner: &I, + read_key: &str, + path: &str, + args: OpRead, + ) -> Result + where + I: Access, + { OBJECT_STORE_LRU_CACHE_MISS.inc(); - let buf = self.inner_reader.read_all().await?; - let result = self.try_write_cache(buf).await; + let (_, reader) = inner.read(path, args).await?; + let result = self.try_write_cache::(reader, read_key).await; match result { Ok(read_bytes) => { @@ -231,53 +292,6 @@ impl ReadCacheReader { } } } - - async fn try_write_cache(&self, buf: Buffer) -> Result { - let size = buf.len(); - let read_key = read_cache_key(&self.path); - self.file_cache.write(&read_key, buf).await?; - Ok(size) - } -} - -impl Read for ReadCacheReader { - async fn read(&mut self) -> Result { - let read_key = read_cache_key(&self.path); - let mem_cache = self.mem_cache.clone(); - let read_result = mem_cache - .try_get_with(read_key.clone(), self.read_remote()) - .await - .map_err(|e| OpendalError::new(e.kind(), e.to_string()))?; - - match read_result { - ReadResult::Success(_) => { - // There is a concurrent issue here, the local cache may be purged - // while reading, we have to fall back to remote read - match self.file_cache.read(&read_key).await { - Ok(ret) => { - OBJECT_STORE_LRU_CACHE_HIT - .with_label_values(&["success"]) - .inc(); - Ok(ret) - } - Err(_) => { - OBJECT_STORE_LRU_CACHE_MISS.inc(); - self.inner_reader.read().await - } - } - } - ReadResult::NotFound => { - OBJECT_STORE_LRU_CACHE_HIT - .with_label_values(&["not_found"]) - .inc(); - - Err(OpendalError::new( - ErrorKind::NotFound, - format!("File not found: {}", self.path), - )) - } - } - } } fn to_output_reader(input: (RpRead, R)) -> (RpRead, Reader) { diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index a3d3800054c7..b5cedf6e651a 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -22,8 +22,10 @@ use object_store::layers::LruCacheLayer; use object_store::services::{Fs, S3}; use object_store::test_util::TempFolder; use object_store::{ObjectStore, ObjectStoreBuilder}; +use opendal::raw::oio::{List, Read}; +use opendal::raw::{Access, OpList, OpRead}; use opendal::services::{Azblob, Gcs, Oss}; -use opendal::{EntryMode, Operator, OperatorBuilder}; +use opendal::{EntryMode, OperatorBuilder}; async fn test_object_crud(store: &ObjectStore) -> Result<()> { // Create object handler. @@ -227,7 +229,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { .root(&data_dir.path().to_string_lossy()) .atomic_write_dir(&tmp_dir.path().to_string_lossy()); - let store = ObjectStore::new(builder).unwrap().finish(); + let store = builder.build().unwrap(); let cache_dir = create_temp_dir("test_file_backend_with_lru_cache"); let cache_layer = { @@ -235,12 +237,14 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { let _ = builder .root(&cache_dir.path().to_string_lossy()) .atomic_write_dir(&cache_dir.path().to_string_lossy()); - let file_cache = Operator::new(builder).unwrap().finish(); + let file_cache = Arc::new(builder.build().unwrap()); LruCacheLayer::new(file_cache, 32).await.unwrap() }; - let store = store.layer(cache_layer.clone()); + let store = OperatorBuilder::new(store) + .layer(cache_layer.clone()) + .finish(); test_object_crud(&store).await?; test_object_list(&store).await?; @@ -250,31 +254,36 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { Ok(()) } -async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) { +async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) { for file_name in file_names { assert!(cache_layer.contains_file(file_name).await); } } -async fn assert_cache_files( - store: &Operator, +async fn assert_cache_files( + store: &C, file_names: &[&str], file_contents: &[&str], ) -> Result<()> { - let objects = store.list("/").await?; + let (_, mut lister) = store.list("/", OpList::default()).await?; + let mut objects = vec![]; + while let Some(e) = lister.next().await? { + objects.push(e); + } // compare the cache file with the expected cache file; ignore orders for o in objects { - let position = file_names.iter().position(|&x| x == o.name()); - assert!(position.is_some(), "file not found: {}", o.name()); + let position = file_names.iter().position(|&x| x == o.path()); + assert!(position.is_some(), "file not found: {}", o.path()); let position = position.unwrap(); - let bs = store.read(o.path()).await.unwrap(); + let (_, mut r) = store.read(o.path(), OpRead::default()).await.unwrap(); + let bs = r.read_all().await.unwrap(); assert_eq!( file_contents[position], String::from_utf8(bs.to_vec())?, "file content not match: {}", - o.name() + o.path() ); } @@ -303,7 +312,7 @@ async fn test_object_store_cache_policy() -> Result<()> { .root(&cache_dir.path().to_string_lossy()) .atomic_write_dir(&atomic_temp_dir.path().to_string_lossy()); let file_cache = Arc::new(builder.build().unwrap()); - let cache_store = OperatorBuilder::new(file_cache.clone()).finish(); + let cache_store = file_cache.clone(); // create operator for cache dir to verify cache file let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap();