feat: Correct server metrics and add more metrics for scan (#3426)
* feat: drop timer on stream terminated

* refactor: combine metrics into a histogram vec

* refactor: frontend grpc metrics

* feat: add metrics middleware layer to grpc server

* refactor: move http metrics layer to metrics mod

* feat: bucket for grpc/http elapsed

* feat: remove duplicate metrics

* style: fix clippy

* fix: incorrect bucket of promql series

* feat: more metrics for mito

* feat: convert cost

* test: fix metrics test
evenyag authored Mar 4, 2024
1 parent ae2c18e commit 4915786
Showing 21 changed files with 313 additions and 193 deletions.
19 changes: 15 additions & 4 deletions src/frontend/src/instance.rs
@@ -47,6 +47,8 @@ use meta_client::MetaClientOptions;
use operator::delete::DeleterRef;
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prometheus::HistogramTimer;
use query::metrics::OnDone;
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::plan::LogicalPlan;
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
@@ -85,7 +87,6 @@ use crate::error::{
};
use crate::frontend::{FrontendOptions, TomlSerializable};
use crate::heartbeat::HeartbeatTask;
use crate::metrics;
use crate::script::ScriptExecutor;

#[async_trait]
@@ -276,7 +277,6 @@ impl SqlQueryHandler for Instance {
type Error = Error;

async fn do_query(&self, query: &str, query_ctx: QueryContextRef) -> Vec<Result<Output>> {
let _timer = metrics::METRIC_HANDLE_SQL_ELAPSED.start_timer();
let query_interceptor_opt = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query_interceptor = query_interceptor_opt.as_ref();
let query = match query_interceptor.pre_parsing(query, query_ctx.clone()) {
@@ -336,7 +336,6 @@ impl SqlQueryHandler for Instance {
}

async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
let _timer = metrics::METRIC_EXEC_PLAN_ELAPSED.start_timer();
// plan should be prepared before exec
// we'll do check there
self.query_engine
@@ -398,14 +397,26 @@ impl SqlQueryHandler for Instance {
}
}

/// Attaches a timer to the output and observes it once the output is exhausted.
pub fn attach_timer(output: Output, timer: HistogramTimer) -> Output {
match output {
Output::AffectedRows(_) | Output::RecordBatches(_) => output,
Output::Stream(stream, plan) => {
let stream = OnDone::new(stream, move || {
timer.observe_duration();
});
Output::Stream(Box::pin(stream), plan)
}
}
}

#[async_trait]
impl PrometheusHandler for Instance {
async fn do_query(
&self,
query: &PromQuery,
query_ctx: QueryContextRef,
) -> server_error::Result<Output> {
let _timer = metrics::METRIC_HANDLE_PROMQL_ELAPSED.start_timer();
let interceptor = self
.plugins
.get::<PromQueryInterceptorRef<server_error::Error>>();
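The `attach_timer` helper added above defers the histogram observation until the streaming output is exhausted, by wrapping the stream with `query::metrics::OnDone`. Below is a minimal, illustrative sketch of that wrap-and-callback pattern; it is not the actual `OnDone` implementation, which wraps a record-batch stream.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::{self, Stream, StreamExt};

/// Runs a callback once the wrapped stream is exhausted.
struct OnDone<S, F: FnOnce()> {
    stream: S,
    on_done: Option<F>,
}

impl<S: Stream + Unpin, F: FnOnce() + Unpin> Stream for OnDone<S, F> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match this.stream.poll_next_unpin(cx) {
            Poll::Ready(None) => {
                // The inner stream just finished: fire the callback exactly once.
                if let Some(cb) = this.on_done.take() {
                    cb();
                }
                Poll::Ready(None)
            }
            other => other,
        }
    }
}

fn main() {
    futures::executor::block_on(async {
        let started = std::time::Instant::now();
        let wrapped = OnDone {
            stream: stream::iter(vec![1, 2, 3]),
            // In the commit, this callback is `timer.observe_duration()`.
            on_done: Some(move || println!("stream finished after {:?}", started.elapsed())),
        };
        let collected: Vec<_> = wrapped.collect().await;
        println!("{collected:?}");
    });
}
```

Because the callback fires only when the stream is drained, streamed query results are timed to completion rather than to the moment the handler returns.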
11 changes: 8 additions & 3 deletions src/frontend/src/instance/grpc.rs
@@ -31,7 +31,8 @@ use crate::error::{
Error, IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, Result,
TableOperationSnafu,
};
use crate::instance::Instance;
use crate::instance::{attach_timer, Instance};
use crate::metrics::{GRPC_HANDLE_PROMQL_ELAPSED, GRPC_HANDLE_SQL_ELAPSED};

#[async_trait]
impl GrpcQueryHandler for Instance {
@@ -59,14 +60,16 @@ impl GrpcQueryHandler for Instance {
})?;
match query {
Query::Sql(sql) => {
let timer = GRPC_HANDLE_SQL_ELAPSED.start_timer();
let mut result = SqlQueryHandler::do_query(self, &sql, ctx.clone()).await;
ensure!(
result.len() == 1,
NotSupportedSnafu {
feat: "execute multiple statements in SQL query string through GRPC interface"
}
);
result.remove(0)?
let output = result.remove(0)?;
attach_timer(output, timer)
}
Query::LogicalPlan(_) => {
return NotSupportedSnafu {
@@ -75,6 +78,7 @@
.fail();
}
Query::PromRangeQuery(promql) => {
let timer = GRPC_HANDLE_PROMQL_ELAPSED.start_timer();
let prom_query = PromQuery {
query: promql.query,
start: promql.start,
@@ -89,7 +93,8 @@
feat: "execute multiple statements in PromQL query string through GRPC interface"
}
);
result.remove(0)?
let output = result.remove(0)?;
attach_timer(output, timer)
}
}
}
4 changes: 2 additions & 2 deletions src/frontend/src/instance/script.rs
@@ -30,7 +30,7 @@ impl ScriptHandler for Instance {
name: &str,
script: &str,
) -> servers::error::Result<()> {
let _timer = metrics::METRIC_HANDLE_SCRIPTS_ELAPSED.start_timer();
let _timer = metrics::INSERT_SCRIPTS_ELAPSED.start_timer();
self.script_executor
.insert_script(query_ctx, name, script)
.await
@@ -42,7 +42,7 @@
name: &str,
params: HashMap<String, String>,
) -> servers::error::Result<Output> {
let _timer = metrics::METRIC_RUN_SCRIPT_ELAPSED.start_timer();
let _timer = metrics::EXECUTE_SCRIPT_ELAPSED.start_timer();
self.script_executor
.execute_script(query_ctx, name, params)
.await
34 changes: 22 additions & 12 deletions src/frontend/src/metrics.rs
@@ -16,22 +16,32 @@ use lazy_static::lazy_static;
use prometheus::*;

lazy_static! {
pub static ref METRIC_HANDLE_SQL_ELAPSED: Histogram =
register_histogram!("greptime_frontend_handle_sql_elapsed", "frontend handle sql elapsed").unwrap();
pub static ref METRIC_HANDLE_PROMQL_ELAPSED: Histogram = register_histogram!(
"greptime_frontend_handle_promql_elapsed",
"frontend handle promql elapsed"
/// Timer of handling query in RPC handler.
pub static ref GRPC_HANDLE_QUERY_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_frontend_grpc_handle_query_elapsed",
"Elapsed time of handling queries in RPC handler",
&["type"],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
)
.unwrap();
pub static ref METRIC_EXEC_PLAN_ELAPSED: Histogram =
register_histogram!("greptime_frontend_exec_plan_elapsed", "frontend exec plan elapsed").unwrap();
pub static ref METRIC_HANDLE_SCRIPTS_ELAPSED: Histogram = register_histogram!(
"greptime_frontend_handle_scripts_elapsed",
"frontend handle scripts elapsed"
pub static ref GRPC_HANDLE_SQL_ELAPSED: Histogram = GRPC_HANDLE_QUERY_ELAPSED
.with_label_values(&["sql"]);
pub static ref GRPC_HANDLE_PROMQL_ELAPSED: Histogram = GRPC_HANDLE_QUERY_ELAPSED
.with_label_values(&["promql"]);

/// Timer of handling scripts in the script handler.
pub static ref HANDLE_SCRIPT_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_frontend_handle_script_elapsed",
"Elapsed time of handling scripts in the script handler",
&["type"],
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
)
.unwrap();
pub static ref METRIC_RUN_SCRIPT_ELAPSED: Histogram =
register_histogram!("greptime_frontend_run_script_elapsed", "frontend run script elapsed").unwrap();
pub static ref INSERT_SCRIPTS_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED
.with_label_values(&["insert"]);
pub static ref EXECUTE_SCRIPT_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED
.with_label_values(&["execute"]);

/// The samples count of Prometheus remote write.
pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounter = register_int_counter!(
"greptime_frontend_prometheus_remote_write_samples",
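The frontend metrics above replace several standalone histograms with a single `HistogramVec` keyed by a `type` label, plus child histograms resolved once at startup. A hedged sketch of that prometheus-crate pattern follows; the metric names are placeholders, not the registered GreptimeDB names.

```rust
use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, Histogram, HistogramVec};

lazy_static! {
    // One vector metric with a `type` label and shared buckets...
    static ref HANDLE_QUERY_ELAPSED: HistogramVec = register_histogram_vec!(
        "example_grpc_handle_query_elapsed",
        "Elapsed time of handling queries",
        &["type"],
        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
    )
    .unwrap();
    // ...and child histograms resolved once, so the hot path skips the label lookup.
    static ref HANDLE_SQL_ELAPSED: Histogram =
        HANDLE_QUERY_ELAPSED.with_label_values(&["sql"]);
    static ref HANDLE_PROMQL_ELAPSED: Histogram =
        HANDLE_QUERY_ELAPSED.with_label_values(&["promql"]);
}

fn main() {
    // The timer observes into the histogram when it goes out of scope.
    let _timer = HANDLE_SQL_ELAPSED.start_timer();
    // ... handle the request ...
}
```

Pre-resolving the children in `lazy_static` means each request only calls `start_timer()`, without a per-call label lookup.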
5 changes: 4 additions & 1 deletion src/mito2/src/engine.rs
@@ -47,6 +47,7 @@ mod truncate_test;

use std::any::Any;
use std::sync::Arc;
use std::time::Instant;

use async_trait::async_trait;
use common_error::ext::BoxedError;
@@ -219,6 +220,7 @@ impl EngineInner {

/// Handles the scan `request` and returns a [Scanner] for the `request`.
fn handle_query(&self, region_id: RegionId, request: ScanRequest) -> Result<Scanner> {
let query_start = Instant::now();
// Reading a region doesn't need to go through the region worker thread.
let region = self
.workers
@@ -239,7 +241,8 @@
Some(cache_manager),
)
.with_parallelism(scan_parallelism)
.ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled());
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_start_time(query_start);

scan_region.scanner()
}
5 changes: 4 additions & 1 deletion src/mito2/src/memtable/merge_tree/tree.rs
@@ -39,7 +39,7 @@ use crate::memtable::merge_tree::partition::{
};
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::memtable::{BoxedBatchIterator, KeyValues};
use crate::metrics::{MERGE_TREE_READ_STAGE_ELAPSED, READ_STAGE_ELAPSED};
use crate::metrics::{MERGE_TREE_READ_STAGE_ELAPSED, READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::read::Batch;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};

@@ -397,6 +397,9 @@ struct TreeIter {

impl Drop for TreeIter {
fn drop(&mut self) {
READ_ROWS_TOTAL
.with_label_values(&["merge_tree_memtable"])
.inc_by(self.metrics.rows_fetched as u64);
MERGE_TREE_READ_STAGE_ELAPSED
.with_label_values(&["fetch_next_partition"])
.observe(self.metrics.fetch_partition_elapsed.as_secs_f64());
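`TreeIter` above reports its row count and stage timings from `Drop`, so the metrics are recorded even when the caller stops pulling batches early. A small illustrative sketch of the same idea, with hypothetical names:

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_counter_vec, IntCounterVec};

lazy_static! {
    static ref READ_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!(
        "example_read_rows_total",
        "Rows read from different sources",
        &["type"]
    )
    .unwrap();
}

/// Iterator that reports how many rows it yielded when it is dropped.
struct RowIter {
    rows: std::vec::IntoIter<u64>,
    rows_fetched: usize,
}

impl Iterator for RowIter {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        let item = self.rows.next();
        if item.is_some() {
            self.rows_fetched += 1;
        }
        item
    }
}

impl Drop for RowIter {
    fn drop(&mut self) {
        // Recorded on drop, mirroring `TreeIter::drop` above: the count is
        // captured even if the consumer abandons the iterator early.
        READ_ROWS_TOTAL
            .with_label_values(&["example_memtable"])
            .inc_by(self.rows_fetched as u64);
    }
}

fn main() {
    let iter = RowIter {
        rows: vec![1u64, 2, 3].into_iter(),
        rows_fetched: 0,
    };
    // Only one row is consumed, but the metric still records it on drop.
    let _ = iter.take(1).count();
}
```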
20 changes: 19 additions & 1 deletion src/mito2/src/metrics.rs
@@ -123,7 +123,7 @@ lazy_static! {
vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
)
.unwrap();
/// Counter of rows read.
/// Counter of rows read from different sources.
pub static ref READ_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap();
/// Counter of filtered rows during merge.
@@ -137,6 +137,24 @@
register_int_counter_vec!("greptime_mito_precise_filter_rows_total", "mito precise filter rows total", &[TYPE_LABEL]).unwrap();
pub static ref READ_ROWS_IN_ROW_GROUP_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_rows_in_row_group_total", "mito read rows in row group total", &[TYPE_LABEL]).unwrap();
/// Histogram for the number of SSTs to scan per query.
pub static ref READ_SST_COUNT: Histogram = register_histogram!(
"greptime_mito_read_sst_count",
"Number of SSTs to scan in a scan task",
vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 256.0, 1024.0],
).unwrap();
/// Histogram for the number of rows returned per query.
pub static ref READ_ROWS_RETURN: Histogram = register_histogram!(
"greptime_mito_read_rows_return",
"Number of rows returned in a scan task",
exponential_buckets(100.0, 10.0, 8).unwrap(),
).unwrap();
/// Histogram for the number of batches returned per query.
pub static ref READ_BATCHES_RETURN: Histogram = register_histogram!(
"greptime_mito_read_batches_return",
"Number of rows returned in a scan task",
exponential_buckets(100.0, 10.0, 7).unwrap(),
).unwrap();
// ------- End of query metrics.

// Cache related metrics.
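The new `READ_ROWS_RETURN` and `READ_BATCHES_RETURN` histograms use exponential bucket bounds. A quick sketch of what `prometheus::exponential_buckets(start, factor, count)` produces for the values used above (the values in the comments are computed here, not taken from the commit):

```rust
use prometheus::exponential_buckets;

fn main() {
    // start = 100, factor = 10, count = 8 upper bounds:
    // 100, 1_000, 10_000, ..., 1_000_000_000 (1e2 through 1e9 rows).
    let buckets = exponential_buckets(100.0, 10.0, 8).unwrap();
    assert_eq!(buckets.len(), 8);
    assert_eq!(buckets[0], 100.0);
    assert_eq!(buckets[7], 1_000_000_000.0);
    println!("{buckets:?}");
}
```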
15 changes: 13 additions & 2 deletions src/mito2/src/read/scan_region.rs
@@ -15,6 +15,7 @@
//! Scans a region according to the scan request.
use std::sync::Arc;
use std::time::Instant;

use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, warn};
@@ -124,6 +125,8 @@ pub(crate) struct ScanRegion {
parallelism: ScanParallism,
/// Whether to ignore inverted index.
ignore_inverted_index: bool,
/// Start time of the scan task.
start_time: Option<Instant>,
}

impl ScanRegion {
@@ -141,6 +144,7 @@
cache_manager,
parallelism: ScanParallism::default(),
ignore_inverted_index: false,
start_time: None,
}
}

@@ -152,11 +156,17 @@
}

#[must_use]
pub(crate) fn ignore_inverted_index(mut self, ignore: bool) -> Self {
pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
self.ignore_inverted_index = ignore;
self
}

#[must_use]
pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
self.start_time = Some(now);
self
}

/// Returns a [Scanner] to scan the region.
pub(crate) fn scanner(self) -> Result<Scanner> {
self.seq_scan().map(Scanner::Seq)
@@ -223,7 +233,8 @@ impl ScanRegion {
.with_files(files)
.with_cache(self.cache_manager)
.with_index_applier(index_applier)
.with_parallelism(self.parallelism);
.with_parallelism(self.parallelism)
.with_start_time(self.start_time);

Ok(seq_scan)
}
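`with_start_time` threads the query's start `Instant` from the engine into the scan so the total scan cost can be observed once the scan finishes. A minimal sketch of the idea, assuming a hypothetical `on_finish` hook and a placeholder metric name (the actual observation point lives elsewhere in mito2):

```rust
use std::time::Instant;

use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    static ref READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
        "example_read_stage_elapsed",
        "Elapsed time of different read stages",
        &["stage"]
    )
    .unwrap();
}

struct SeqScan {
    /// Start time handed in by the caller via `with_start_time`.
    start_time: Option<Instant>,
}

impl SeqScan {
    #[must_use]
    fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

    /// Hypothetical hook called once the scan stream is exhausted.
    fn on_finish(&self) {
        if let Some(start) = self.start_time {
            READ_STAGE_ELAPSED
                .with_label_values(&["total"])
                .observe(start.elapsed().as_secs_f64());
        }
    }
}

fn main() {
    let scan = SeqScan { start_time: None }.with_start_time(Instant::now());
    // ... produce and drain batches ...
    scan.on_finish();
}
```

Passing the start time through the builder keeps the measurement anchored at `handle_query`, so the recorded cost covers region lookup and scanner construction as well as the read itself.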