Skip to content

Commit

Permalink
build(deps): upgrade opendal to 0.46
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun committed May 24, 2024
1 parent 5df3d4e commit bbb229e
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 28 deletions.
6 changes: 2 additions & 4 deletions src/object-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,20 @@ futures.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.45", features = [
opendal = { version = "0.46", features = [
"layers-tracing",
"rustls",
"services-azblob",
"services-fs",
"services-gcs",
"services-http",
"services-oss",
"services-s3",
], default-features = false }
] }
prometheus.workspace = true
uuid.workspace = true

[dev-dependencies]
anyhow = "1.0"
common-telemetry.workspace = true
common-test-util.workspace = true
opendal = { version = "0.45", features = ["services-memory"] }
tokio.workspace = true
19 changes: 8 additions & 11 deletions src/object-store/src/layers/lru_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use opendal::raw::oio::Read;
use opendal::raw::{
Accessor, Layer, LayeredAccessor, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
RpWrite,
};
use opendal::raw::{Access, Accessor, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite};
use opendal::Result;
mod read_cache;
use common_telemetry::info;
Expand All @@ -32,7 +29,7 @@ pub struct LruCacheLayer<C: Clone> {
read_cache: ReadCache<C>,
}

impl<C: Accessor + Clone> LruCacheLayer<C> {
impl<C: Access + Clone> LruCacheLayer<C> {
/// Create a `[LruCacheLayer]` with local file cache and capacity in bytes.
pub async fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
let read_cache = ReadCache::new(file_cache, capacity);
Expand All @@ -57,27 +54,27 @@ impl<C: Accessor + Clone> LruCacheLayer<C> {
}
}

impl<I: Accessor, C: Accessor + Clone> Layer<I> for LruCacheLayer<C> {
type LayeredAccessor = LruCacheAccessor<I, C>;
impl<I: Access, C: Access + Clone> Layer<I> for LruCacheLayer<C> {
type LayeredAccess = LruCacheAccess<I, C>;

fn layer(&self, inner: I) -> Self::LayeredAccessor {
LruCacheAccessor {
LruCacheAccess {
inner,
read_cache: self.read_cache.clone(),
}
}
}

#[derive(Debug)]
pub struct LruCacheAccessor<I, C: Clone> {
pub struct LruCacheAccess<I, C: Clone> {
inner: I,
read_cache: ReadCache<C>,
}

#[async_trait]
impl<I: Accessor, C: Accessor + Clone> LayeredAccessor for LruCacheAccessor<I, C> {
impl<I: Access, C: Access + Clone> LayeredAccess for LruCacheAccess<I, C> {
type Inner = I;
type Reader = Box<dyn Read>;
type Reader = Arc<dyn Read>;
type BlockingReader = I::BlockingReader;
type Writer = I::Writer;
type BlockingWriter = I::BlockingWriter;
Expand Down
16 changes: 8 additions & 8 deletions src/object-store/src/layers/lru_cache/read_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use common_telemetry::debug;
use futures::FutureExt;
use moka::future::Cache;
use moka::notification::ListenerFuture;
use opendal::raw::oio::{ListExt, Read, ReadExt, Reader, WriteExt};
use opendal::raw::{Accessor, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead};
use opendal::raw::oio::{List, Read, Reader, Write};
use opendal::raw::{Access, Accessor, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead};
use opendal::{Error as OpendalError, ErrorKind, Result};

use crate::metrics::{
Expand Down Expand Up @@ -69,7 +69,7 @@ pub(crate) struct ReadCache<C: Clone> {
mem_cache: Cache<String, ReadResult>,
}

impl<C: Accessor + Clone> ReadCache<C> {
impl<C: Access + Clone> ReadCache<C> {
/// Create a [`ReadCache`] with capacity in bytes.
pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
let file_cache_cloned = file_cache.clone();
Expand Down Expand Up @@ -173,9 +173,9 @@ impl<C: Accessor + Clone> ReadCache<C> {
inner: &I,
path: &str,
args: OpRead,
) -> Result<(RpRead, Box<dyn Read>)>
) -> Result<(RpRead, Arc<dyn Read>)>
where
I: Accessor,
I: Access,
{
if !can_cache(path) {
return inner.read(path, args).await.map(to_output_reader);
Expand Down Expand Up @@ -224,7 +224,7 @@ impl<C: Accessor + Clone> ReadCache<C> {

async fn try_write_cache<I>(&self, mut reader: I::Reader, read_key: &str) -> Result<usize>
where
I: Accessor,
I: Access,
{
let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
let mut total = 0;
Expand All @@ -247,7 +247,7 @@ impl<C: Accessor + Clone> ReadCache<C> {
args: OpRead,
) -> Result<ReadResult>
where
I: Accessor,
I: Access,
{
OBJECT_STORE_LRU_CACHE_MISS.inc();

Expand Down Expand Up @@ -282,7 +282,7 @@ impl<C: Accessor + Clone> ReadCache<C> {
}

fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
(input.0, Box::new(input.1))
(input.0, Arc::new(input.1))
}

#[cfg(test)]
Expand Down
10 changes: 5 additions & 5 deletions src/object-store/src/layers/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ fn increment_errors_total(op: Operation, kind: ErrorKind) {
#[derive(Default, Debug, Clone)]
pub struct PrometheusMetricsLayer;

impl<A: Accessor> Layer<A> for PrometheusMetricsLayer {
type LayeredAccessor = PrometheusAccessor<A>;
impl<A: Access> Layer<A> for PrometheusMetricsLayer {
type LayeredAccess = PrometheusAccessor<A>;

fn layer(&self, inner: A) -> Self::LayeredAccessor {
let meta = inner.info();
Expand All @@ -104,12 +104,12 @@ impl<A: Accessor> Layer<A> for PrometheusMetricsLayer {
}

#[derive(Clone)]
pub struct PrometheusAccessor<A: Accessor> {
pub struct PrometheusAccessor<A: Access> {
inner: A,
scheme: String,
}

impl<A: Accessor> Debug for PrometheusAccessor<A> {
impl<A: Access> Debug for PrometheusAccessor<A> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PrometheusAccessor")
.field("inner", &self.inner)
Expand All @@ -118,7 +118,7 @@ impl<A: Accessor> Debug for PrometheusAccessor<A> {
}

#[async_trait]
impl<A: Accessor> LayeredAccessor for PrometheusAccessor<A> {
impl<A: Access> LayeredAccess for PrometheusAccessor<A> {
type Inner = A;
type Reader = PrometheusMetricWrapper<A::Reader>;
type BlockingReader = PrometheusMetricWrapper<A::BlockingReader>;
Expand Down

0 comments on commit bbb229e

Please sign in to comment.