diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 0c12658b37a5..f28179d40d59 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -24,7 +24,6 @@ use pipeline::PipelineWay; use servers::error::{self, AuthSnafu, Result as ServerResult}; use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; -use servers::otlp::plugin::TraceParserRef; use servers::query_handler::OpenTelemetryProtocolHandler; use session::context::QueryContextRef; use snafu::ResultExt; @@ -64,6 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance { async fn traces( &self, request: ExportTraceServiceRequest, + table_name: String, ctx: QueryContextRef, ) -> ServerResult { self.plugins @@ -77,13 +77,7 @@ impl OpenTelemetryProtocolHandler for Instance { .get::>(); interceptor_ref.pre_execute(ctx.clone())?; - let (table_name, spans) = match self.plugins.get::() { - Some(parser) => (parser.table_name(), parser.parse(request)), - None => ( - otlp::trace::TRACE_TABLE_NAME.to_string(), - otlp::trace::parse(request), - ), - }; + let spans = otlp::trace::parse(request); let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?; diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index e564c584be58..1abfadeddbec 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -527,6 +527,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid table name"))] + InvalidTableName { + #[snafu(source)] + error: tonic::metadata::errors::ToStrError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to initialize a watcher for file {}", path))] FileWatch { path: String, @@ -620,7 +628,8 @@ impl ErrorExt for Error { | UnsupportedContentType { .. } | TimestampOverflow { .. } | OpenTelemetryLog { .. } - | UnsupportedJsonDataTypeForTag { .. } => StatusCode::InvalidArguments, + | UnsupportedJsonDataTypeForTag { .. } + | InvalidTableName { .. } => StatusCode::InvalidArguments, Catalog { source, .. } => source.status_code(), RowWriter { source, .. } => source.status_code(), diff --git a/src/servers/src/grpc/otlp.rs b/src/servers/src/grpc/otlp.rs index 76992e703fef..f3f71900eb14 100644 --- a/src/servers/src/grpc/otlp.rs +++ b/src/servers/src/grpc/otlp.rs @@ -24,10 +24,12 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTraceServiceRequest, ExportTraceServiceResponse, }; use session::context::{Channel, QueryContext}; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; use tonic::{Request, Response, Status}; use crate::error; +use crate::http::header::constants::GREPTIME_TRACE_TABLE_NAME_HEADER_NAME; +use crate::otlp::trace::TRACE_TABLE_NAME; use crate::query_handler::OpenTelemetryProtocolHandlerRef; pub struct OtlpService { @@ -46,7 +48,15 @@ impl TraceService for OtlpService { &self, request: Request, ) -> StdResult, Status> { - let (_headers, extensions, req) = request.into_parts(); + let (headers, extensions, req) = request.into_parts(); + + let table_name = match headers.get(GREPTIME_TRACE_TABLE_NAME_HEADER_NAME) { + Some(table_name) => table_name + .to_str() + .context(error::InvalidTableNameSnafu)? + .to_string(), + None => TRACE_TABLE_NAME.to_string(), + }; let mut ctx = extensions .get::() @@ -55,7 +65,7 @@ impl TraceService for OtlpService { ctx.set_channel(Channel::Otlp); let ctx = Arc::new(ctx); - let _ = self.handler.traces(req, ctx).await?; + let _ = self.handler.traces(req, table_name, ctx).await?; Ok(Response::new(ExportTraceServiceResponse { partial_success: None, diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index f5b833f6739f..16962a56395a 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -48,6 +48,7 @@ pub mod constants { pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version"; pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name"; pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys"; + pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name"; } pub static GREPTIME_DB_HEADER_FORMAT: HeaderName = diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 6e5a583c0d62..5059afd9722e 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -24,6 +24,7 @@ use axum::response::IntoResponse; use axum::{async_trait, Extension}; use bytes::Bytes; use common_telemetry::tracing; +use http::HeaderMap; use opentelemetry_proto::tonic::collector::logs::v1::{ ExportLogsServiceRequest, ExportLogsServiceResponse, }; @@ -41,11 +42,13 @@ use snafu::prelude::*; use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME; use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF}; -use crate::error::{self, PipelineSnafu, Result}; +use crate::error::{self, InvalidUtf8ValueSnafu, PipelineSnafu, Result}; use crate::http::header::constants::{ GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, - GREPTIME_LOG_TABLE_NAME_HEADER_NAME, + GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, }; +use crate::otlp::logs::LOG_TABLE_NAME; +use crate::otlp::trace::TRACE_TABLE_NAME; use crate::query_handler::OpenTelemetryProtocolHandlerRef; #[axum_macros::debug_handler] @@ -80,10 +83,18 @@ pub async fn metrics( #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))] pub async fn traces( State(handler): State, + header: HeaderMap, Extension(mut query_ctx): Extension, bytes: Bytes, ) -> Result> { let db = query_ctx.get_db_string(); + let table_name = extract_string_value_from_header( + &header, + GREPTIME_TRACE_TABLE_NAME_HEADER_NAME, + Some(TRACE_TABLE_NAME), + )? + // safety here, we provide default value for table_name + .unwrap(); query_ctx.set_channel(Channel::Otlp); let query_ctx = Arc::new(query_ctx); let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_TRACES_ELAPSED @@ -92,7 +103,7 @@ pub async fn traces( let request = ExportTraceServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?; handler - .traces(request, query_ctx) + .traces(request, table_name, query_ctx) .await .map(|o| OtlpResponse { resp_body: ExportTraceServiceResponse { @@ -107,17 +118,31 @@ pub struct PipelineInfo { pub pipeline_version: Option, } -fn pipeline_header_error( - header: &HeaderValue, - key: &str, -) -> StdResult { - let header_utf8 = str::from_utf8(header.as_bytes()); - match header_utf8 { - Ok(s) => Ok(s.to_string()), - Err(_) => Err(( +fn parse_header_value_to_string(header: &HeaderValue) -> Result { + String::from_utf8(header.as_bytes().to_vec()).context(InvalidUtf8ValueSnafu) +} + +fn extract_string_value_from_header( + headers: &HeaderMap, + header: &str, + default_table_name: Option<&str>, +) -> Result> { + let table_name = headers.get(header); + match table_name { + Some(name) => parse_header_value_to_string(name).map(Some), + None => match default_table_name { + Some(name) => Ok(Some(name.to_string())), + None => Ok(None), + }, + } +} + +fn utf8_error(header_name: &str) -> impl Fn(error::Error) -> (StatusCode, String) + use<'_> { + move |_| { + ( StatusCode::BAD_REQUEST, - format!("`{}` header is not valid UTF-8 string type.", key), - )), + format!("`{}` header is not valid UTF-8 string type.", header_name), + ) } } @@ -129,28 +154,27 @@ where type Rejection = (StatusCode, String); async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { - let pipeline_name = parts.headers.get(GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME); - let pipeline_version = parts.headers.get(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME); + let headers = &parts.headers; + let pipeline_name = + extract_string_value_from_header(headers, GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, None) + .map_err(utf8_error(GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME))?; + let pipeline_version = extract_string_value_from_header( + headers, + GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, + None, + ) + .map_err(utf8_error(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME))?; match (pipeline_name, pipeline_version) { (Some(name), Some(version)) => Ok(PipelineInfo { - pipeline_name: Some(pipeline_header_error( - name, - GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, - )?), - pipeline_version: Some(pipeline_header_error( - version, - GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, - )?), + pipeline_name: Some(name), + pipeline_version: Some(version), }), (None, _) => Ok(PipelineInfo { pipeline_name: None, pipeline_version: None, }), (Some(name), None) => Ok(PipelineInfo { - pipeline_name: Some(pipeline_header_error( - name, - GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, - )?), + pipeline_name: Some(name), pipeline_version: None, }), } @@ -169,16 +193,16 @@ where type Rejection = (StatusCode, String); async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { - let table_name = parts.headers.get(GREPTIME_LOG_TABLE_NAME_HEADER_NAME); + let table_name = extract_string_value_from_header( + &parts.headers, + GREPTIME_LOG_TABLE_NAME_HEADER_NAME, + Some(LOG_TABLE_NAME), + ) + .map_err(utf8_error(GREPTIME_LOG_TABLE_NAME_HEADER_NAME))? + // safety here, we provide default value for table_name + .unwrap(); - match table_name { - Some(name) => Ok(TableInfo { - table_name: pipeline_header_error(name, GREPTIME_LOG_TABLE_NAME_HEADER_NAME)?, - }), - None => Ok(TableInfo { - table_name: "opentelemetry_logs".to_string(), - }), - } + Ok(TableInfo { table_name }) } } @@ -192,16 +216,19 @@ where type Rejection = (StatusCode, String); async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { - let select = parts.headers.get(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME); + let select = extract_string_value_from_header( + &parts.headers, + GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME, + None, + ) + .map_err(utf8_error(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME))?; match select { Some(name) => { - let select_header = - pipeline_header_error(name, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?; - if select_header.is_empty() { + if name.is_empty() { Ok(SelectInfoWrapper(Default::default())) } else { - Ok(SelectInfoWrapper(SelectInfo::from(select_header))) + Ok(SelectInfoWrapper(SelectInfo::from(name))) } } None => Ok(SelectInfoWrapper(Default::default())), diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs index cc1321ac7020..c55e2337a26c 100644 --- a/src/servers/src/otlp.rs +++ b/src/servers/src/otlp.rs @@ -14,5 +14,5 @@ pub mod logs; pub mod metrics; -pub mod plugin; pub mod trace; +mod utils; diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs index 8f31a1db064b..f11cd4ff3c68 100644 --- a/src/servers/src/otlp/logs.rs +++ b/src/servers/src/otlp/logs.rs @@ -29,10 +29,12 @@ use pipeline::{Array, Map, PipelineWay, SchemaInfo, SelectInfo, Value as Pipelin use snafu::{ensure, ResultExt}; use super::trace::attributes::OtlpAnyValue; +use super::utils::{bytes_to_hex_string, key_value_to_jsonb}; use crate::error::{ IncompatibleSchemaSnafu, OpenTelemetryLogSnafu, Result, UnsupportedJsonDataTypeForTagSnafu, }; -use crate::otlp::trace::span::bytes_to_hex_string; + +pub const LOG_TABLE_NAME: &str = "opentelemetry_logs"; /// Convert OpenTelemetry metrics to GreptimeDB insert requests /// @@ -772,43 +774,6 @@ fn key_value_to_map(key_values: Vec) -> BTreeMap JsonbValue<'static> { - match value { - any_value::Value::StringValue(s) => JsonbValue::String(s.into()), - any_value::Value::IntValue(i) => JsonbValue::Number(JsonbNumber::Int64(i)), - any_value::Value::DoubleValue(d) => JsonbValue::Number(JsonbNumber::Float64(d)), - any_value::Value::BoolValue(b) => JsonbValue::Bool(b), - any_value::Value::ArrayValue(a) => { - let values = a - .values - .into_iter() - .map(|v| match v.value { - Some(value) => any_value_to_jsonb(value), - None => JsonbValue::Null, - }) - .collect(); - JsonbValue::Array(values) - } - any_value::Value::KvlistValue(kv) => key_value_to_jsonb(kv.values), - any_value::Value::BytesValue(b) => JsonbValue::String(bytes_to_hex_string(&b).into()), - } -} - -fn key_value_to_jsonb(key_values: Vec) -> JsonbValue<'static> { - let mut map = BTreeMap::new(); - for kv in key_values { - let value = match kv.value { - Some(value) => match value.value { - Some(value) => any_value_to_jsonb(value), - None => JsonbValue::Null, - }, - None => JsonbValue::Null, - }; - map.insert(kv.key.clone(), value); - } - JsonbValue::Object(map) -} - fn log_body_to_string(body: &AnyValue) -> String { let otlp_value = OtlpAnyValue::from(body); otlp_value.to_string() diff --git a/src/servers/src/otlp/plugin.rs b/src/servers/src/otlp/plugin.rs deleted file mode 100644 index 1258fe167ea6..000000000000 --- a/src/servers/src/otlp/plugin.rs +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; - -use super::trace::span::TraceSpans; - -/// Transformer helps to transform ExportTraceServiceRequest based on logic, like: -/// - uplift some fields from Attributes (Map type) to column -pub trait TraceParser: Send + Sync { - fn parse(&self, request: ExportTraceServiceRequest) -> TraceSpans; - fn table_name(&self) -> String; -} - -pub type TraceParserRef = Arc; diff --git a/src/servers/src/otlp/trace.rs b/src/servers/src/otlp/trace.rs index edcdb8fc0b0c..9572ea4df142 100644 --- a/src/servers/src/otlp/trace.rs +++ b/src/servers/src/otlp/trace.rs @@ -15,15 +15,15 @@ use api::v1::value::ValueData; use api::v1::{ColumnDataType, RowInsertRequests}; use common_grpc::precision::Precision; -use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use self::span::{parse_span, TraceSpan, TraceSpans}; use crate::error::Result; +use crate::otlp::utils::{make_column_data, make_string_column_data}; use crate::row_writer::{self, MultiTableData, TableData}; const APPROXIMATE_COLUMN_COUNT: usize = 24; -pub const TRACE_TABLE_NAME: &str = "traces_preview_v01"; +pub const TRACE_TABLE_NAME: &str = "opentelemetry_traces"; pub mod attributes; pub mod span; @@ -43,7 +43,7 @@ pub fn parse(request: ExportTraceServiceRequest) -> TraceSpans { for scope_spans in resource_spans.scope_spans { let scope = scope_spans.scope.unwrap_or_default(); for span in scope_spans.spans { - spans.push(parse_span(resource_attrs.clone(), scope.clone(), span)); + spans.push(parse_span(&resource_attrs, &scope, span)); } } } @@ -72,73 +72,79 @@ pub fn to_grpc_insert_requests( pub fn write_span_to_row(writer: &mut TableData, span: TraceSpan) -> Result<()> { let mut row = writer.alloc_one_row(); - { - // tags - let iter = vec![ - ("trace_id", span.trace_id), - ("span_id", span.span_id), - ("parent_span_id", span.parent_span_id), - ] - .into_iter() - .map(|(col, val)| (col.to_string(), val)); - row_writer::write_tags(writer, iter, &mut row)?; - } - { - // fields - let str_fields_iter = vec![ - ("resource_attributes", span.resource_attributes.to_string()), - ("scope_name", span.scope_name), - ("scope_version", span.scope_version), - ("scope_attributes", span.scope_attributes.to_string()), - ("trace_state", span.trace_state), - ("span_name", span.span_name), - ("span_kind", span.span_kind), - ("span_status_code", span.span_status_code), - ("span_status_message", span.span_status_message), - ("span_attributes", span.span_attributes.to_string()), - ("span_events", span.span_events.to_string()), - ("span_links", span.span_links.to_string()), - ] - .into_iter() - .map(|(col, val)| { - ( - col.into(), - ColumnDataType::String, - ValueData::StringValue(val), - ) - }); - - let time_fields_iter = vec![ - ("start", span.start_in_nanosecond), - ("end", span.end_in_nanosecond), - ] - .into_iter() - .map(|(col, val)| { - ( - col.into(), - ColumnDataType::TimestampNanosecond, - ValueData::TimestampNanosecondValue(val as i64), - ) - }); - - row_writer::write_fields(writer, str_fields_iter, &mut row)?; - row_writer::write_fields(writer, time_fields_iter, &mut row)?; - row_writer::write_fields(writer, span.uplifted_span_attributes.into_iter(), &mut row)?; - } - row_writer::write_f64( - writer, - GREPTIME_VALUE, - (span.end_in_nanosecond - span.start_in_nanosecond) as f64 / 1_000_000.0, // duration in millisecond - &mut row, - )?; + // write ts row_writer::write_ts_to_nanos( writer, - GREPTIME_TIMESTAMP, + "timestamp", Some(span.start_in_nanosecond as i64), Precision::Nanosecond, &mut row, )?; + // write ts fields + let fields = vec![ + make_column_data( + "timestamp_end", + ColumnDataType::TimestampNanosecond, + ValueData::TimestampNanosecondValue(span.end_in_nanosecond as i64), + ), + make_column_data( + "duration_nano", + ColumnDataType::Uint64, + ValueData::U64Value(span.end_in_nanosecond - span.start_in_nanosecond), + ), + ]; + row_writer::write_fields(writer, fields.into_iter(), &mut row)?; + + // tags + let iter = vec![ + ("trace_id", span.trace_id), + ("span_id", span.span_id), + ("parent_span_id", span.parent_span_id), + ] + .into_iter() + .map(|(col, val)| (col.to_string(), val)); + row_writer::write_tags(writer, iter, &mut row)?; + + // write fields + let fields = vec![ + make_string_column_data("span_kind", span.span_kind), + make_string_column_data("span_name", span.span_name), + make_string_column_data("span_status_code", span.span_status_code), + make_string_column_data("span_status_message", span.span_status_message), + make_string_column_data("trace_state", span.trace_state), + ]; + row_writer::write_fields(writer, fields.into_iter(), &mut row)?; + + row_writer::write_json( + writer, + "span_attributes", + span.span_attributes.into(), + &mut row, + )?; + row_writer::write_json(writer, "span_events", span.span_events.into(), &mut row)?; + row_writer::write_json(writer, "span_links", span.span_links.into(), &mut row)?; + + // write fields + let fields = vec![ + make_string_column_data("scope_name", span.scope_name), + make_string_column_data("scope_version", span.scope_version), + ]; + row_writer::write_fields(writer, fields.into_iter(), &mut row)?; + + row_writer::write_json( + writer, + "scope_attributes", + span.scope_attributes.into(), + &mut row, + )?; + + row_writer::write_json( + writer, + "resource_attributes", + span.resource_attributes.into(), + &mut row, + )?; writer.add_row(row); diff --git a/src/servers/src/otlp/trace/attributes.rs b/src/servers/src/otlp/trace/attributes.rs index 030c47887f71..2fce6225eddf 100644 --- a/src/servers/src/otlp/trace/attributes.rs +++ b/src/servers/src/otlp/trace/attributes.rs @@ -21,6 +21,8 @@ use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; use serde::ser::{SerializeMap, SerializeSeq}; use serde::Serialize; +use crate::otlp::utils::key_value_to_jsonb; + #[derive(Clone, Debug)] pub struct OtlpAnyValue<'a>(&'a AnyValue); @@ -113,6 +115,18 @@ impl From> for Attributes { } } +impl From<&[KeyValue]> for Attributes { + fn from(attrs: &[KeyValue]) -> Self { + Self(attrs.to_vec()) + } +} + +impl From for jsonb::Value<'static> { + fn from(attrs: Attributes) -> jsonb::Value<'static> { + key_value_to_jsonb(attrs.0) + } +} + impl Attributes { pub fn get_ref(&self) -> &Vec { &self.0 diff --git a/src/servers/src/otlp/trace/span.rs b/src/servers/src/otlp/trace/span.rs index d3435bad3b4a..02fc523f66c7 100644 --- a/src/servers/src/otlp/trace/span.rs +++ b/src/servers/src/otlp/trace/span.rs @@ -14,8 +14,6 @@ use std::fmt::Display; -use api::v1::value::ValueData; -use api::v1::ColumnDataType; use common_time::timestamp::Timestamp; use itertools::Itertools; use opentelemetry_proto::tonic::common::v1::{InstrumentationScope, KeyValue}; @@ -24,6 +22,7 @@ use opentelemetry_proto::tonic::trace::v1::{Span, Status}; use serde::Serialize; use super::attributes::Attributes; +use crate::otlp::utils::bytes_to_hex_string; #[derive(Debug, Clone)] pub struct TraceSpan { @@ -47,8 +46,6 @@ pub struct TraceSpan { pub span_links: SpanLinks, // TODO(yuanbohan): List in the future pub start_in_nanosecond: u64, // this is also the Timestamp Index pub end_in_nanosecond: u64, - - pub uplifted_span_attributes: Vec<(String, ColumnDataType, ValueData)>, } pub type TraceSpans = Vec; @@ -72,6 +69,30 @@ impl From for SpanLink { } } +impl From for jsonb::Value<'static> { + fn from(value: SpanLink) -> jsonb::Value<'static> { + jsonb::Value::Object( + vec![ + ( + "trace_id".to_string(), + jsonb::Value::String(value.trace_id.into()), + ), + ( + "span_id".to_string(), + jsonb::Value::String(value.span_id.into()), + ), + ( + "trace_state".to_string(), + jsonb::Value::String(value.trace_state.into()), + ), + ("attributes".to_string(), value.attributes.into()), + ] + .into_iter() + .collect(), + ) + } +} + #[derive(Debug, Clone, Serialize)] pub struct SpanLinks(Vec); @@ -82,6 +103,12 @@ impl From> for SpanLinks { } } +impl From for jsonb::Value<'static> { + fn from(value: SpanLinks) -> jsonb::Value<'static> { + jsonb::Value::Array(value.0.into_iter().map(Into::into).collect()) + } +} + impl Display for SpanLinks { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", serde_json::to_string(self).unwrap_or_default()) @@ -115,6 +142,20 @@ impl From for SpanEvent { } } +impl From for jsonb::Value<'static> { + fn from(value: SpanEvent) -> jsonb::Value<'static> { + jsonb::Value::Object( + vec![ + ("name".to_string(), jsonb::Value::String(value.name.into())), + ("time".to_string(), jsonb::Value::String(value.time.into())), + ("attributes".to_string(), value.attributes.into()), + ] + .into_iter() + .collect(), + ) + } +} + #[derive(Debug, Clone, Serialize)] pub struct SpanEvents(Vec); @@ -125,6 +166,12 @@ impl From> for SpanEvents { } } +impl From for jsonb::Value<'static> { + fn from(value: SpanEvents) -> jsonb::Value<'static> { + jsonb::Value::Array(value.0.into_iter().map(Into::into).collect()) + } +} + impl Display for SpanEvents { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{}", serde_json::to_string(self).unwrap_or_default()) @@ -142,8 +189,8 @@ impl SpanEvents { } pub fn parse_span( - resource_attrs: Vec, - scope: InstrumentationScope, + resource_attrs: &[KeyValue], + scope: &InstrumentationScope, span: Span, ) -> TraceSpan { let (span_status_code, span_status_message) = status_to_string(&span.status); @@ -156,9 +203,9 @@ pub fn parse_span( resource_attributes: Attributes::from(resource_attrs), trace_state: span.trace_state, - scope_name: scope.name, - scope_version: scope.version, - scope_attributes: Attributes::from(scope.attributes), + scope_name: scope.name.clone(), + scope_version: scope.version.clone(), + scope_attributes: Attributes::from(scope.attributes.clone()), span_name: span.name, span_kind, @@ -170,15 +217,9 @@ pub fn parse_span( start_in_nanosecond: span.start_time_unix_nano, end_in_nanosecond: span.end_time_unix_nano, - - uplifted_span_attributes: vec![], } } -pub fn bytes_to_hex_string(bs: &[u8]) -> String { - bs.iter().map(|b| format!("{:02x}", b)).join("") -} - pub fn status_to_string(status: &Option) -> (String, String) { match status { Some(status) => (status.code().as_str_name().into(), status.message.clone()), diff --git a/src/servers/src/otlp/utils.rs b/src/servers/src/otlp/utils.rs new file mode 100644 index 000000000000..be3741666df1 --- /dev/null +++ b/src/servers/src/otlp/utils.rs @@ -0,0 +1,79 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use api::v1::value::ValueData; +use api::v1::ColumnDataType; +use itertools::Itertools; +use jsonb::{Number as JsonbNumber, Value as JsonbValue}; +use opentelemetry_proto::tonic::common::v1::{any_value, KeyValue}; + +pub fn bytes_to_hex_string(bs: &[u8]) -> String { + bs.iter().map(|b| format!("{:02x}", b)).join("") +} + +pub fn any_value_to_jsonb(value: any_value::Value) -> JsonbValue<'static> { + match value { + any_value::Value::StringValue(s) => JsonbValue::String(s.into()), + any_value::Value::IntValue(i) => JsonbValue::Number(JsonbNumber::Int64(i)), + any_value::Value::DoubleValue(d) => JsonbValue::Number(JsonbNumber::Float64(d)), + any_value::Value::BoolValue(b) => JsonbValue::Bool(b), + any_value::Value::ArrayValue(a) => { + let values = a + .values + .into_iter() + .map(|v| match v.value { + Some(value) => any_value_to_jsonb(value), + None => JsonbValue::Null, + }) + .collect(); + JsonbValue::Array(values) + } + any_value::Value::KvlistValue(kv) => key_value_to_jsonb(kv.values), + any_value::Value::BytesValue(b) => JsonbValue::String(bytes_to_hex_string(&b).into()), + } +} + +pub fn key_value_to_jsonb(key_values: Vec) -> JsonbValue<'static> { + let mut map = BTreeMap::new(); + for kv in key_values { + let value = match kv.value { + Some(value) => match value.value { + Some(value) => any_value_to_jsonb(value), + None => JsonbValue::Null, + }, + None => JsonbValue::Null, + }; + map.insert(kv.key.clone(), value); + } + JsonbValue::Object(map) +} + +#[inline] +pub(crate) fn make_string_column_data( + name: &str, + value: String, +) -> (String, ColumnDataType, ValueData) { + make_column_data(name, ColumnDataType::String, ValueData::StringValue(value)) +} + +#[inline] +pub(crate) fn make_column_data( + name: &str, + data_type: ColumnDataType, + value: ValueData, +) -> (String, ColumnDataType, ValueData) { + (name.to_string(), data_type, value) +} diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 618f366eb040..cee866c61b93 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -119,6 +119,7 @@ pub trait OpenTelemetryProtocolHandler: LogHandler { async fn traces( &self, request: ExportTraceServiceRequest, + table_name: String, ctx: QueryContextRef, ) -> Result; diff --git a/src/servers/src/row_writer.rs b/src/servers/src/row_writer.rs index 542a2484b481..d13e5ec81566 100644 --- a/src/servers/src/row_writer.rs +++ b/src/servers/src/row_writer.rs @@ -14,10 +14,11 @@ use std::collections::HashMap; +use api::v1::column_data_type_extension::TypeExt; use api::v1::value::ValueData; use api::v1::{ - ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, - Value, + ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row, + RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value, }; use common_grpc::precision::Precision; use common_time::timestamp::TimeUnit; @@ -199,6 +200,68 @@ pub fn write_f64( ) } +fn build_json_column_schema(name: impl ToString) -> ColumnSchema { + ColumnSchema { + column_name: name.to_string(), + datatype: ColumnDataType::Binary as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension: Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + ..Default::default() + } +} + +pub fn write_json( + table_data: &mut TableData, + name: impl ToString, + value: jsonb::Value, + one_row: &mut Vec, +) -> Result<()> { + write_by_schema( + table_data, + std::iter::once(( + build_json_column_schema(name), + ValueData::BinaryValue(value.to_vec()), + )), + one_row, + ) +} + +fn write_by_schema( + table_data: &mut TableData, + kv_iter: impl Iterator, + one_row: &mut Vec, +) -> Result<()> { + let TableData { + schema, + column_indexes, + .. + } = table_data; + + for (column_schema, value) in kv_iter { + let index = column_indexes.get(&column_schema.column_name); + if let Some(index) = index { + check_schema_number( + column_schema.datatype, + column_schema.semantic_type, + &schema[*index], + )?; + one_row[*index].value_data = Some(value); + } else { + let index = schema.len(); + let key = column_schema.column_name.clone(); + schema.push(column_schema); + column_indexes.insert(key, index); + one_row.push(Value { + value_data: Some(value), + }); + } + } + + Ok(()) +} + fn write_by_semantic_type( table_data: &mut TableData, semantic_type: SemanticType, @@ -358,23 +421,27 @@ fn check_schema( semantic_type: SemanticType, schema: &ColumnSchema, ) -> Result<()> { + check_schema_number(datatype as i32, semantic_type as i32, schema) +} + +fn check_schema_number(datatype: i32, semantic_type: i32, schema: &ColumnSchema) -> Result<()> { ensure!( - schema.datatype == datatype as i32, + schema.datatype == datatype, IncompatibleSchemaSnafu { column_name: &schema.column_name, datatype: "datatype", expected: schema.datatype, - actual: datatype as i32, + actual: datatype, } ); ensure!( - schema.semantic_type == semantic_type as i32, + schema.semantic_type == semantic_type, IncompatibleSchemaSnafu { column_name: &schema.column_name, datatype: "semantic_type", expected: schema.semantic_type, - actual: semantic_type as i32, + actual: semantic_type, } ); diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index f5a1080d768a..a0cc3b2f5d0c 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1594,18 +1594,18 @@ pub async fn test_otlp_traces(store_type: StorageType) { assert_eq!(StatusCode::OK, res.status()); // select traces data - let expected = r#"[["b5e5fb572cf0a3335dd194a14145fef5","3364d2da58c9fd2b","","{\"service.name\":\"telemetrygen\"}","telemetrygen","","{}","","lets-go","SPAN_KIND_CLIENT","STATUS_CODE_UNSET","","{\"net.peer.ip\":\"1.2.3.4\",\"peer.service\":\"telemetrygen-server\"}","[]","[]",1726631197820927000,1726631197821050000,0.123,1726631197820927000],["b5e5fb572cf0a3335dd194a14145fef5","74c82efa6f628e80","3364d2da58c9fd2b","{\"service.name\":\"telemetrygen\"}","telemetrygen","","{}","","okey-dokey-0","SPAN_KIND_SERVER","STATUS_CODE_UNSET","","{\"net.peer.ip\":\"1.2.3.4\",\"peer.service\":\"telemetrygen-client\"}","[]","[]",1726631197820927000,1726631197821050000,0.123,1726631197820927000]]"#; + let expected = r#"[[1726631197820927000,1726631197821050000,123000,"b5e5fb572cf0a3335dd194a14145fef5","3364d2da58c9fd2b","","SPAN_KIND_CLIENT","lets-go","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-server"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}],[1726631197820927000,1726631197821050000,123000,"b5e5fb572cf0a3335dd194a14145fef5","74c82efa6f628e80","3364d2da58c9fd2b","SPAN_KIND_SERVER","okey-dokey-0","STATUS_CODE_UNSET","","",{"net.peer.ip":"1.2.3.4","peer.service":"telemetrygen-client"},[],[],"telemetrygen","",{},{"service.name":"telemetrygen"}]]"#; validate_data( "otlp_traces", &client, - "select * from traces_preview_v01;", + "select * from opentelemetry_traces;", expected, ) .await; // drop table let res = client - .get("/v1/sql?sql=drop table traces_preview_v01;") + .get("/v1/sql?sql=drop table opentelemetry_traces;") .send() .await; assert_eq!(res.status(), StatusCode::OK); @@ -1618,7 +1618,7 @@ pub async fn test_otlp_traces(store_type: StorageType) { validate_data( "otlp_traces_with_gzip", &client, - "select * from traces_preview_v01;", + "select * from opentelemetry_traces;", expected, ) .await;