From 49157868f991787e1ed5dc3db19565f391d6c12b Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 4 Mar 2024 18:15:10 +0800 Subject: [PATCH] feat: Correct server metrics and add more metrics for scan (#3426) * feat: drop timer on stream terminated * refactor: combine metrics into a histogram vec * refactor: frontend grpc metrics * feat: add metrics middleware layer to grpc server * refactor: move http metrics layer to metrics mod * feat: bucket for grpc/http elapsed * feat: remove duplicate metrics * style: fix cilppy * fix: incorrect bucket of promql series * feat: more metrics for mito * feat: convert cost * test: fix metrics test --- src/frontend/src/instance.rs | 19 +++- src/frontend/src/instance/grpc.rs | 11 ++- src/frontend/src/instance/script.rs | 4 +- src/frontend/src/metrics.rs | 34 ++++--- src/mito2/src/engine.rs | 5 +- src/mito2/src/memtable/merge_tree/tree.rs | 5 +- src/mito2/src/metrics.rs | 20 +++- src/mito2/src/read/scan_region.rs | 15 ++- src/mito2/src/read/seq_scan.rs | 47 ++++++++-- src/promql/src/metrics.rs | 7 +- src/query/src/datafusion.rs | 32 +++++-- src/query/src/dist_plan/merge_scan.rs | 10 +- src/query/src/lib.rs | 2 +- src/query/src/metrics.rs | 107 ++++++++++++++------- src/query/src/parser.rs | 6 +- src/servers/src/grpc.rs | 6 ++ src/servers/src/http.rs | 43 +-------- src/servers/src/http/handler.rs | 13 +-- src/servers/src/http/prometheus.rs | 9 -- src/servers/src/metrics.rs | 108 ++++++++++++---------- tests-integration/tests/http.rs | 3 +- 21 files changed, 313 insertions(+), 193 deletions(-) diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 55c45c71fdd0..7cb2e5afaf8b 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -47,6 +47,8 @@ use meta_client::MetaClientOptions; use operator::delete::DeleterRef; use operator::insert::InserterRef; use operator::statement::StatementExecutor; +use prometheus::HistogramTimer; +use query::metrics::OnDone; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::plan::LogicalPlan; use query::query_engine::options::{validate_catalog_and_schema, QueryOptions}; @@ -85,7 +87,6 @@ use crate::error::{ }; use crate::frontend::{FrontendOptions, TomlSerializable}; use crate::heartbeat::HeartbeatTask; -use crate::metrics; use crate::script::ScriptExecutor; #[async_trait] @@ -276,7 +277,6 @@ impl SqlQueryHandler for Instance { type Error = Error; async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec> { - let _timer = metrics::METRIC_HANDLE_SQL_ELAPSED.start_timer(); let query_interceptor_opt = self.plugins.get::>(); let query_interceptor = query_interceptor_opt.as_ref(); let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) { @@ -336,7 +336,6 @@ impl SqlQueryHandler for Instance { } async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result { - let _timer = metrics::METRIC_EXEC_PLAN_ELAPSED.start_timer(); // plan should be prepared before exec // we'll do check there self.query_engine @@ -398,6 +397,19 @@ impl SqlQueryHandler for Instance { } } +/// Attaches a timer to the output and observes it once the output is exhausted. +pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output { + match output { + Output::AffectedRows(_) | Output::RecordBatches(_) => output, + Output::Stream(stream, plan) => { + let stream = OnDone::new(stream, move || { + timer.observe_duration(); + }); + Output::Stream(Box::pin(stream), plan) + } + } +} + #[async_trait] impl PrometheusHandler for Instance { async fn do_query( @@ -405,7 +417,6 @@ impl PrometheusHandler for Instance { query: &PromQuery, query_ctx: QueryContextRef, ) -> server_error::Result { - let _timer = metrics::METRIC_HANDLE_PROMQL_ELAPSED.start_timer(); let interceptor = self .plugins .get::>(); diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 49deda71fd8a..eb6cc4f257eb 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -31,7 +31,8 @@ use crate::error::{ Error, IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, Result, TableOperationSnafu, }; -use crate::instance::Instance; +use crate::instance::{attach_timer, Instance}; +use crate::metrics::{GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED}; #[async_trait] impl GrpcQueryHandler for Instance { @@ -59,6 +60,7 @@ impl GrpcQueryHandler for Instance { })?; match query { Query::Sql(sql) => { + let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer(); let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await; ensure!( result.len() == 1, @@ -66,7 +68,8 @@ impl GrpcQueryHandler for Instance { feat: "execute multiple statements in SQL query string through GRPC interface" } ); - result.remove(0)? + let output = result.remove(0)?; + attach_timer(output, timer) } Query::LogicalPlan(_) => { return NotSupportedSnafu { @@ -75,6 +78,7 @@ impl GrpcQueryHandler for Instance { .fail(); } Query::PromRangeQuery(promql) => { + let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer(); let prom_query = PromQuery { query: promql.query, start: promql.start, @@ -89,7 +93,8 @@ impl GrpcQueryHandler for Instance { feat: "execute multiple statements in PromQL query string through GRPC interface" } ); - result.remove(0)? + let output = result.remove(0)?; + attach_timer(output, timer) } } } diff --git a/src/frontend/src/instance/script.rs b/src/frontend/src/instance/script.rs index ba38b5d4af1d..b8eac4b17c47 100644 --- a/src/frontend/src/instance/script.rs +++ b/src/frontend/src/instance/script.rs @@ -30,7 +30,7 @@ impl ScriptHandler for Instance { name: &str, script: &str, ) -> servers::error::Result<()> { - let _timer = metrics::METRIC_HANDLE_SCRIPTS_ELAPSED.start_timer(); + let _timer = metrics::INSERT_SCRIPTS_ELAPSED.start_timer(); self.script_executor .insert_script(query_ctx, name, script) .await @@ -42,7 +42,7 @@ impl ScriptHandler for Instance { name: &str, params: HashMap, ) -> servers::error::Result { - let _timer = metrics::METRIC_RUN_SCRIPT_ELAPSED.start_timer(); + let _timer = metrics::EXECUTE_SCRIPT_ELAPSED.start_timer(); self.script_executor .execute_script(query_ctx, name, params) .await diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs index 8475aca54c00..5c3c6122e492 100644 --- a/src/frontend/src/metrics.rs +++ b/src/frontend/src/metrics.rs @@ -16,22 +16,32 @@ use lazy_static::lazy_static; use prometheus::*; lazy_static! { - pub static ref METRIC_HANDLE_SQL_ELAPSED: Histogram = - register_histogram!("greptime_frontend_handle_sql_elapsed", "frontend handle sql elapsed").unwrap(); - pub static ref METRIC_HANDLE_PROMQL_ELAPSED: Histogram = register_histogram!( - "greptime_frontend_handle_promql_elapsed", - "frontend handle promql elapsed" + /// Timer of handling query in RPC handler. + pub static ref GRPC_HANDLE_QUERY_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_frontend_grpc_handle_query_elapsed", + "Elapsed time of handling queries in RPC handler", + &["type"], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); - pub static ref METRIC_EXEC_PLAN_ELAPSED: Histogram = - register_histogram!("greptime_frontend_exec_plan_elapsed", "frontend exec plan elapsed").unwrap(); - pub static ref METRIC_HANDLE_SCRIPTS_ELAPSED: Histogram = register_histogram!( - "greptime_frontend_handle_scripts_elapsed", - "frontend handle scripts elapsed" + pub static ref GRPC_HANDLE_SQL_ELAPSED: Histogram = GRPC_HANDLE_QUERY_ELAPSED + .with_label_values(&["sql"]); + pub static ref GRPC_HANDLE_PROMQL_ELAPSED: Histogram = GRPC_HANDLE_QUERY_ELAPSED + .with_label_values(&["promql"]); + + /// Timer of handling scripts in the script handler. + pub static ref HANDLE_SCRIPT_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_frontend_handle_script_elapsed", + "Elapsed time of handling scripts in the script handler", + &["type"], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); - pub static ref METRIC_RUN_SCRIPT_ELAPSED: Histogram = - register_histogram!("greptime_frontend_run_script_elapsed", "frontend run script elapsed").unwrap(); + pub static ref INSERT_SCRIPTS_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED + .with_label_values(&["insert"]); + pub static ref EXECUTE_SCRIPT_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED + .with_label_values(&["execute"]); + /// The samples count of Prometheus remote write. pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounter = register_int_counter!( "greptime_frontend_prometheus_remote_write_samples", diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 943acdc59348..971edad74b0b 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -47,6 +47,7 @@ mod truncate_test; use std::any::Any; use std::sync::Arc; +use std::time::Instant; use async_trait::async_trait; use common_error::ext::BoxedError; @@ -219,6 +220,7 @@ impl EngineInner { /// Handles the scan `request` and returns a [Scanner] for the `request`. fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result { + let query_start = Instant::now(); // Reading a region doesn't need to go through the region worker thread. let region = self .workers @@ -239,7 +241,8 @@ impl EngineInner { Some(cache_manager), ) .with_parallelism(scan_parallelism) - .ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()); + .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()) + .with_start_time(query_start); scan_region.scanner() } diff --git a/src/mito2/src/memtable/merge_tree/tree.rs b/src/mito2/src/memtable/merge_tree/tree.rs index 095010762a03..94c87f7583b3 100644 --- a/src/mito2/src/memtable/merge_tree/tree.rs +++ b/src/mito2/src/memtable/merge_tree/tree.rs @@ -39,7 +39,7 @@ use crate::memtable::merge_tree::partition::{ }; use crate::memtable::merge_tree::MergeTreeConfig; use crate::memtable::{BoxedBatchIterator, KeyValues}; -use crate::metrics::{MERGE_TREE_READ_STAGE_ELAPSED, READ_STAGE_ELAPSED}; +use crate::metrics::{MERGE_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED}; use crate::read::Batch; use crate::row_converter::{McmpRowCodec, RowCodec, SortField}; @@ -397,6 +397,9 @@ struct TreeIter { impl Drop for TreeIter { fn drop(&mut self) { + READ_ROWS_TOTAL + .with_label_values(&["merge_tree_memtable"]) + .inc_by(self.metrics.rows_fetched as u64); MERGE_TREE_READ_STAGE_ELAPSED .with_label_values(&["fetch_next_partition"]) .observe(self.metrics.fetch_partition_elapsed.as_secs_f64()); diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 1c345d055cfb..b9e7fb33d1a7 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -123,7 +123,7 @@ lazy_static! { vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); - /// Counter of rows read. + /// Counter of rows read from different source. pub static ref READ_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap(); /// Counter of filtered rows during merge. @@ -137,6 +137,24 @@ lazy_static! { register_int_counter_vec!("greptime_mito_precise_filter_rows_total", "mito precise filter rows total", &[TYPE_LABEL]).unwrap(); pub static ref READ_ROWS_IN_ROW_GROUP_TOTAL: IntCounterVec = register_int_counter_vec!("greptime_mito_read_rows_in_row_group_total", "mito read rows in row group total", &[TYPE_LABEL]).unwrap(); + /// Histogram for the number of SSTs to scan per query. + pub static ref READ_SST_COUNT: Histogram = register_histogram!( + "greptime_mito_read_sst_count", + "Number of SSTs to scan in a scan task", + vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 256.0, 1024.0], + ).unwrap(); + /// Histogram for the number of rows returned per query. + pub static ref READ_ROWS_RETURN: Histogram = register_histogram!( + "greptime_mito_read_rows_return", + "Number of rows returned in a scan task", + exponential_buckets(100.0, 10.0, 8).unwrap(), + ).unwrap(); + /// Histogram for the number of batches returned per query. + pub static ref READ_BATCHES_RETURN: Histogram = register_histogram!( + "greptime_mito_read_batches_return", + "Number of rows returned in a scan task", + exponential_buckets(100.0, 10.0, 7).unwrap(), + ).unwrap(); // ------- End of query metrics. // Cache related metrics. diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 564344b1ab4f..882903672564 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -15,6 +15,7 @@ //! Scans a region according to the scan request. use std::sync::Arc; +use std::time::Instant; use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{debug, warn}; @@ -124,6 +125,8 @@ pub(crate) struct ScanRegion { parallelism: ScanParallism, /// Whether to ignore inverted index. ignore_inverted_index: bool, + /// Start time of the scan task. + start_time: Option, } impl ScanRegion { @@ -141,6 +144,7 @@ impl ScanRegion { cache_manager, parallelism: ScanParallism::default(), ignore_inverted_index: false, + start_time: None, } } @@ -152,11 +156,17 @@ impl ScanRegion { } #[must_use] - pub(crate) fn ignore_inverted_index(mut self, ignore: bool) -> Self { + pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self { self.ignore_inverted_index = ignore; self } + #[must_use] + pub(crate) fn with_start_time(mut self, now: Instant) -> Self { + self.start_time = Some(now); + self + } + /// Returns a [Scanner] to scan the region. pub(crate) fn scanner(self) -> Result { self.seq_scan().map(Scanner::Seq) @@ -223,7 +233,8 @@ impl ScanRegion { .with_files(files) .with_cache(self.cache_manager) .with_index_applier(index_applier) - .with_parallelism(self.parallelism); + .with_parallelism(self.parallelism) + .with_start_time(self.start_time); Ok(seq_scan) } diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 135714e91ede..151210422baf 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -32,7 +32,7 @@ use crate::access_layer::AccessLayerRef; use crate::cache::{CacheManager, CacheManagerRef}; use crate::error::Result; use crate::memtable::MemtableRef; -use crate::metrics::READ_STAGE_ELAPSED; +use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_SST_COUNT, READ_STAGE_ELAPSED}; use crate::read::compat::{self, CompatReader}; use crate::read::merge::MergeReaderBuilder; use crate::read::projection::ProjectionMapper; @@ -65,6 +65,8 @@ pub struct SeqScan { parallelism: ScanParallism, /// Index applier. index_applier: Option, + /// Start time of the query. + query_start: Option, } impl SeqScan { @@ -82,6 +84,7 @@ impl SeqScan { ignore_file_not_found: false, parallelism: ScanParallism::default(), index_applier: None, + query_start: None, } } @@ -141,10 +144,19 @@ impl SeqScan { self } + /// Sets start time of the query. + #[must_use] + pub(crate) fn with_start_time(mut self, now: Option) -> Self { + self.query_start = now; + self + } + /// Builds a stream for the query. pub async fn build_stream(&self) -> Result { - let start = Instant::now(); let mut metrics = Metrics::default(); + let build_start = Instant::now(); + let query_start = self.query_start.unwrap_or(build_start); + metrics.prepare_scan_cost = query_start.elapsed(); let use_parallel = self.use_parallel_reader(); // Scans all memtables and SSTs. Builds a merge reader to merge results. let mut reader = if use_parallel { @@ -152,9 +164,13 @@ impl SeqScan { } else { self.build_reader().await? }; - let elapsed = start.elapsed(); - metrics.build_reader_cost = elapsed; - metrics.scan_cost = elapsed; + metrics.build_reader_cost = build_start.elapsed(); + READ_STAGE_ELAPSED + .with_label_values(&["prepare_scan"]) + .observe(metrics.prepare_scan_cost.as_secs_f64()); + READ_STAGE_ELAPSED + .with_label_values(&["build_reader"]) + .observe(metrics.build_reader_cost.as_secs_f64()); // Creates a stream to poll the batch reader and convert batch into record batch. let mapper = self.mapper.clone(); @@ -165,15 +181,22 @@ impl SeqScan { while let Some(batch) = Self::fetch_record_batch(&mut reader, &mapper, cache, &mut metrics).await? { + metrics.num_batches += 1; + metrics.num_rows += batch.num_rows(); yield batch; } + // Update metrics. + metrics.total_cost = query_start.elapsed(); + READ_STAGE_ELAPSED.with_label_values(&["convert_rb"]).observe(metrics.convert_cost.as_secs_f64()); + READ_STAGE_ELAPSED.with_label_values(&["scan"]).observe(metrics.scan_cost.as_secs_f64()); + READ_STAGE_ELAPSED.with_label_values(&["total"]).observe(metrics.total_cost.as_secs_f64()); + READ_ROWS_RETURN.observe(metrics.num_rows as f64); + READ_BATCHES_RETURN.observe(metrics.num_batches as f64); debug!( "Seq scan finished, region_id: {:?}, metrics: {:?}, use_parallel: {}, parallelism: {}", mapper.metadata().region_id, metrics, use_parallel, parallelism, ); - // Update metrics. - READ_STAGE_ELAPSED.with_label_values(&["total"]).observe(metrics.scan_cost.as_secs_f64()); }; let stream = Box::pin(RecordBatchStreamWrapper::new( self.mapper.output_schema(), @@ -249,6 +272,8 @@ impl SeqScan { } } + READ_SST_COUNT.observe(self.files.len() as f64); + Ok(sources) } @@ -318,12 +343,20 @@ impl SeqScan { /// Metrics for [SeqScan]. #[derive(Debug, Default)] struct Metrics { + /// Duration to prepare the scan task. + prepare_scan_cost: Duration, /// Duration to build the reader. build_reader_cost: Duration, /// Duration to scan data. scan_cost: Duration, /// Duration to convert batches. convert_cost: Duration, + /// Duration of the scan. + total_cost: Duration, + /// Number of batches returned. + num_batches: usize, + /// Number of rows returned. + num_rows: usize, } #[cfg(test)] diff --git a/src/promql/src/metrics.rs b/src/promql/src/metrics.rs index 84070d4141b7..fc61e4fce6f8 100644 --- a/src/promql/src/metrics.rs +++ b/src/promql/src/metrics.rs @@ -17,6 +17,9 @@ use prometheus::*; lazy_static! { /// Counter for the number of series processed per query. - pub static ref PROMQL_SERIES_COUNT: Histogram = - register_histogram!("greptime_promql_series_count", "promql series count").unwrap(); + pub static ref PROMQL_SERIES_COUNT: Histogram = register_histogram!( + "greptime_promql_series_count", + "promql series count", + exponential_buckets(10.0, 10.0, 8).unwrap(), + ).unwrap(); } diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index c8bb0d1bda33..914a8b4f3cfe 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -53,6 +53,7 @@ use crate::error::{ }; use crate::executor::QueryExecutor; use crate::logical_optimizer::LogicalOptimizer; +use crate::metrics::{OnDone, QUERY_STAGE_ELAPSED}; use crate::physical_optimizer::PhysicalOptimizer; use crate::physical_planner::PhysicalPlanner; use crate::physical_wrapper::PhysicalPlanWrapperRef; @@ -108,6 +109,10 @@ impl DatafusionQueryEngine { } ); + let _timer = QUERY_STAGE_ELAPSED + .with_label_values(&[dml.op.name()]) + .start_timer(); + let default_catalog = &query_ctx.current_catalog().to_owned(); let default_schema = &query_ctx.current_schema().to_owned(); let table_name = dml.table_name.resolve(default_catalog, default_schema); @@ -302,7 +307,7 @@ impl QueryEngine for DatafusionQueryEngine { impl LogicalOptimizer for DatafusionQueryEngine { #[tracing::instrument(skip_all)] fn optimize(&self, context: &QueryEngineContext, plan: &LogicalPlan) -> Result { - let _timer = metrics::METRIC_OPTIMIZE_LOGICAL_ELAPSED.start_timer(); + let _timer = metrics::OPTIMIZE_LOGICAL_ELAPSED.start_timer(); match plan { LogicalPlan::DfPlan(df_plan) => { // Optimized by extension rules @@ -336,7 +341,7 @@ impl PhysicalPlanner for DatafusionQueryEngine { ctx: &mut QueryEngineContext, logical_plan: &LogicalPlan, ) -> Result> { - let _timer = metrics::METRIC_CREATE_PHYSICAL_ELAPSED.start_timer(); + let _timer = metrics::CREATE_PHYSICAL_ELAPSED.start_timer(); match logical_plan { LogicalPlan::DfPlan(df_plan) => { let state = ctx.state(); @@ -370,7 +375,7 @@ impl PhysicalOptimizer for DatafusionQueryEngine { ctx: &mut QueryEngineContext, plan: Arc, ) -> Result> { - let _timer = metrics::METRIC_OPTIMIZE_PHYSICAL_ELAPSED.start_timer(); + let _timer = metrics::OPTIMIZE_PHYSICAL_ELAPSED.start_timer(); let state = ctx.state(); let config = state.config_options(); @@ -418,16 +423,22 @@ impl QueryExecutor for DatafusionQueryEngine { ctx: &QueryEngineContext, plan: &Arc, ) -> Result { - let _timer = metrics::METRIC_EXEC_PLAN_ELAPSED.start_timer(); + let exec_timer = metrics::EXEC_PLAN_ELAPSED.start_timer(); let task_ctx = ctx.build_task_ctx(); match plan.output_partitioning().partition_count() { 0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))), - 1 => Ok(plan - .execute(0, task_ctx) - .context(error::ExecutePhysicalPlanSnafu) - .map_err(BoxedError::new) - .context(QueryExecutionSnafu))?, + 1 => { + let stream = plan + .execute(0, task_ctx) + .context(error::ExecutePhysicalPlanSnafu) + .map_err(BoxedError::new) + .context(QueryExecutionSnafu)?; + let stream = OnDone::new(stream, move || { + exec_timer.observe_duration(); + }); + Ok(Box::pin(stream)) + } _ => { let df_plan = Arc::new(DfPhysicalPlanAdapter(plan.clone())); // merge into a single partition @@ -444,6 +455,9 @@ impl QueryExecutor for DatafusionQueryEngine { .map_err(BoxedError::new) .context(QueryExecutionSnafu)?; stream.set_metrics2(df_plan); + let stream = OnDone::new(Box::pin(stream), move || { + exec_timer.observe_duration(); + }); Ok(Box::pin(stream)) } } diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index d761923dea01..cca0081d4d49 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -47,9 +47,7 @@ use store_api::storage::RegionId; use tokio::time::Instant; use crate::error::ConvertSchemaSnafu; -use crate::metrics::{ - METRIC_MERGE_SCAN_ERRORS_TOTAL, METRIC_MERGE_SCAN_POLL_ELAPSED, METRIC_MERGE_SCAN_REGIONS, -}; +use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS}; use crate::region_query::RegionQueryHandlerRef; #[derive(Debug, Hash, PartialEq, Eq, Clone)] @@ -170,7 +168,7 @@ impl MergeScanExec { let tracing_context = TracingContext::from_json(context.session_id().as_str()); let stream = Box::pin(stream!({ - METRIC_MERGE_SCAN_REGIONS.observe(regions.len() as f64); + MERGE_SCAN_REGIONS.observe(regions.len() as f64); let _finish_timer = metric.finish_time().timer(); let mut ready_timer = metric.ready_time().timer(); let mut first_consume_timer = Some(metric.first_consume_time().timer()); @@ -188,7 +186,7 @@ impl MergeScanExec { .do_get(request) .await .map_err(|e| { - METRIC_MERGE_SCAN_ERRORS_TOTAL.inc(); + MERGE_SCAN_ERRORS_TOTAL.inc(); BoxedError::new(e) }) .context(ExternalSnafu)?; @@ -227,7 +225,7 @@ impl MergeScanExec { metric.record_greptime_exec_cost(value as usize); } - METRIC_MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64()); + MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64()); } })); diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 135acabb61d3..234353e393e9 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -21,7 +21,7 @@ pub mod dist_plan; pub mod error; pub mod executor; pub mod logical_optimizer; -mod metrics; +pub mod metrics; mod optimizer; pub mod parser; pub mod physical_optimizer; diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs index 1b7ad6bca20a..eb5f9c3ede3e 100644 --- a/src/query/src/metrics.rs +++ b/src/query/src/metrics.rs @@ -12,53 +12,90 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::pin::Pin; +use std::task::{Context, Poll}; + +use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; +use datatypes::schema::SchemaRef; +use futures::Stream; +use futures_util::ready; use lazy_static::lazy_static; use prometheus::*; lazy_static! { - pub static ref METRIC_PARSE_SQL_ELAPSED: Histogram = register_histogram!( - "greptime_query_parse_sql_elapsed", - "query parse sql elapsed" - ) - .unwrap(); - pub static ref METRIC_PARSE_PROMQL_ELAPSED: Histogram = register_histogram!( - "greptime_query_parse_promql_elapsed", - "query parse promql elapsed" - ) - .unwrap(); - pub static ref METRIC_OPTIMIZE_LOGICAL_ELAPSED: Histogram = register_histogram!( - "greptime_query_optimize_logicalplan_elapsed", - "query optimize logicalplan elapsed" - ) - .unwrap(); - pub static ref METRIC_OPTIMIZE_PHYSICAL_ELAPSED: Histogram = register_histogram!( - "greptime_query_optimize_physicalplan_elapsed", - "query optimize physicalplan elapsed" - ) - .unwrap(); - pub static ref METRIC_CREATE_PHYSICAL_ELAPSED: Histogram = register_histogram!( - "greptime_query_create_physicalplan_elapsed", - "query create physicalplan elapsed" - ) - .unwrap(); - pub static ref METRIC_EXEC_PLAN_ELAPSED: Histogram = register_histogram!( - "greptime_query_execute_plan_elapsed", - "query execute plan elapsed" - ) - .unwrap(); - pub static ref METRIC_MERGE_SCAN_POLL_ELAPSED: Histogram = register_histogram!( - "greptime_query_merge_scan_poll_elapsed", - "query merge scan poll elapsed" + /// Timer of different stages in query. + pub static ref QUERY_STAGE_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_query_stage_elapsed", + "query engine time elapsed during each stage", + &["stage"], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); - pub static ref METRIC_MERGE_SCAN_REGIONS: Histogram = register_histogram!( + pub static ref PARSE_SQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED + .with_label_values(&["parse_sql"]); + pub static ref PARSE_PROMQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED + .with_label_values(&["parse_promql"]); + pub static ref OPTIMIZE_LOGICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED + .with_label_values(&["optimize_logicalplan"]); + pub static ref OPTIMIZE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED + .with_label_values(&["optimize_physicalplan"]); + pub static ref CREATE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED + .with_label_values(&["create_physicalplan"]); + pub static ref EXEC_PLAN_ELAPSED: Histogram = QUERY_STAGE_ELAPSED + .with_label_values(&["execute_plan"]); + pub static ref MERGE_SCAN_POLL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED + .with_label_values(&["merge_scan_poll"]); + + pub static ref MERGE_SCAN_REGIONS: Histogram = register_histogram!( "greptime_query_merge_scan_regions", "query merge scan regions" ) .unwrap(); - pub static ref METRIC_MERGE_SCAN_ERRORS_TOTAL: IntCounter = register_int_counter!( + pub static ref MERGE_SCAN_ERRORS_TOTAL: IntCounter = register_int_counter!( "greptime_query_merge_scan_errors_total", "query merge scan errors total" ) .unwrap(); } + +/// A stream to call the callback once a RecordBatch stream is done. +pub struct OnDone { + stream: SendableRecordBatchStream, + callback: Option, +} + +impl OnDone { + /// Attaches a `callback` to invoke once the `stream` is terminated. + pub fn new(stream: SendableRecordBatchStream, callback: F) -> Self { + Self { + stream, + callback: Some(callback), + } + } +} + +impl RecordBatchStream for OnDone { + fn schema(&self) -> SchemaRef { + self.stream.schema() + } +} + +impl Stream for OnDone { + type Item = common_recordbatch::error::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match ready!(Pin::new(&mut self.stream).poll_next(cx)) { + Some(rb) => Poll::Ready(Some(rb)), + None => { + if let Some(callback) = self.callback.take() { + callback(); + } + Poll::Ready(None) + } + } + } + + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} diff --git a/src/query/src/parser.rs b/src/query/src/parser.rs index 82f6950d56a1..5c66a9b65924 100644 --- a/src/query/src/parser.rs +++ b/src/query/src/parser.rs @@ -33,7 +33,7 @@ use crate::error::{ AddSystemTimeOverflowSnafu, MultipleStatementsSnafu, ParseFloatSnafu, ParseTimestampSnafu, QueryParseSnafu, Result, UnimplementedSnafu, }; -use crate::metrics::{METRIC_PARSE_PROMQL_ELAPSED, METRIC_PARSE_SQL_ELAPSED}; +use crate::metrics::{PARSE_PROMQL_ELAPSED, PARSE_SQL_ELAPSED}; const DEFAULT_LOOKBACK: u64 = 5 * 60; // 5m pub const DEFAULT_LOOKBACK_STRING: &str = "5m"; @@ -116,7 +116,7 @@ pub struct QueryLanguageParser {} impl QueryLanguageParser { /// Try to parse SQL with GreptimeDB dialect, return the statement when success. pub fn parse_sql(sql: &str, _query_ctx: &QueryContextRef) -> Result { - let _timer = METRIC_PARSE_SQL_ELAPSED.start_timer(); + let _timer = PARSE_SQL_ELAPSED.start_timer(); let mut statement = ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) .map_err(BoxedError::new) @@ -133,7 +133,7 @@ impl QueryLanguageParser { /// Try to parse PromQL, return the statement when success. pub fn parse_promql(query: &PromQuery, _query_ctx: &QueryContextRef) -> Result { - let _timer = METRIC_PARSE_PROMQL_ELAPSED.start_timer(); + let _timer = PARSE_PROMQL_ELAPSED.start_timer(); let expr = promql_parser::parser::parse(&query.query) .map_err(|msg| BoxedError::new(PlainError::new(msg, StatusCode::InvalidArguments))) diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 9b667a173263..23cc9ebb4174 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -43,6 +43,7 @@ use tonic_reflection::server::{ServerReflection, ServerReflectionServer}; use crate::error::{ AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, TcpBindSnafu, TcpIncomingSnafu, }; +use crate::metrics::MetricsMiddlewareLayer; use crate::server::Server; type TonicResult = std::result::Result; @@ -168,7 +169,12 @@ impl Server for GrpcServer { (incoming, addr) }; + let metrics_layer = tower::ServiceBuilder::new() + .layer(MetricsMiddlewareLayer) + .into_inner(); + let builder = tonic::transport::Server::builder() + .layer(metrics_layer) .add_routes(routes) .add_service(self.create_healthcheck_service()) .add_service(self.create_reflection_service()); diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 0a71513e2778..fde17b72a082 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::fmt::Display; use std::net::SocketAddr; use std::sync::Mutex as StdMutex; -use std::time::{Duration, Instant}; +use std::time::Duration; use aide::axum::{routing as apirouting, ApiRouter, IntoApiResponse}; use aide::openapi::{Info, OpenApi, Server as OpenAPIServer}; @@ -24,11 +24,9 @@ use aide::OperationOutput; use async_trait::async_trait; use auth::UserProviderRef; use axum::error_handling::HandleErrorLayer; -use axum::extract::{DefaultBodyLimit, MatchedPath}; -use axum::http::Request; -use axum::middleware::{self, Next}; +use axum::extract::DefaultBodyLimit; use axum::response::{Html, IntoResponse, Json, Response}; -use axum::{routing, BoxError, Extension, Router}; +use axum::{middleware, routing, BoxError, Extension, Router}; use common_base::readable_size::ReadableSize; use common_base::Plugins; use common_error::status_code::StatusCode; @@ -61,9 +59,7 @@ use crate::http::influxdb_result_v1::InfluxdbV1Response; use crate::http::prometheus::{ format_query, instant_query, label_values_query, labels_query, range_query, series_query, }; -use crate::metrics::{ - HTTP_TRACK_METRICS, METRIC_HTTP_REQUESTS_ELAPSED, METRIC_HTTP_REQUESTS_TOTAL, -}; +use crate::metrics::http_metrics_layer; use crate::metrics_handler::MetricsHandler; use crate::prometheus_handler::PrometheusHandlerRef; use crate::query_handler::sql::ServerSqlQueryHandlerRef; @@ -599,7 +595,7 @@ impl HttpServer { } // Add a layer to collect HTTP metrics for axum. - router = router.route_layer(middleware::from_fn(track_metrics)); + router = router.route_layer(middleware::from_fn(http_metrics_layer)); router } @@ -727,35 +723,6 @@ impl HttpServer { } } -/// A middleware to record metrics for HTTP. -// Based on https://github.com/tokio-rs/axum/blob/axum-v0.6.16/examples/prometheus-metrics/src/main.rs -pub(crate) async fn track_metrics(req: Request, next: Next) -> impl IntoResponse { - let _timer = HTTP_TRACK_METRICS - .with_label_values(&["value"]) - .start_timer(); - let start = Instant::now(); - let path = if let Some(matched_path) = req.extensions().get::() { - matched_path.as_str().to_owned() - } else { - req.uri().path().to_owned() - }; - let method = req.method().clone(); - - let response = next.run(req).await; - - let latency = start.elapsed().as_secs_f64(); - let status = response.status().as_u16().to_string(); - let method_str = method.to_string(); - - let labels = [method_str.as_str(), path.as_str(), status.as_str()]; - METRIC_HTTP_REQUESTS_TOTAL.with_label_values(&labels).inc(); - METRIC_HTTP_REQUESTS_ELAPSED - .with_label_values(&labels) - .observe(latency); - - response -} - pub const HTTP_SERVER: &str = "HTTP_SERVER"; #[async_trait] diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index edac1e98fbde..89bfaf138353 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -72,11 +72,15 @@ pub async fn sql( Extension(query_ctx): Extension, Form(form_params): Form, ) -> HttpResponse { + let start = Instant::now(); let sql_handler = &state.sql_handler; + let db = query_ctx.get_db_string(); + + let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED + .with_label_values(&[db.as_str()]) + .start_timer(); - let start = Instant::now(); let sql = query_params.sql.or(form_params.sql); - let db = query_ctx.get_db_string(); let format = query_params .format .or(form_params.format) @@ -89,10 +93,6 @@ pub async fn sql( .map(|s| s.to_lowercase()) .map(|s| Epoch::parse(s.as_str()).unwrap_or(Epoch::Millisecond)); - let _timer = crate::metrics::METRIC_HTTP_SQL_ELAPSED - .with_label_values(&[db.as_str()]) - .start_timer(); - let result = if let Some(sql) = &sql { if let Some((status, msg)) = validate_schema(sql_handler.clone(), query_ctx.clone()).await { Err((status, msg)) @@ -258,6 +258,7 @@ pub async fn promql( let sql_handler = &state.sql_handler; let exec_start = Instant::now(); let db = query_ctx.get_db_string(); + let _timer = crate::metrics::METRIC_HTTP_PROMQL_ELAPSED .with_label_values(&[db.as_str()]) .start_timer(); diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index 246b71f0f026..89e86a526201 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -93,8 +93,6 @@ pub async fn format_query( Extension(_query_ctx): Extension, Form(form_params): Form, ) -> PrometheusJsonResponse { - let _timer = crate::metrics::METRIC_HTTP_PROMQL_FORMAT_QUERY_ELAPSED.start_timer(); - let query = params.query.or(form_params.query).unwrap_or_default(); match promql_parser::parser::parse(&query) { Ok(expr) => { @@ -123,7 +121,6 @@ pub async fn instant_query( Extension(query_ctx): Extension, Form(form_params): Form, ) -> PrometheusJsonResponse { - let _timer = crate::metrics::METRIC_HTTP_PROMQL_INSTANT_QUERY_ELAPSED.start_timer(); // Extract time from query string, or use current server time if not specified. let time = params .time @@ -163,7 +160,6 @@ pub async fn range_query( Extension(query_ctx): Extension, Form(form_params): Form, ) -> PrometheusJsonResponse { - let _timer = crate::metrics::METRIC_HTTP_PROMQL_RANGE_QUERY_ELAPSED.start_timer(); let prom_query = PromQuery { query: params.query.or(form_params.query).unwrap_or_default(), start: params.start.or(form_params.start).unwrap_or_default(), @@ -232,8 +228,6 @@ pub async fn labels_query( Extension(query_ctx): Extension, Form(form_params): Form, ) -> PrometheusJsonResponse { - let _timer = crate::metrics::METRIC_HTTP_PROMQL_LABEL_QUERY_ELAPSED.start_timer(); - let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); let (catalog, schema) = parse_catalog_and_schema_from_db_string(db); @@ -498,8 +492,6 @@ pub async fn label_values_query( Extension(query_ctx): Extension, Query(params): Query, ) -> PrometheusJsonResponse { - let _timer = crate::metrics::METRIC_HTTP_PROMQL_LABEL_VALUE_QUERY_ELAPSED.start_timer(); - let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); let (catalog, schema) = parse_catalog_and_schema_from_db_string(db); @@ -624,7 +616,6 @@ pub async fn series_query( Extension(query_ctx): Extension, Form(form_params): Form, ) -> PrometheusJsonResponse { - let _timer = crate::metrics::METRIC_HTTP_PROMQL_SERIES_QUERY_ELAPSED.start_timer(); let mut queries: Vec = params.matches.0; if queries.is_empty() { queries = form_params.matches.0; diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index e9aac792d4fc..41d56702bc96 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -18,6 +18,10 @@ pub(crate) mod jemalloc; use std::task::{Context, Poll}; use std::time::Instant; +use axum::extract::MatchedPath; +use axum::http::Request; +use axum::middleware::Next; +use axum::response::IntoResponse; use hyper::Body; use lazy_static::lazy_static; use prometheus::{ @@ -48,16 +52,20 @@ lazy_static! { &[METRIC_PROTOCOL_LABEL] ) .unwrap(); + /// Http SQL query duration per database. pub static ref METRIC_HTTP_SQL_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_servers_http_sql_elapsed", "servers http sql elapsed", - &[METRIC_DB_LABEL] + &[METRIC_DB_LABEL], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); + /// Http pql query duration per database. pub static ref METRIC_HTTP_PROMQL_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_servers_http_promql_elapsed", "servers http promql elapsed", - &[METRIC_DB_LABEL] + &[METRIC_DB_LABEL], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); pub static ref METRIC_AUTH_FAILURE: IntCounterVec = register_int_counter_vec!( @@ -66,33 +74,41 @@ lazy_static! { &[METRIC_CODE_LABEL] ) .unwrap(); + /// Http influxdb write duration per database. pub static ref METRIC_HTTP_INFLUXDB_WRITE_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_servers_http_influxdb_write_elapsed", "servers http influxdb write elapsed", - &[METRIC_DB_LABEL] + &[METRIC_DB_LABEL], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); + /// Http prometheus write duration per database. pub static ref METRIC_HTTP_PROM_STORE_WRITE_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_servers_http_prometheus_write_elapsed", "servers http prometheus write elapsed", - &[METRIC_DB_LABEL] + &[METRIC_DB_LABEL], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); - pub static ref METRIC_HTTP_PROM_STORE_DECODE_ELAPSED: Histogram = register_histogram!( - "greptime_servers_http_prometheus_decode_elapsed", - "servers http prometheus decode elapsed", + /// Prometheus remote write codec duration. + pub static ref METRIC_HTTP_PROM_STORE_CODEC_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_servers_http_prometheus_codec_elapsed", + "servers http prometheus request codec duration", + &["type"], ) .unwrap(); + /// Decode duration of prometheus write request. + pub static ref METRIC_HTTP_PROM_STORE_DECODE_ELAPSED: Histogram = METRIC_HTTP_PROM_STORE_CODEC_ELAPSED + .with_label_values(&["decode"]); + /// Duration to convert prometheus write request to gRPC request. + pub static ref METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED: Histogram = METRIC_HTTP_PROM_STORE_CODEC_ELAPSED + .with_label_values(&["convert"]); pub static ref METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES: Histogram = register_histogram!( "greptime_servers_http_prometheus_decode_num_series", "servers http prometheus decode num series", ) .unwrap(); - pub static ref METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED: Histogram = register_histogram!( - "greptime_servers_http_prometheus_convert_elapsed", - "servers http prometheus convert to gRPC request elapsed", - ) - .unwrap(); + /// Http prometheus read duration per database. pub static ref METRIC_HTTP_PROM_STORE_READ_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_servers_http_prometheus_read_elapsed", "servers http prometheus read elapsed", @@ -118,36 +134,6 @@ lazy_static! { "servers opentsdb line write elapsed" ) .unwrap(); - pub static ref METRIC_HTTP_PROMQL_FORMAT_QUERY_ELAPSED: Histogram = register_histogram!( - "greptime_servers_http_promql_format_query_elapsed", - "servers http promql format query elapsed" - ) - .unwrap(); - pub static ref METRIC_HTTP_PROMQL_INSTANT_QUERY_ELAPSED: Histogram = register_histogram!( - "greptime_servers_http_promql_instant_query_elapsed", - "servers http promql instant query elapsed" - ) - .unwrap(); - pub static ref METRIC_HTTP_PROMQL_RANGE_QUERY_ELAPSED: Histogram = register_histogram!( - "greptime_servers_http_promql_range_query_elapsed", - "servers http promql range query elapsed" - ) - .unwrap(); - pub static ref METRIC_HTTP_PROMQL_LABEL_QUERY_ELAPSED: Histogram = register_histogram!( - "greptime_servers_http_promql_label_query_elapsed", - "servers http promql label query elapsed" - ) - .unwrap(); - pub static ref METRIC_HTTP_PROMQL_SERIES_QUERY_ELAPSED: Histogram = register_histogram!( - "greptime_servers_http_promql_series_query_elapsed", - "servers http promql series query elapsed" - ) - .unwrap(); - pub static ref METRIC_HTTP_PROMQL_LABEL_VALUE_QUERY_ELAPSED: Histogram = register_histogram!( - "greptime_servers_http_promql_label_value_query_elapsed", - "servers http promql label value query elapsed" - ) - .unwrap(); pub static ref METRIC_MYSQL_CONNECTIONS: IntGauge = register_int_gauge!( "greptime_servers_mysql_connection_count", "servers mysql connection count" @@ -202,7 +188,8 @@ lazy_static! { pub static ref METRIC_HTTP_REQUESTS_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_servers_http_requests_elapsed", "servers http requests elapsed", - &[METRIC_METHOD_LABEL, METRIC_PATH_LABEL, METRIC_CODE_LABEL] + &[METRIC_METHOD_LABEL, METRIC_PATH_LABEL, METRIC_CODE_LABEL], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); pub static ref METRIC_GRPC_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!( @@ -214,13 +201,8 @@ lazy_static! { pub static ref METRIC_GRPC_REQUESTS_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_servers_grpc_requests_elapsed", "servers grpc requests elapsed", - &[METRIC_PATH_LABEL, METRIC_CODE_LABEL] - ) - .unwrap(); - pub static ref HTTP_TRACK_METRICS: HistogramVec = register_histogram_vec!( - "greptime_http_track_metrics", - "http track metrics", - &["tag"] + &[METRIC_PATH_LABEL, METRIC_CODE_LABEL], + vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0] ) .unwrap(); } @@ -284,3 +266,29 @@ where }) } } + +/// A middleware to record metrics for HTTP. +// Based on https://github.com/tokio-rs/axum/blob/axum-v0.6.16/examples/prometheus-metrics/src/main.rs +pub(crate) async fn http_metrics_layer(req: Request, next: Next) -> impl IntoResponse { + let start = Instant::now(); + let path = if let Some(matched_path) = req.extensions().get::() { + matched_path.as_str().to_owned() + } else { + req.uri().path().to_owned() + }; + let method = req.method().clone(); + + let response = next.run(req).await; + + let latency = start.elapsed().as_secs_f64(); + let status = response.status().as_u16().to_string(); + let method_str = method.to_string(); + + let labels = [method_str.as_str(), path.as_str(), status.as_str()]; + METRIC_HTTP_REQUESTS_TOTAL.with_label_values(&labels).inc(); + METRIC_HTTP_REQUESTS_ELAPSED + .with_label_values(&labels) + .observe(latency); + + response +} diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index f424ad89422a..1d62ed4a07d3 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -557,7 +557,8 @@ pub async fn test_metrics_api(store_type: StorageType) { let res = client.get("/metrics").send().await; assert_eq!(res.status(), StatusCode::OK); let body = res.text().await; - assert!(body.contains("frontend_handle_sql_elapsed")); + // Comment in the metrics text. + assert!(body.contains("# HELP")); guard.remove_all().await; }