diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs
index 6831ccf6a1b2..81415b8039ca 100644
--- a/src/object-store/src/layers/lru_cache/read_cache.rs
+++ b/src/object-store/src/layers/lru_cache/read_cache.rs
@@ -19,7 +19,7 @@ use futures::{FutureExt, StreamExt};
 use moka::future::Cache;
 use moka::notification::ListenerFuture;
 use opendal::raw::oio::{Read, ReadDyn, Reader};
-use opendal::raw::{Access, BytesRange, OpRead, RpRead};
+use opendal::raw::{Access, BytesRange, OpRead, OpStat, RpRead};
 use opendal::{Buffer, Error as OpendalError, ErrorKind, Operator, Result};
 
 use crate::metrics::{
@@ -176,10 +176,13 @@ impl ReadCache {
             return inner.read(path, args).await.map(to_output_reader);
         }
 
+        // FIXME: remove this block after opendal v0.47 is released.
+        let meta = inner.stat(path, OpStat::new()).await?;
         let (rp, reader) = inner.read(path, args).await?;
         let reader: ReadCacheReader<I> = ReadCacheReader {
             path: Arc::new(path.to_string()),
             inner_reader: reader,
+            size: meta.into_metadata().content_length(),
             file_cache: self.file_cache.clone(),
             mem_cache: self.mem_cache.clone(),
         };
@@ -192,6 +195,14 @@ pub struct ReadCacheReader<I: Access> {
     path: Arc<String>,
     /// Remote file reader.
     inner_reader: I::Reader,
+    /// FIXME: remove this field after opendal v0.47 is released.
+    ///
+    /// OpenDAL's `read_at` takes `offset, limit`, which means the underlying storage
+    /// services could return less data than `limit`. We store the file size here as a workaround.
+    ///
+    /// This API has been refactored into `offset, size` instead. After opendal v0.47 is released,
+    /// we won't need this anymore.
+    size: u64,
     /// Local file cache backend
     file_cache: Operator,
     /// Local memory cache to track local cache files
@@ -204,7 +215,7 @@ impl<I: Access> ReadCacheReader<I> {
         OBJECT_STORE_LRU_CACHE_MISS.inc();
 
         let buf = self.inner_reader.read_at(offset, limit).await?;
-        let result = self.try_write_cache(buf, offset, limit).await;
+        let result = self.try_write_cache(buf, offset).await;
 
         match result {
             Ok(read_bytes) => {
@@ -232,9 +243,9 @@ impl<I: Access> ReadCacheReader<I> {
         }
     }
 
-    async fn try_write_cache(&self, buf: Buffer, offset: u64, limit: usize) -> Result<usize> {
+    async fn try_write_cache(&self, buf: Buffer, offset: u64) -> Result<usize> {
         let size = buf.len();
-        let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(limit as _)));
+        let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));
         self.file_cache.write(&read_key, buf).await?;
         Ok(size)
     }
@@ -242,7 +253,8 @@ impl<I: Access> Read for ReadCacheReader<I> {
     async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
-        let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(limit as _)));
+        let size = self.size.min(offset + limit as u64) - offset;
+        let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));
 
         let read_result = self
             .mem_cache
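
For context, the clamping added to `read_at` exists so that cache lookups use the same key that `try_write_cache` now writes, which is keyed by the number of bytes actually returned rather than the requested `limit`. Below is a minimal, illustrative sketch (not part of the patch; the helper name `clamped_read_size` is invented here) of that arithmetic:

/// Illustrative only: bytes an in-bounds `read_at(offset, limit)` can actually
/// return from a file of `file_size` bytes; mirrors
/// `self.size.min(offset + limit as u64) - offset` in the patched `read_at`.
fn clamped_read_size(file_size: u64, offset: u64, limit: usize) -> u64 {
    file_size.min(offset + limit as u64) - offset
}

fn main() {
    // Reading a 10-byte file at offset 8 with limit 4 returns only 2 bytes, so both
    // the write key (built from the returned buffer length) and the read key (built
    // from the clamped size) cover the range 8..10 and the cache entry is found again.
    assert_eq!(clamped_read_size(10, 8, 4), 2);
    // A fully in-bounds read is unaffected: the key covers all 4 requested bytes.
    assert_eq!(clamped_read_size(10, 0, 4), 4);
}
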
diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs
index c3ed4931b43b..a3d3800054c7 100644
--- a/src/object-store/tests/object_store_test.rs
+++ b/src/object-store/tests/object_store_test.rs
@@ -378,7 +378,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
     // instead of returning `NotFound` during the reader creation.
     // The entry count is 4, because we have the p2 `NotFound` cache.
     assert!(store.read_with(p2).range(0..4).await.is_err());
-    assert_eq!(cache_layer.read_cache_stat().await, (4, 35));
+    assert_eq!(cache_layer.read_cache_stat().await, (3, 35));
 
     assert_cache_files(
         &cache_store,
@@ -406,7 +406,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
     assert!(store.read(p2).await.is_err());
     // Read p1 with range `1..` , the existing p1 with range `0..` must be evicted.
     let _ = store.read_with(p1).range(1..15).await.unwrap();
-    assert_eq!(cache_layer.read_cache_stat().await, (4, 34));
+    assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
 
     assert_cache_files(
         &cache_store,
         &[