Skip to content

Commit

Permalink
Fix lru cache (#6)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored May 25, 2024
1 parent fbc3935 commit 91b013e
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
22 changes: 17 additions & 5 deletions src/object-store/src/layers/lru_cache/read_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::{FutureExt, StreamExt};
use moka::future::Cache;
use moka::notification::ListenerFuture;
use opendal::raw::oio::{Read, ReadDyn, Reader};
use opendal::raw::{Access, BytesRange, OpRead, RpRead};
use opendal::raw::{Access, BytesRange, OpRead, OpStat, RpRead};
use opendal::{Buffer, Error as OpendalError, ErrorKind, Operator, Result};

use crate::metrics::{
Expand Down Expand Up @@ -176,10 +176,13 @@ impl ReadCache {
return inner.read(path, args).await.map(to_output_reader);
}

// FIXME: remove this block after opendal v0.47 released.
let meta = inner.stat(path, OpStat::new()).await?;
let (rp, reader) = inner.read(path, args).await?;
let reader: ReadCacheReader<I> = ReadCacheReader {
path: Arc::new(path.to_string()),
inner_reader: reader,
size: meta.into_metadata().content_length(),
file_cache: self.file_cache.clone(),
mem_cache: self.mem_cache.clone(),
};
Expand All @@ -192,6 +195,14 @@ pub struct ReadCacheReader<I: Access> {
path: Arc<String>,
/// Remote file reader.
inner_reader: I::Reader,
/// FIXME: remove this field after opendal v0.47 released.
///
/// OpenDAL's read_at takes `offset, limit` which means the underlying storage
/// services could return less data than limit. We store size here as a workaround.
///
/// This API has been refactor into `offset, size` instead. After opendal v0.47 released,
/// we don't need this anymore.
size: u64,
/// Local file cache backend
file_cache: Operator,
/// Local memory cache to track local cache files
Expand All @@ -204,7 +215,7 @@ impl<I: Access> ReadCacheReader<I> {
OBJECT_STORE_LRU_CACHE_MISS.inc();

let buf = self.inner_reader.read_at(offset, limit).await?;
let result = self.try_write_cache(buf, offset, limit).await;
let result = self.try_write_cache(buf, offset).await;

match result {
Ok(read_bytes) => {
Expand Down Expand Up @@ -232,17 +243,18 @@ impl<I: Access> ReadCacheReader<I> {
}
}

async fn try_write_cache(&self, buf: Buffer, offset: u64, limit: usize) -> Result<usize> {
async fn try_write_cache(&self, buf: Buffer, offset: u64) -> Result<usize> {
let size = buf.len();
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(limit as _)));
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));
self.file_cache.write(&read_key, buf).await?;
Ok(size)
}
}

impl<I: Access> Read for ReadCacheReader<I> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(limit as _)));
let size = self.size.min(offset + limit as u64) - offset;
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));

let read_result = self
.mem_cache
Expand Down
4 changes: 2 additions & 2 deletions src/object-store/tests/object_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
// instead of returning `NotFound` during the reader creation.
// The entry count is 4, because we have the p2 `NotFound` cache.
assert!(store.read_with(p2).range(0..4).await.is_err());
assert_eq!(cache_layer.read_cache_stat().await, (4, 35));
assert_eq!(cache_layer.read_cache_stat().await, (3, 35));

assert_cache_files(
&cache_store,
Expand Down Expand Up @@ -406,7 +406,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert!(store.read(p2).await.is_err());
// Read p1 with range `1..` , the existing p1 with range `0..` must be evicted.
let _ = store.read_with(p1).range(1..15).await.unwrap();
assert_eq!(cache_layer.read_cache_stat().await, (4, 34));
assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
assert_cache_files(
&cache_store,
&[
Expand Down

0 comments on commit 91b013e

Please sign in to comment.