Skip to content

Commit

Permalink
refactor code
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Apr 28, 2024
1 parent e60db78 commit 05a046d
Showing 1 changed file with 64 additions and 107 deletions.
171 changes: 64 additions & 107 deletions src/servers/src/http/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use session::context::QueryContextRef;
use snafu::prelude::*;

use super::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
use crate::error::{self, Result, UnexpectedPhysicalTableSnafu};
use crate::error::{self, Result};
use crate::prom_store::{snappy_decompress, zstd_decompress};
use crate::proto::PromWriteRequest;
use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse};
Expand Down Expand Up @@ -72,67 +72,44 @@ impl Default for RemoteWriteQuery {
/// Same with [remote_write] but won't store data to metric engine.
#[axum_macros::debug_handler]
pub async fn route_write_without_metric_engine(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<RemoteWriteQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
raw_body: RawBody,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
if let Some(_vm_handshake) = params.get_vm_proto_version {
return Ok(VM_PROTO_VERSION.into_response());
}

let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request(is_zstd, body, true).await?;
// reject if physical table is specified when metric engine is disabled
if params.physical_table.is_some() {
return UnexpectedPhysicalTableSnafu {}.fail();
}

let output = handler.write(request, query_ctx, false).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
true,
false,
)
.into_response())
.await
}

/// Same with [remote_write] but won't store data to metric engine.
/// And without strict_mode on will not check invalid UTF-8.
#[axum_macros::debug_handler]
pub async fn route_write_without_metric_engine_and_strict_mode(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<RemoteWriteQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
raw_body: RawBody,
) -> Result<impl IntoResponse> {
let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request(is_zstd, body, false).await?;
// reject if physical table is specified when metric engine is disabled
if params.physical_table.is_some() {
return UnexpectedPhysicalTableSnafu {}.fail();
}

let output = handler.write(request, query_ctx, false).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
false,
false,
)
.into_response())
.await
}

#[axum_macros::debug_handler]
Expand All @@ -141,39 +118,22 @@ pub async fn route_write_without_metric_engine_and_strict_mode(
fields(protocol = "prometheus", request_type = "remote_write")
)]
pub async fn remote_write(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContextRef>,
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
raw_body: RawBody,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
if let Some(_vm_handshake) = params.get_vm_proto_version {
return Ok(VM_PROTO_VERSION.into_response());
}

let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) =
decode_remote_write_request_to_row_inserts(is_zstd, body, true).await?;

if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
query_ctx = Arc::new(new_query_ctx);
}

let output = handler.write(request, query_ctx, true).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
true,
true,
)
.into_response())
.await
}

#[axum_macros::debug_handler]
Expand All @@ -182,11 +142,32 @@ pub async fn remote_write(
fields(protocol = "prometheus", request_type = "remote_write")
)]
pub async fn remote_write_without_strict_mode(
handler: State<PromStoreProtocolHandlerRef>,
query: Query<RemoteWriteQuery>,
extension: Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
raw_body: RawBody,
) -> Result<impl IntoResponse> {
remote_write_impl(
handler,
query,
extension,
content_encoding,
raw_body,
false,
true,
)
.await
}

async fn remote_write_impl(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
is_strict_mode: bool,
is_metric_engine: bool,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
if let Some(_vm_handshake) = params.get_vm_proto_version {
Expand All @@ -199,16 +180,15 @@ pub async fn remote_write_without_strict_mode(
.start_timer();

let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) =
decode_remote_write_request_to_row_inserts(is_zstd, body, false).await?;
let (request, samples) = decode_remote_write_request(is_zstd, body, is_strict_mode).await?;

if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
query_ctx = Arc::new(new_query_ctx);
}

let output = handler.write(request, query_ctx, true).await?;
let output = handler.write(request, query_ctx, is_metric_engine).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
Expand Down Expand Up @@ -257,29 +237,6 @@ pub async fn remote_read(
handler.read(request, query_ctx).await
}

async fn decode_remote_write_request_to_row_inserts(
is_zstd: bool,
body: Body,
is_strict_mode: bool,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;

let buf = Bytes::from(if is_zstd {
zstd_decompress(&body[..])?
} else {
snappy_decompress(&body[..])?
});

let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
request
.merge(buf, is_strict_mode)
.context(error::DecodePromRemoteRequestSnafu)?;
Ok(request.as_row_insert_requests())
}

async fn decode_remote_write_request(
is_zstd: bool,
body: Body,
Expand All @@ -296,7 +253,7 @@ async fn decode_remote_write_request(
snappy_decompress(&body[..])?
});

let mut request = PromWriteRequest::default();
let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
request
.merge(buf, is_strict_mode)
.context(error::DecodePromRemoteRequestSnafu)?;
Expand Down

0 comments on commit 05a046d

Please sign in to comment.