diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs index 47734634705b..51f8689984f8 100644 --- a/src/object-store/src/layers/prometheus.rs +++ b/src/object-store/src/layers/prometheus.rs @@ -21,13 +21,13 @@ use std::task::{Context, Poll}; use async_trait::async_trait; use bytes::Bytes; use common_telemetry::debug; -use futures::{FutureExt, TryFutureExt}; +use futures::FutureExt; use lazy_static::lazy_static; use opendal::raw::*; use opendal::ErrorKind; use prometheus::{ exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec, - HistogramVec, IntCounterVec, + Histogram, HistogramTimer, HistogramVec, IntCounterVec, }; type Result = std::result::Result; @@ -157,23 +157,27 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::Read.into_static()]) .start_timer(); - let read_res = self - .inner + self.inner .read(path, args) .map(|v| { v.map(|(rp, r)| { ( rp, - PrometheusMetricWrapper::new(r, Operation::Read, &self.scheme), + PrometheusMetricWrapper::new( + r, + Operation::Read, + BYTES_TOTAL + .with_label_values(&[&self.scheme, Operation::Read.into_static()]), + timer, + ), ) }) }) - .await; - timer.observe_duration(); - read_res.map_err(|e| { - increment_errors_total(Operation::Read, e.kind()); - e - }) + .await + .map_err(|e| { + increment_errors_total(Operation::Read, e.kind()); + e + }) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -185,23 +189,27 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::Write.into_static()]) .start_timer(); - let write_res = self - .inner + self.inner .write(path, args) .map(|v| { v.map(|(rp, r)| { ( rp, - PrometheusMetricWrapper::new(r, Operation::Write, &self.scheme), + PrometheusMetricWrapper::new( + r, + Operation::Write, + BYTES_TOTAL + .with_label_values(&[&self.scheme, Operation::Write.into_static()]), + timer, + ), ) }) }) - .await; - timer.observe_duration(); - write_res.map_err(|e| { - increment_errors_total(Operation::Write, e.kind()); - e - }) + .await + .map_err(|e| { + increment_errors_total(Operation::Write, e.kind()); + e + }) } async fn stat(&self, path: &str, args: OpStat) -> Result { @@ -212,13 +220,7 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) .start_timer(); - let stat_res = self - .inner - .stat(path, args) - .inspect_err(|e| { - increment_errors_total(Operation::Stat, e.kind()); - }) - .await; + let stat_res = self.inner.stat(path, args).await; timer.observe_duration(); stat_res.map_err(|e| { increment_errors_total(Operation::Stat, e.kind()); @@ -321,17 +323,27 @@ impl LayeredAccessor for PrometheusAccessor { let timer = REQUESTS_DURATION_SECONDS .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) .start_timer(); - let result = self.inner.blocking_read(path, args).map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new(r, Operation::BlockingRead, &self.scheme), - ) - }); - timer.observe_duration(); - result.map_err(|e| { - increment_errors_total(Operation::BlockingRead, e.kind()); - e - }) + + self.inner + .blocking_read(path, args) + .map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new( + r, + Operation::BlockingRead, + BYTES_TOTAL.with_label_values(&[ + &self.scheme, + Operation::BlockingRead.into_static(), + ]), + timer, + ), + ) + }) + .map_err(|e| { + increment_errors_total(Operation::BlockingRead, e.kind()); + e + }) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { @@ -342,17 +354,27 @@ impl LayeredAccessor for PrometheusAccessor { let timer = REQUESTS_DURATION_SECONDS .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) .start_timer(); - let result = self.inner.blocking_write(path, args).map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new(r, Operation::BlockingWrite, &self.scheme), - ) - }); - timer.observe_duration(); - result.map_err(|e| { - increment_errors_total(Operation::BlockingWrite, e.kind()); - e - }) + + self.inner + .blocking_write(path, args) + .map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new( + r, + Operation::BlockingWrite, + BYTES_TOTAL.with_label_values(&[ + &self.scheme, + Operation::BlockingWrite.into_static(), + ]), + timer, + ), + ) + }) + .map_err(|e| { + increment_errors_total(Operation::BlockingWrite, e.kind()); + e + }) } fn blocking_stat(&self, path: &str, args: OpStat) -> Result { @@ -410,15 +432,30 @@ pub struct PrometheusMetricWrapper { inner: R, op: Operation, - scheme: String, + bytes_counter: Histogram, + _requests_duration_timer: HistogramTimer, + bytes: u64, +} + +impl Drop for PrometheusMetricWrapper { + fn drop(&mut self) { + self.bytes_counter.observe(self.bytes as f64); + } } impl PrometheusMetricWrapper { - fn new(inner: R, op: Operation, scheme: &String) -> Self { + fn new( + inner: R, + op: Operation, + bytes_counter: Histogram, + requests_duration_timer: HistogramTimer, + ) -> Self { Self { inner, op, - scheme: scheme.to_string(), + bytes_counter, + _requests_duration_timer: requests_duration_timer, + bytes: 0, } } } @@ -427,9 +464,7 @@ impl oio::Read for PrometheusMetricWrapper { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { self.inner.poll_read(cx, buf).map(|res| match res { Ok(bytes) => { - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .observe(bytes as f64); + self.bytes += bytes as u64; Ok(bytes) } Err(e) => { @@ -452,9 +487,7 @@ impl oio::Read for PrometheusMetricWrapper { fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { self.inner.poll_next(cx).map(|res| match res { Some(Ok(bytes)) => { - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .observe(bytes.len() as f64); + self.bytes += bytes.len() as u64; Some(Ok(bytes)) } Some(Err(e)) => { @@ -471,9 +504,7 @@ impl oio::BlockingRead for PrometheusMetricWrapper { self.inner .read(buf) .map(|n| { - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) - .observe(n as f64); + self.bytes += n as u64; n }) .map_err(|e| { @@ -492,9 +523,7 @@ impl oio::BlockingRead for PrometheusMetricWrapper { fn next(&mut self) -> Option> { self.inner.next().map(|res| match res { Ok(bytes) => { - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) - .observe(bytes.len() as f64); + self.bytes += bytes.len() as u64; Ok(bytes) } Err(e) => { @@ -511,9 +540,7 @@ impl oio::Write for PrometheusMetricWrapper { self.inner .poll_write(cx, bs) .map_ok(|n| { - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .observe(n as f64); + self.bytes += n as u64; n }) .map_err(|err| { @@ -542,9 +569,7 @@ impl oio::BlockingWrite for PrometheusMetricWrapper { self.inner .write(bs) .map(|n| { - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) - .observe(n as f64); + self.bytes += n as u64; n }) .map_err(|err| {