diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index 429393b39cc6..9b00bdeaf6dc 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -34,7 +34,7 @@ use session::context::QueryContextRef; use snafu::prelude::*; use super::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS}; -use crate::error::{self, Result, UnexpectedPhysicalTableSnafu}; +use crate::error::{self, Result}; use crate::prom_store::{snappy_decompress, zstd_decompress}; use crate::proto::PromWriteRequest; use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse}; @@ -72,67 +72,44 @@ impl Default for RemoteWriteQuery { /// Same with [remote_write] but won't store data to metric engine. #[axum_macros::debug_handler] pub async fn route_write_without_metric_engine( - State(handler): State, - Query(params): Query, - Extension(query_ctx): Extension, + handler: State, + query: Query, + extension: Extension, content_encoding: TypedHeader, - RawBody(body): RawBody, + raw_body: RawBody, ) -> Result { - // VictoriaMetrics handshake - if let Some(_vm_handshake) = params.get_vm_proto_version { - return Ok(VM_PROTO_VERSION.into_response()); - } - - let db = params.db.clone().unwrap_or_default(); - let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED - .with_label_values(&[db.as_str()]) - .start_timer(); - - let is_zstd = content_encoding.contains(VM_ENCODING); - let (request, samples) = decode_remote_write_request(is_zstd, body, true).await?; - // reject if physical table is specified when metric engine is disabled - if params.physical_table.is_some() { - return UnexpectedPhysicalTableSnafu {}.fail(); - } - - let output = handler.write(request, query_ctx, false).await?; - crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64); - Ok(( - StatusCode::NO_CONTENT, - write_cost_header_map(output.meta.cost), + remote_write_impl( + handler, + query, + extension, + content_encoding, + raw_body, + true, + false, ) - .into_response()) + .await } /// Same with [remote_write] but won't store data to metric engine. /// And without strict_mode on will not check invalid UTF-8. #[axum_macros::debug_handler] pub async fn route_write_without_metric_engine_and_strict_mode( - State(handler): State, - Query(params): Query, - Extension(query_ctx): Extension, + handler: State, + query: Query, + extension: Extension, content_encoding: TypedHeader, - RawBody(body): RawBody, + raw_body: RawBody, ) -> Result { - let db = params.db.clone().unwrap_or_default(); - let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED - .with_label_values(&[db.as_str()]) - .start_timer(); - - let is_zstd = content_encoding.contains(VM_ENCODING); - let (request, samples) = decode_remote_write_request(is_zstd, body, false).await?; - // reject if physical table is specified when metric engine is disabled - if params.physical_table.is_some() { - return UnexpectedPhysicalTableSnafu {}.fail(); - } - - let output = handler.write(request, query_ctx, false).await?; - crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64); - Ok(( - StatusCode::NO_CONTENT, - write_cost_header_map(output.meta.cost), + remote_write_impl( + handler, + query, + extension, + content_encoding, + raw_body, + false, + false, ) - .into_response()) + .await } #[axum_macros::debug_handler] @@ -141,39 +118,22 @@ pub async fn route_write_without_metric_engine_and_strict_mode( fields(protocol = "prometheus", request_type = "remote_write") )] pub async fn remote_write( - State(handler): State, - Query(params): Query, - Extension(mut query_ctx): Extension, + handler: State, + query: Query, + extension: Extension, content_encoding: TypedHeader, - RawBody(body): RawBody, + raw_body: RawBody, ) -> Result { - // VictoriaMetrics handshake - if let Some(_vm_handshake) = params.get_vm_proto_version { - return Ok(VM_PROTO_VERSION.into_response()); - } - - let db = params.db.clone().unwrap_or_default(); - let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED - .with_label_values(&[db.as_str()]) - .start_timer(); - - let is_zstd = content_encoding.contains(VM_ENCODING); - let (request, samples) = - decode_remote_write_request_to_row_inserts(is_zstd, body, true).await?; - - if let Some(physical_table) = params.physical_table { - let mut new_query_ctx = query_ctx.as_ref().clone(); - new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table); - query_ctx = Arc::new(new_query_ctx); - } - - let output = handler.write(request, query_ctx, true).await?; - crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64); - Ok(( - StatusCode::NO_CONTENT, - write_cost_header_map(output.meta.cost), + remote_write_impl( + handler, + query, + extension, + content_encoding, + raw_body, + true, + true, ) - .into_response()) + .await } #[axum_macros::debug_handler] @@ -182,11 +142,32 @@ pub async fn remote_write( fields(protocol = "prometheus", request_type = "remote_write") )] pub async fn remote_write_without_strict_mode( + handler: State, + query: Query, + extension: Extension, + content_encoding: TypedHeader, + raw_body: RawBody, +) -> Result { + remote_write_impl( + handler, + query, + extension, + content_encoding, + raw_body, + false, + true, + ) + .await +} + +async fn remote_write_impl( State(handler): State, Query(params): Query, Extension(mut query_ctx): Extension, content_encoding: TypedHeader, RawBody(body): RawBody, + is_strict_mode: bool, + is_metric_engine: bool, ) -> Result { // VictoriaMetrics handshake if let Some(_vm_handshake) = params.get_vm_proto_version { @@ -199,8 +180,7 @@ pub async fn remote_write_without_strict_mode( .start_timer(); let is_zstd = content_encoding.contains(VM_ENCODING); - let (request, samples) = - decode_remote_write_request_to_row_inserts(is_zstd, body, false).await?; + let (request, samples) = decode_remote_write_request(is_zstd, body, is_strict_mode).await?; if let Some(physical_table) = params.physical_table { let mut new_query_ctx = query_ctx.as_ref().clone(); @@ -208,7 +188,7 @@ pub async fn remote_write_without_strict_mode( query_ctx = Arc::new(new_query_ctx); } - let output = handler.write(request, query_ctx, true).await?; + let output = handler.write(request, query_ctx, is_metric_engine).await?; crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64); Ok(( StatusCode::NO_CONTENT, @@ -257,29 +237,6 @@ pub async fn remote_read( handler.read(request, query_ctx).await } -async fn decode_remote_write_request_to_row_inserts( - is_zstd: bool, - body: Body, - is_strict_mode: bool, -) -> Result<(RowInsertRequests, usize)> { - let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer(); - let body = hyper::body::to_bytes(body) - .await - .context(error::HyperSnafu)?; - - let buf = Bytes::from(if is_zstd { - zstd_decompress(&body[..])? - } else { - snappy_decompress(&body[..])? - }); - - let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default); - request - .merge(buf, is_strict_mode) - .context(error::DecodePromRemoteRequestSnafu)?; - Ok(request.as_row_insert_requests()) -} - async fn decode_remote_write_request( is_zstd: bool, body: Body, @@ -296,7 +253,7 @@ async fn decode_remote_write_request( snappy_decompress(&body[..])? }); - let mut request = PromWriteRequest::default(); + let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default); request .merge(buf, is_strict_mode) .context(error::DecodePromRemoteRequestSnafu)?;