From 24be36fb429a0293277c49b4fbebba1927e52a5c Mon Sep 17 00:00:00 2001 From: Satyam Singh Date: Wed, 4 Oct 2023 18:38:35 +0530 Subject: [PATCH] Add Prometheus metrics for Datafusion calls to S3 store (#527) --- server/src/metrics/storage.rs | 13 ++ server/src/storage.rs | 1 + server/src/storage/metrics_layer.rs | 267 ++++++++++++++++++++++++++++ server/src/storage/s3.rs | 2 + 4 files changed, 283 insertions(+) create mode 100644 server/src/storage/metrics_layer.rs diff --git a/server/src/metrics/storage.rs b/server/src/metrics/storage.rs index 6d122ba39..7a35d8b46 100644 --- a/server/src/metrics/storage.rs +++ b/server/src/metrics/storage.rs @@ -64,12 +64,25 @@ pub mod s3 { .expect("metric can be created") }); + pub static QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME: Lazy = Lazy::new(|| { + HistogramVec::new( + HistogramOpts::new("query_s3_response_time", "S3 Request Latency") + .namespace(METRICS_NAMESPACE), + &["method", "status"], + ) + .expect("metric can be created") + }); + impl StorageMetrics for S3Config { fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) { handler .registry .register(Box::new(REQUEST_RESPONSE_TIME.clone())) .expect("metric can be registered"); + handler + .registry + .register(Box::new(QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME.clone())) + .expect("metric can be registered"); } } } diff --git a/server/src/storage.rs b/server/src/storage.rs index 561e3722a..3873b45ba 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -23,6 +23,7 @@ use chrono::Local; use std::fmt::Debug; mod localfs; +mod metrics_layer; mod object_storage; pub mod retention; mod s3; diff --git a/server/src/storage/metrics_layer.rs b/server/src/storage/metrics_layer.rs new file mode 100644 index 000000000..d87b333a2 --- /dev/null +++ b/server/src/storage/metrics_layer.rs @@ -0,0 +1,267 @@ +use std::{ + ops::Range, + task::{Context, Poll}, + time, +}; + +use async_trait::async_trait; +use bytes::Bytes; +use futures_util::{stream::BoxStream, Stream, StreamExt}; +use object_store::{ + path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, +}; +use tokio::io::AsyncWrite; + +use crate::metrics::storage::s3::QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME; + +#[derive(Debug)] +pub struct MetricLayer { + inner: T, +} + +impl MetricLayer { + pub fn new(inner: T) -> Self { + Self { inner } + } +} + +impl std::fmt::Display for MetricLayer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Metric({})", self.inner) + } +} + +#[async_trait] +impl ObjectStore for MetricLayer { + async fn put(&self, location: &Path, bytes: Bytes) -> object_store::Result<()> { + let time = time::Instant::now(); + self.inner.put(location, bytes).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["PUT", "200"]) + .observe(elapsed); + return Ok(()); + } + + // todo completly tracking multipart upload + async fn put_multipart( + &self, + location: &Path, + ) -> object_store::Result<(MultipartId, Box)> { + let time = time::Instant::now(); + let (id, write) = self.inner.put_multipart(location).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["PUT_MULTIPART", "200"]) + .observe(elapsed); + + Ok((id, write)) + } + + async fn abort_multipart( + &self, + location: &Path, + multipart_id: &MultipartId, + ) -> object_store::Result<()> { + let time = time::Instant::now(); + let elapsed = time.elapsed().as_secs_f64(); + self.inner.abort_multipart(location, multipart_id).await?; + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["PUT_MULTIPART_ABORT", "200"]) + .observe(elapsed); + Ok(()) + } + + async fn append( + &self, + location: &Path, + ) -> object_store::Result> { + let time = time::Instant::now(); + let write = self.inner.append(location).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["APPEND", "200"]) + .observe(elapsed); + + Ok(write) + } + + async fn get(&self, location: &Path) -> object_store::Result { + let time = time::Instant::now(); + let res = self.inner.get(location).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["GET", "200"]) + .observe(elapsed); + Ok(res) + } + + async fn get_opts( + &self, + location: &Path, + options: GetOptions, + ) -> object_store::Result { + let time = time::Instant::now(); + let res = self.inner.get_opts(location, options).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["GET_OPTS", "200"]) + .observe(elapsed); + Ok(res) + } + + async fn get_range(&self, location: &Path, range: Range) -> object_store::Result { + let time = time::Instant::now(); + let res = self.inner.get_range(location, range).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["GET_RANGE", "200"]) + .observe(elapsed); + Ok(res) + } + + async fn get_ranges( + &self, + location: &Path, + ranges: &[Range], + ) -> object_store::Result> { + let time = time::Instant::now(); + let res = self.inner.get_ranges(location, ranges).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["GET_RANGES", "200"]) + .observe(elapsed); + Ok(res) + } + + async fn head(&self, location: &Path) -> object_store::Result { + let time = time::Instant::now(); + let res = self.inner.head(location).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["HEAD", "200"]) + .observe(elapsed); + Ok(res) + } + + async fn delete(&self, location: &Path) -> object_store::Result<()> { + let time = time::Instant::now(); + let res = self.inner.delete(location).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["DELETE", "200"]) + .observe(elapsed); + Ok(res) + } + + fn delete_stream<'a>( + &'a self, + locations: BoxStream<'a, object_store::Result>, + ) -> BoxStream<'a, object_store::Result> { + self.inner.delete_stream(locations) + } + + async fn list( + &self, + prefix: Option<&Path>, + ) -> object_store::Result>> { + let time = time::Instant::now(); + let inner = self.inner.list(prefix).await?; + let res = StreamMetricWrapper { + time, + labels: ["LIST", "200"], + inner, + }; + Ok(Box::pin(res)) + } + + async fn list_with_offset( + &self, + prefix: Option<&Path>, + offset: &Path, + ) -> object_store::Result>> { + let time = time::Instant::now(); + let inner = self.inner.list_with_offset(prefix, offset).await?; + let res = StreamMetricWrapper { + time, + labels: ["LIST_OFFSET", "200"], + inner, + }; + Ok(Box::pin(res)) + } + + async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result { + let time = time::Instant::now(); + let res = self.inner.list_with_delimiter(prefix).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["LIST_DELIM", "200"]) + .observe(elapsed); + Ok(res) + } + + async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> { + let time = time::Instant::now(); + let res = self.inner.copy(from, to).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["COPY", "200"]) + .observe(elapsed); + Ok(res) + } + + async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> { + let time = time::Instant::now(); + let res = self.inner.rename(from, to).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["RENAME", "200"]) + .observe(elapsed); + Ok(res) + } + + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { + let time = time::Instant::now(); + let res = self.inner.copy_if_not_exists(from, to).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["COPY_IF", "200"]) + .observe(elapsed); + Ok(res) + } + + async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> { + let time = time::Instant::now(); + let res = self.inner.rename_if_not_exists(from, to).await?; + let elapsed = time.elapsed().as_secs_f64(); + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&["RENAME_IF", "200"]) + .observe(elapsed); + Ok(res) + } +} + +struct StreamMetricWrapper<'a, const N: usize, T> { + time: time::Instant, + labels: [&'static str; N], + inner: BoxStream<'a, T>, +} + +impl Stream for StreamMetricWrapper<'_, N, T> { + type Item = T; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match self.inner.poll_next_unpin(cx) { + t @ Poll::Ready(None) => { + QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME + .with_label_values(&self.labels) + .observe(self.time.elapsed().as_secs_f64()); + t + } + t => t, + } + } +} diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 72a1404aa..68379c55e 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -41,6 +41,7 @@ use std::time::{Duration, Instant}; use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics}; use crate::storage::{LogStream, ObjectStorage, ObjectStorageError}; +use super::metrics_layer::MetricLayer; use super::{object_storage, ObjectStorageProvider}; // in bytes @@ -178,6 +179,7 @@ impl ObjectStorageProvider for S3Config { // limit objectstore to a concurrent request limit let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS); + let s3 = MetricLayer::new(s3); let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new(); let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap();