Skip to content

Commit

Permalink
Add Prometheus metrics for Datafusion calls to S3 store (#527)
Browse files Browse the repository at this point in the history
  • Loading branch information
trueleo authored Oct 4, 2023
1 parent 3523480 commit 24be36f
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 0 deletions.
13 changes: 13 additions & 0 deletions server/src/metrics/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,25 @@ pub mod s3 {
.expect("metric can be created")
});

pub static QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| {
HistogramVec::new(
HistogramOpts::new("query_s3_response_time", "S3 Request Latency")
.namespace(METRICS_NAMESPACE),
&["method", "status"],
)
.expect("metric can be created")
});

impl StorageMetrics for S3Config {
fn register_metrics(&self, handler: &actix_web_prometheus::PrometheusMetrics) {
handler
.registry
.register(Box::new(REQUEST_RESPONSE_TIME.clone()))
.expect("metric can be registered");
handler
.registry
.register(Box::new(QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME.clone()))
.expect("metric can be registered");
}
}
}
1 change: 1 addition & 0 deletions server/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use chrono::Local;
use std::fmt::Debug;

mod localfs;
mod metrics_layer;
mod object_storage;
pub mod retention;
mod s3;
Expand Down
267 changes: 267 additions & 0 deletions server/src/storage/metrics_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
use std::{
ops::Range,
task::{Context, Poll},
time,
};

use async_trait::async_trait;
use bytes::Bytes;
use futures_util::{stream::BoxStream, Stream, StreamExt};
use object_store::{
path::Path, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
};
use tokio::io::AsyncWrite;

use crate::metrics::storage::s3::QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME;

#[derive(Debug)]
pub struct MetricLayer<T: ObjectStore> {
inner: T,
}

impl<T: ObjectStore> MetricLayer<T> {
pub fn new(inner: T) -> Self {
Self { inner }
}
}

impl<T: ObjectStore> std::fmt::Display for MetricLayer<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Metric({})", self.inner)
}
}

#[async_trait]
impl<T: ObjectStore> ObjectStore for MetricLayer<T> {
async fn put(&self, location: &Path, bytes: Bytes) -> object_store::Result<()> {
let time = time::Instant::now();
self.inner.put(location, bytes).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["PUT", "200"])
.observe(elapsed);
return Ok(());
}

// todo completly tracking multipart upload
async fn put_multipart(
&self,
location: &Path,
) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let time = time::Instant::now();
let (id, write) = self.inner.put_multipart(location).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["PUT_MULTIPART", "200"])
.observe(elapsed);

Ok((id, write))
}

async fn abort_multipart(
&self,
location: &Path,
multipart_id: &MultipartId,
) -> object_store::Result<()> {
let time = time::Instant::now();
let elapsed = time.elapsed().as_secs_f64();
self.inner.abort_multipart(location, multipart_id).await?;
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["PUT_MULTIPART_ABORT", "200"])
.observe(elapsed);
Ok(())
}

async fn append(
&self,
location: &Path,
) -> object_store::Result<Box<dyn AsyncWrite + Unpin + Send>> {
let time = time::Instant::now();
let write = self.inner.append(location).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["APPEND", "200"])
.observe(elapsed);

Ok(write)
}

async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
let time = time::Instant::now();
let res = self.inner.get(location).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["GET", "200"])
.observe(elapsed);
Ok(res)
}

async fn get_opts(
&self,
location: &Path,
options: GetOptions,
) -> object_store::Result<GetResult> {
let time = time::Instant::now();
let res = self.inner.get_opts(location, options).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["GET_OPTS", "200"])
.observe(elapsed);
Ok(res)
}

async fn get_range(&self, location: &Path, range: Range<usize>) -> object_store::Result<Bytes> {
let time = time::Instant::now();
let res = self.inner.get_range(location, range).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["GET_RANGE", "200"])
.observe(elapsed);
Ok(res)
}

async fn get_ranges(
&self,
location: &Path,
ranges: &[Range<usize>],
) -> object_store::Result<Vec<Bytes>> {
let time = time::Instant::now();
let res = self.inner.get_ranges(location, ranges).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["GET_RANGES", "200"])
.observe(elapsed);
Ok(res)
}

async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
let time = time::Instant::now();
let res = self.inner.head(location).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["HEAD", "200"])
.observe(elapsed);
Ok(res)
}

async fn delete(&self, location: &Path) -> object_store::Result<()> {
let time = time::Instant::now();
let res = self.inner.delete(location).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["DELETE", "200"])
.observe(elapsed);
Ok(res)
}

fn delete_stream<'a>(
&'a self,
locations: BoxStream<'a, object_store::Result<Path>>,
) -> BoxStream<'a, object_store::Result<Path>> {
self.inner.delete_stream(locations)
}

async fn list(
&self,
prefix: Option<&Path>,
) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
let time = time::Instant::now();
let inner = self.inner.list(prefix).await?;
let res = StreamMetricWrapper {
time,
labels: ["LIST", "200"],
inner,
};
Ok(Box::pin(res))
}

async fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>> {
let time = time::Instant::now();
let inner = self.inner.list_with_offset(prefix, offset).await?;
let res = StreamMetricWrapper {
time,
labels: ["LIST_OFFSET", "200"],
inner,
};
Ok(Box::pin(res))
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
let time = time::Instant::now();
let res = self.inner.list_with_delimiter(prefix).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["LIST_DELIM", "200"])
.observe(elapsed);
Ok(res)
}

async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
let time = time::Instant::now();
let res = self.inner.copy(from, to).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["COPY", "200"])
.observe(elapsed);
Ok(res)
}

async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
let time = time::Instant::now();
let res = self.inner.rename(from, to).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["RENAME", "200"])
.observe(elapsed);
Ok(res)
}

async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
let time = time::Instant::now();
let res = self.inner.copy_if_not_exists(from, to).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["COPY_IF", "200"])
.observe(elapsed);
Ok(res)
}

async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
let time = time::Instant::now();
let res = self.inner.rename_if_not_exists(from, to).await?;
let elapsed = time.elapsed().as_secs_f64();
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&["RENAME_IF", "200"])
.observe(elapsed);
Ok(res)
}
}

struct StreamMetricWrapper<'a, const N: usize, T> {
time: time::Instant,
labels: [&'static str; N],
inner: BoxStream<'a, T>,
}

impl<T, const N: usize> Stream for StreamMetricWrapper<'_, N, T> {
type Item = T;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.inner.poll_next_unpin(cx) {
t @ Poll::Ready(None) => {
QUERY_LAYER_STORAGE_REQUEST_RESPONSE_TIME
.with_label_values(&self.labels)
.observe(self.time.elapsed().as_secs_f64());
t
}
t => t,
}
}
}
2 changes: 2 additions & 0 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use std::time::{Duration, Instant};
use crate::metrics::storage::{s3::REQUEST_RESPONSE_TIME, StorageMetrics};
use crate::storage::{LogStream, ObjectStorage, ObjectStorageError};

use super::metrics_layer::MetricLayer;
use super::{object_storage, ObjectStorageProvider};

// in bytes
Expand Down Expand Up @@ -178,6 +179,7 @@ impl ObjectStorageProvider for S3Config {

// limit objectstore to a concurrent request limit
let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS);
let s3 = MetricLayer::new(s3);

let object_store_registry: DefaultObjectStoreRegistry = DefaultObjectStoreRegistry::new();
let url = ObjectStoreUrl::parse(format!("s3://{}", &self.bucket_name)).unwrap();
Expand Down

0 comments on commit 24be36f

Please sign in to comment.