Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lru cache #6

Merged
merged 1 commit into from
May 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions src/object-store/src/layers/lru_cache/read_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use futures::{FutureExt, StreamExt};
use moka::future::Cache;
use moka::notification::ListenerFuture;
use opendal::raw::oio::{Read, ReadDyn, Reader};
use opendal::raw::{Access, BytesRange, OpRead, RpRead};
use opendal::raw::{Access, BytesRange, OpRead, OpStat, RpRead};
use opendal::{Buffer, Error as OpendalError, ErrorKind, Operator, Result};

use crate::metrics::{
Expand Down Expand Up @@ -176,10 +176,13 @@ impl ReadCache {
return inner.read(path, args).await.map(to_output_reader);
}

// FIXME: remove this block after opendal v0.47 is released.
let meta = inner.stat(path, OpStat::new()).await?;
let (rp, reader) = inner.read(path, args).await?;
let reader: ReadCacheReader<I> = ReadCacheReader {
path: Arc::new(path.to_string()),
inner_reader: reader,
size: meta.into_metadata().content_length(),
file_cache: self.file_cache.clone(),
mem_cache: self.mem_cache.clone(),
};
Expand All @@ -192,6 +195,14 @@ pub struct ReadCacheReader<I: Access> {
path: Arc<String>,
/// Remote file reader.
inner_reader: I::Reader,
/// FIXME: remove this field after opendal v0.47 is released.
///
/// OpenDAL's `read_at` takes `offset, limit`, which means the underlying storage
/// services may return less data than `limit`. We store the content length here as a workaround.
///
/// This API has been refactored to take `offset, size` instead. After opendal v0.47 is released,
/// we won't need this anymore.
size: u64,
/// Local file cache backend
file_cache: Operator,
/// Local memory cache to track local cache files
Expand All @@ -204,7 +215,7 @@ impl<I: Access> ReadCacheReader<I> {
OBJECT_STORE_LRU_CACHE_MISS.inc();

let buf = self.inner_reader.read_at(offset, limit).await?;
let result = self.try_write_cache(buf, offset, limit).await;
let result = self.try_write_cache(buf, offset).await;

match result {
Ok(read_bytes) => {
Expand Down Expand Up @@ -232,17 +243,18 @@ impl<I: Access> ReadCacheReader<I> {
}
}

async fn try_write_cache(&self, buf: Buffer, offset: u64, limit: usize) -> Result<usize> {
async fn try_write_cache(&self, buf: Buffer, offset: u64) -> Result<usize> {
let size = buf.len();
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(limit as _)));
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));
self.file_cache.write(&read_key, buf).await?;
Ok(size)
}
}

impl<I: Access> Read for ReadCacheReader<I> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(limit as _)));
let size = self.size.min(offset + limit as u64) - offset;
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));

let read_result = self
.mem_cache
Expand Down
4 changes: 2 additions & 2 deletions src/object-store/tests/object_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
// instead of returning `NotFound` during the reader creation.
// The entry count is now 3: the p2 `NotFound` result is no longer counted as a cache entry.
assert!(store.read_with(p2).range(0..4).await.is_err());
assert_eq!(cache_layer.read_cache_stat().await, (4, 35));
assert_eq!(cache_layer.read_cache_stat().await, (3, 35));

assert_cache_files(
&cache_store,
Expand Down Expand Up @@ -406,7 +406,7 @@ async fn test_object_store_cache_policy() -> Result<()> {
assert!(store.read(p2).await.is_err());
// Read p1 with range `1..`; the existing p1 entry with range `0..` must be evicted.
let _ = store.read_with(p1).range(1..15).await.unwrap();
assert_eq!(cache_layer.read_cache_stat().await, (4, 34));
assert_eq!(cache_layer.read_cache_stat().await, (3, 34));
assert_cache_files(
&cache_store,
&[
Expand Down
Loading