From 8fe7696a21cb197938dbb47f104b359aeb3fbaf7 Mon Sep 17 00:00:00 2001 From: test Date: Tue, 2 Apr 2024 01:01:40 +0800 Subject: [PATCH 1/5] fix: move object store read/write timer into inner --- src/object-store/src/layers/prometheus.rs | 135 ++++++++++++---------- 1 file changed, 72 insertions(+), 63 deletions(-) diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs index 47734634705b..c1055f8326df 100644 --- a/src/object-store/src/layers/prometheus.rs +++ b/src/object-store/src/layers/prometheus.rs @@ -17,11 +17,12 @@ use std::fmt::{Debug, Formatter}; use std::io; use std::task::{Context, Poll}; +use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; use common_telemetry::debug; -use futures::{FutureExt, TryFutureExt}; +use futures::FutureExt; use lazy_static::lazy_static; use opendal::raw::*; use opendal::ErrorKind; @@ -153,12 +154,7 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::Read.into_static()]) .inc(); - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .start_timer(); - - let read_res = self - .inner + self.inner .read(path, args) .map(|v| { v.map(|(rp, r)| { @@ -167,13 +163,12 @@ impl LayeredAccessor for PrometheusAccessor { PrometheusMetricWrapper::new(r, Operation::Read, &self.scheme), ) }) + .map_err(|e| { + increment_errors_total(Operation::Read, e.kind()); + e + }) }) - .await; - timer.observe_duration(); - read_res.map_err(|e| { - increment_errors_total(Operation::Read, e.kind()); - e - }) + .await } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -181,12 +176,7 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::Write.into_static()]) .inc(); - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .start_timer(); - - let write_res = self - .inner + self.inner .write(path, args) .map(|v| { v.map(|(rp, r)| { @@ -195,13 +185,12 @@ impl LayeredAccessor for PrometheusAccessor { PrometheusMetricWrapper::new(r, Operation::Write, &self.scheme), ) }) + .map_err(|e| { + increment_errors_total(Operation::Write, e.kind()); + e + }) }) - .await; - timer.observe_duration(); - write_res.map_err(|e| { - increment_errors_total(Operation::Write, e.kind()); - e - }) + .await } async fn stat(&self, path: &str, args: OpStat) -> Result { @@ -212,13 +201,7 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::Stat.into_static()]) .start_timer(); - let stat_res = self - .inner - .stat(path, args) - .inspect_err(|e| { - increment_errors_total(Operation::Stat, e.kind()); - }) - .await; + let stat_res = self.inner.stat(path, args).await; timer.observe_duration(); stat_res.map_err(|e| { increment_errors_total(Operation::Stat, e.kind()); @@ -318,20 +301,18 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) .inc(); - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) - .start_timer(); - let result = self.inner.blocking_read(path, args).map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new(r, Operation::BlockingRead, &self.scheme), - ) - }); - timer.observe_duration(); - result.map_err(|e| { - increment_errors_total(Operation::BlockingRead, e.kind()); - e - }) + self.inner + .blocking_read(path, args) + .map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new(r, Operation::BlockingRead, &self.scheme), + ) + }) + .map_err(|e| { + increment_errors_total(Operation::BlockingRead, e.kind()); + e + }) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { @@ -339,20 +320,18 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) .inc(); - let timer = REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) - .start_timer(); - let result = self.inner.blocking_write(path, args).map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new(r, Operation::BlockingWrite, &self.scheme), - ) - }); - timer.observe_duration(); - result.map_err(|e| { - increment_errors_total(Operation::BlockingWrite, e.kind()); - e - }) + self.inner + .blocking_write(path, args) + .map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new(r, Operation::BlockingWrite, &self.scheme), + ) + }) + .map_err(|e| { + increment_errors_total(Operation::BlockingWrite, e.kind()); + e + }) } fn blocking_stat(&self, path: &str, args: OpStat) -> Result { @@ -425,12 +404,17 @@ impl PrometheusMetricWrapper { impl oio::Read for PrometheusMetricWrapper { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + let start = Instant::now(); + self.inner.poll_read(cx, buf).map(|res| match res { - Ok(bytes) => { + Ok(n) => { BYTES_TOTAL .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .observe(bytes as f64); - Ok(bytes) + .observe(n as f64); + REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .observe(start.elapsed().as_secs_f64()); + Ok(n) } Err(e) => { increment_errors_total(self.op, e.kind()); @@ -450,11 +434,16 @@ impl oio::Read for PrometheusMetricWrapper { } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { + let start = Instant::now(); + self.inner.poll_next(cx).map(|res| match res { Some(Ok(bytes)) => { BYTES_TOTAL .with_label_values(&[&self.scheme, Operation::Read.into_static()]) .observe(bytes.len() as f64); + REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .observe(start.elapsed().as_secs_f64()); Some(Ok(bytes)) } Some(Err(e)) => { @@ -468,12 +457,17 @@ impl oio::Read for PrometheusMetricWrapper { impl oio::BlockingRead for PrometheusMetricWrapper { fn read(&mut self, buf: &mut [u8]) -> Result { + let start = Instant::now(); + self.inner .read(buf) .map(|n| { BYTES_TOTAL .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) .observe(n as f64); + REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) + .observe(start.elapsed().as_secs_f64()); n }) .map_err(|e| { @@ -490,11 +484,16 @@ impl oio::BlockingRead for PrometheusMetricWrapper { } fn next(&mut self) -> Option> { + let start = Instant::now(); + self.inner.next().map(|res| match res { Ok(bytes) => { BYTES_TOTAL .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) .observe(bytes.len() as f64); + REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) + .observe(start.elapsed().as_secs_f64()); Ok(bytes) } Err(e) => { @@ -508,12 +507,17 @@ impl oio::BlockingRead for PrometheusMetricWrapper { #[async_trait] impl oio::Write for PrometheusMetricWrapper { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { + let start = Instant::now(); + self.inner .poll_write(cx, bs) .map_ok(|n| { BYTES_TOTAL .with_label_values(&[&self.scheme, Operation::Write.into_static()]) .observe(n as f64); + REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::Write.into_static()]) + .observe(start.elapsed().as_secs_f64()); n }) .map_err(|err| { @@ -539,12 +543,17 @@ impl oio::Write for PrometheusMetricWrapper { impl oio::BlockingWrite for PrometheusMetricWrapper { fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result { + let start = Instant::now(); + self.inner .write(bs) .map(|n| { BYTES_TOTAL .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) .observe(n as f64); + REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) + .observe(start.elapsed().as_secs_f64()); n }) .map_err(|err| { From 3dca9df099c1d36de59da18a37c2e4ac0322bf74 Mon Sep 17 00:00:00 2001 From: test Date: Tue, 2 Apr 2024 15:04:53 +0800 Subject: [PATCH 2/5] add Drop for PrometheusMetricWrapper --- src/object-store/src/layers/prometheus.rs | 166 +++++++++++++--------- 1 file changed, 100 insertions(+), 66 deletions(-) diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs index c1055f8326df..ae612392facd 100644 --- a/src/object-store/src/layers/prometheus.rs +++ b/src/object-store/src/layers/prometheus.rs @@ -28,7 +28,7 @@ use opendal::raw::*; use opendal::ErrorKind; use prometheus::{ exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec, - HistogramVec, IntCounterVec, + Histogram, HistogramVec, IntCounterVec, }; type Result = std::result::Result; @@ -154,13 +154,25 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::Read.into_static()]) .inc(); - self.inner - .read(path, args) + let timer = REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::Read.into_static()]) + .start_timer(); + let read_res = self.inner.read(path, args); + timer.observe_duration(); + + read_res .map(|v| { v.map(|(rp, r)| { ( rp, - PrometheusMetricWrapper::new(r, Operation::Read, &self.scheme), + PrometheusMetricWrapper::new( + r, + Operation::Read, + BYTES_TOTAL + .with_label_values(&[&self.scheme, Operation::Read.into_static()]), + REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::Read.into_static()]), + ), ) }) .map_err(|e| { @@ -176,13 +188,25 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::Write.into_static()]) .inc(); - self.inner - .write(path, args) + let timer = REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::Write.into_static()]) + .start_timer(); + let write_res = self.inner.write(path, args); + timer.observe_duration(); + + write_res .map(|v| { v.map(|(rp, r)| { ( rp, - PrometheusMetricWrapper::new(r, Operation::Write, &self.scheme), + PrometheusMetricWrapper::new( + r, + Operation::Write, + BYTES_TOTAL + .with_label_values(&[&self.scheme, Operation::Write.into_static()]), + REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::Write.into_static()]), + ), ) }) .map_err(|e| { @@ -301,12 +325,28 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) .inc(); - self.inner - .blocking_read(path, args) + let timer = REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) + .start_timer(); + let result = self.inner.blocking_read(path, args); + timer.observe_duration(); + + result .map(|(rp, r)| { ( rp, - PrometheusMetricWrapper::new(r, Operation::BlockingRead, &self.scheme), + PrometheusMetricWrapper::new( + r, + Operation::BlockingRead, + BYTES_TOTAL.with_label_values(&[ + &self.scheme, + Operation::BlockingRead.into_static(), + ]), + REQUESTS_DURATION_SECONDS.with_label_values(&[ + &self.scheme, + Operation::BlockingRead.into_static(), + ]), + ), ) }) .map_err(|e| { @@ -320,12 +360,28 @@ impl LayeredAccessor for PrometheusAccessor { .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) .inc(); - self.inner - .blocking_write(path, args) + let timer = REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) + .start_timer(); + let result = self.inner.blocking_write(path, args); + timer.observe_duration(); + + result .map(|(rp, r)| { ( rp, - PrometheusMetricWrapper::new(r, Operation::BlockingWrite, &self.scheme), + PrometheusMetricWrapper::new( + r, + Operation::BlockingWrite, + BYTES_TOTAL.with_label_values(&[ + &self.scheme, + Operation::BlockingWrite.into_static(), + ]), + REQUESTS_DURATION_SECONDS.with_label_values(&[ + &self.scheme, + Operation::BlockingWrite.into_static(), + ]), + ), ) }) .map_err(|e| { @@ -389,32 +445,45 @@ pub struct PrometheusMetricWrapper { inner: R, op: Operation, - scheme: String, + bytes_counter: Histogram, + requests_duration_seconds: Histogram, + + start: Instant, + bytes: u64, +} + +impl Drop for PrometheusMetricWrapper { + fn drop(&mut self) { + self.bytes_counter.observe(self.bytes as f64); + let dur = self.start.elapsed().as_secs_f64(); + self.requests_duration_seconds.observe(dur); + } } impl PrometheusMetricWrapper { - fn new(inner: R, op: Operation, scheme: &String) -> Self { + fn new( + inner: R, + op: Operation, + bytes_counter: Histogram, + requests_duration_seconds: Histogram, + ) -> Self { Self { inner, op, - scheme: scheme.to_string(), + bytes_counter, + requests_duration_seconds, + start: Instant::now(), + bytes: 0, } } } impl oio::Read for PrometheusMetricWrapper { fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - let start = Instant::now(); - self.inner.poll_read(cx, buf).map(|res| match res { - Ok(n) => { - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .observe(n as f64); - REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .observe(start.elapsed().as_secs_f64()); - Ok(n) + Ok(bytes) => { + self.bytes += bytes as u64; + Ok(bytes) } Err(e) => { increment_errors_total(self.op, e.kind()); @@ -434,16 +503,9 @@ impl oio::Read for PrometheusMetricWrapper { } fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll>> { - let start = Instant::now(); - self.inner.poll_next(cx).map(|res| match res { Some(Ok(bytes)) => { - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .observe(bytes.len() as f64); - REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Read.into_static()]) - .observe(start.elapsed().as_secs_f64()); + self.bytes += bytes.len() as u64; Some(Ok(bytes)) } Some(Err(e)) => { @@ -457,17 +519,10 @@ impl oio::Read for PrometheusMetricWrapper { impl oio::BlockingRead for PrometheusMetricWrapper { fn read(&mut self, buf: &mut [u8]) -> Result { - let start = Instant::now(); - self.inner .read(buf) .map(|n| { - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) - .observe(n as f64); - REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) - .observe(start.elapsed().as_secs_f64()); + self.bytes += n as u64; n }) .map_err(|e| { @@ -484,16 +539,9 @@ impl oio::BlockingRead for PrometheusMetricWrapper { } fn next(&mut self) -> Option> { - let start = Instant::now(); - self.inner.next().map(|res| match res { Ok(bytes) => { - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) - .observe(bytes.len() as f64); - REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) - .observe(start.elapsed().as_secs_f64()); + self.bytes += bytes.len() as u64; Ok(bytes) } Err(e) => { @@ -507,17 +555,10 @@ impl oio::BlockingRead for PrometheusMetricWrapper { #[async_trait] impl oio::Write for PrometheusMetricWrapper { fn poll_write(&mut self, cx: &mut Context<'_>, bs: &dyn oio::WriteBuf) -> Poll> { - let start = Instant::now(); - self.inner .poll_write(cx, bs) .map_ok(|n| { - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .observe(n as f64); - REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Write.into_static()]) - .observe(start.elapsed().as_secs_f64()); + self.bytes += n as u64; n }) .map_err(|err| { @@ -543,17 +584,10 @@ impl oio::Write for PrometheusMetricWrapper { impl oio::BlockingWrite for PrometheusMetricWrapper { fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result { - let start = Instant::now(); - self.inner .write(bs) .map(|n| { - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) - .observe(n as f64); - REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) - .observe(start.elapsed().as_secs_f64()); + self.bytes += n as u64; n }) .map_err(|err| { From 10d03a68d9aacf5c61cd8a2a18595e83a8ab8feb Mon Sep 17 00:00:00 2001 From: test Date: Tue, 2 Apr 2024 15:22:19 +0800 Subject: [PATCH 3/5] call await on async read/write --- src/object-store/src/layers/prometheus.rs | 75 ++++++++++------------- 1 file changed, 34 insertions(+), 41 deletions(-) diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs index ae612392facd..7916a8e451aa 100644 --- a/src/object-store/src/layers/prometheus.rs +++ b/src/object-store/src/layers/prometheus.rs @@ -22,7 +22,6 @@ use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; use common_telemetry::debug; -use futures::FutureExt; use lazy_static::lazy_static; use opendal::raw::*; use opendal::ErrorKind; @@ -157,30 +156,27 @@ impl LayeredAccessor for PrometheusAccessor { let timer = REQUESTS_DURATION_SECONDS .with_label_values(&[&self.scheme, Operation::Read.into_static()]) .start_timer(); - let read_res = self.inner.read(path, args); + let read_res = self.inner.read(path, args).await; timer.observe_duration(); read_res - .map(|v| { - v.map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Read, - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::Read.into_static()]), - REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Read.into_static()]), - ), - ) - }) - .map_err(|e| { - increment_errors_total(Operation::Read, e.kind()); - e - }) + .map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new( + r, + Operation::Read, + BYTES_TOTAL + .with_label_values(&[&self.scheme, Operation::Read.into_static()]), + REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::Read.into_static()]), + ), + ) + }) + .map_err(|e| { + increment_errors_total(Operation::Read, e.kind()); + e }) - .await } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { @@ -191,30 +187,27 @@ impl LayeredAccessor for PrometheusAccessor { let timer = REQUESTS_DURATION_SECONDS .with_label_values(&[&self.scheme, Operation::Write.into_static()]) .start_timer(); - let write_res = self.inner.write(path, args); + let write_res = self.inner.write(path, args).await; timer.observe_duration(); write_res - .map(|v| { - v.map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Write, - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::Write.into_static()]), - REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Write.into_static()]), - ), - ) - }) - .map_err(|e| { - increment_errors_total(Operation::Write, e.kind()); - e - }) + .map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new( + r, + Operation::Write, + BYTES_TOTAL + .with_label_values(&[&self.scheme, Operation::Write.into_static()]), + REQUESTS_DURATION_SECONDS + .with_label_values(&[&self.scheme, Operation::Write.into_static()]), + ), + ) + }) + .map_err(|e| { + increment_errors_total(Operation::Write, e.kind()); + e }) - .await } async fn stat(&self, path: &str, args: OpStat) -> Result { From c08c999af693987db5fdfcada73f5be9806e7ff0 Mon Sep 17 00:00:00 2001 From: test Date: Wed, 3 Apr 2024 13:35:13 +0800 Subject: [PATCH 4/5] apply review comments --- src/object-store/src/layers/prometheus.rs | 99 +++++++++++------------ 1 file changed, 46 insertions(+), 53 deletions(-) diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs index 7916a8e451aa..7bffc5a41bc3 100644 --- a/src/object-store/src/layers/prometheus.rs +++ b/src/object-store/src/layers/prometheus.rs @@ -17,17 +17,17 @@ use std::fmt::{Debug, Formatter}; use std::io; use std::task::{Context, Poll}; -use std::time::Instant; use async_trait::async_trait; use bytes::Bytes; use common_telemetry::debug; +use futures::FutureExt; use lazy_static::lazy_static; use opendal::raw::*; use opendal::ErrorKind; use prometheus::{ exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter_vec, - Histogram, HistogramVec, IntCounterVec, + Histogram, HistogramTimer, HistogramVec, IntCounterVec, }; type Result = std::result::Result; @@ -156,23 +156,24 @@ impl LayeredAccessor for PrometheusAccessor { let timer = REQUESTS_DURATION_SECONDS .with_label_values(&[&self.scheme, Operation::Read.into_static()]) .start_timer(); - let read_res = self.inner.read(path, args).await; - timer.observe_duration(); - read_res - .map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Read, - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::Read.into_static()]), - REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Read.into_static()]), - ), - ) + self.inner + .read(path, args) + .map(|v| { + v.map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new( + r, + Operation::Read, + BYTES_TOTAL + .with_label_values(&[&self.scheme, Operation::Read.into_static()]), + timer, + ), + ) + }) }) + .await .map_err(|e| { increment_errors_total(Operation::Read, e.kind()); e @@ -187,23 +188,24 @@ impl LayeredAccessor for PrometheusAccessor { let timer = REQUESTS_DURATION_SECONDS .with_label_values(&[&self.scheme, Operation::Write.into_static()]) .start_timer(); - let write_res = self.inner.write(path, args).await; - timer.observe_duration(); - write_res - .map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Write, - BYTES_TOTAL - .with_label_values(&[&self.scheme, Operation::Write.into_static()]), - REQUESTS_DURATION_SECONDS - .with_label_values(&[&self.scheme, Operation::Write.into_static()]), - ), - ) + self.inner + .write(path, args) + .map(|v| { + v.map(|(rp, r)| { + ( + rp, + PrometheusMetricWrapper::new( + r, + Operation::Write, + BYTES_TOTAL + .with_label_values(&[&self.scheme, Operation::Write.into_static()]), + timer, + ), + ) + }) }) + .await .map_err(|e| { increment_errors_total(Operation::Write, e.kind()); e @@ -321,10 +323,9 @@ impl LayeredAccessor for PrometheusAccessor { let timer = REQUESTS_DURATION_SECONDS .with_label_values(&[&self.scheme, Operation::BlockingRead.into_static()]) .start_timer(); - let result = self.inner.blocking_read(path, args); - timer.observe_duration(); - result + self.inner + .blocking_read(path, args) .map(|(rp, r)| { ( rp, @@ -335,10 +336,7 @@ impl LayeredAccessor for PrometheusAccessor { &self.scheme, Operation::BlockingRead.into_static(), ]), - REQUESTS_DURATION_SECONDS.with_label_values(&[ - &self.scheme, - Operation::BlockingRead.into_static(), - ]), + timer, ), ) }) @@ -356,10 +354,9 @@ impl LayeredAccessor for PrometheusAccessor { let timer = REQUESTS_DURATION_SECONDS .with_label_values(&[&self.scheme, Operation::BlockingWrite.into_static()]) .start_timer(); - let result = self.inner.blocking_write(path, args); - timer.observe_duration(); - result + self.inner + .blocking_write(path, args) .map(|(rp, r)| { ( rp, @@ -370,10 +367,7 @@ impl LayeredAccessor for PrometheusAccessor { &self.scheme, Operation::BlockingWrite.into_static(), ]), - REQUESTS_DURATION_SECONDS.with_label_values(&[ - &self.scheme, - Operation::BlockingWrite.into_static(), - ]), + timer, ), ) }) @@ -439,17 +433,17 @@ pub struct PrometheusMetricWrapper { op: Operation, bytes_counter: Histogram, - requests_duration_seconds: Histogram, + requests_duration_timer: Option, - start: Instant, bytes: u64, } impl Drop for PrometheusMetricWrapper { fn drop(&mut self) { + if let Some(timer) = self.requests_duration_timer.take() { + timer.observe_duration(); + } self.bytes_counter.observe(self.bytes as f64); - let dur = self.start.elapsed().as_secs_f64(); - self.requests_duration_seconds.observe(dur); } } @@ -458,14 +452,13 @@ impl PrometheusMetricWrapper { inner: R, op: Operation, bytes_counter: Histogram, - requests_duration_seconds: Histogram, + requests_duration_timer: HistogramTimer, ) -> Self { Self { inner, op, bytes_counter, - requests_duration_seconds, - start: Instant::now(), + requests_duration_timer: Some(requests_duration_timer), bytes: 0, } } From 1372e5d18a1483d0a4b14a77cfee7121357acf2c Mon Sep 17 00:00:00 2001 From: test Date: Wed, 3 Apr 2024 14:37:12 +0800 Subject: [PATCH 5/5] git rid of option on timer --- src/object-store/src/layers/prometheus.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs index 7bffc5a41bc3..51f8689984f8 100644 --- a/src/object-store/src/layers/prometheus.rs +++ b/src/object-store/src/layers/prometheus.rs @@ -433,16 +433,12 @@ pub struct PrometheusMetricWrapper { op: Operation, bytes_counter: Histogram, - requests_duration_timer: Option, - + _requests_duration_timer: HistogramTimer, bytes: u64, } impl Drop for PrometheusMetricWrapper { fn drop(&mut self) { - if let Some(timer) = self.requests_duration_timer.take() { - timer.observe_duration(); - } self.bytes_counter.observe(self.bytes as f64); } } @@ -458,7 +454,7 @@ impl PrometheusMetricWrapper { inner, op, bytes_counter, - requests_duration_timer: Some(requests_duration_timer), + _requests_duration_timer: requests_duration_timer, bytes: 0, } }