From 4622412dfec496ccc723fd0960ec61bdf428a79f Mon Sep 17 00:00:00 2001 From: localhost Date: Wed, 16 Oct 2024 12:36:08 +0800 Subject: [PATCH] feat: add API to write OpenTelemetry logs to GreptimeDB (#4755) * chore: otlp logs api * feat: add API to write OpenTelemetry logs to GreptimeDB * chore: fix test data schema error * chore: modify the underlying data structure of the pipeline value map type from hashmap to btremap to keep key order * chore: fix by pr comment * chore: resolve conflicts and add some test * chore: remove useless error * chore: change otlp header name * chore: fmt code * chore: fix integration test for otlp log write api * chore: fix by pr comment * chore: set otlp body with fulltext default --- Cargo.lock | 2 + Cargo.toml | 1 + src/datatypes/src/value.rs | 78 +-- src/frontend/src/instance/otlp.rs | 31 +- src/frontend/src/metrics.rs | 12 + src/pipeline/src/etl.rs | 36 ++ src/pipeline/src/etl/error.rs | 16 +- src/pipeline/src/etl/processor/cmcd.rs | 7 +- src/pipeline/src/etl/processor/regex.rs | 6 +- .../transform/transformer/greptime/coerce.rs | 130 +++-- src/pipeline/src/etl/value.rs | 93 +++- src/pipeline/src/etl/value/map.rs | 34 +- src/pipeline/src/lib.rs | 2 +- src/pipeline/tests/pipeline.rs | 52 ++ src/servers/Cargo.toml | 1 + src/servers/src/error.rs | 10 +- src/servers/src/http.rs | 10 +- src/servers/src/http/header.rs | 3 + src/servers/src/http/otlp.rs | 179 ++++++- src/servers/src/metrics.rs | 7 + src/servers/src/otlp.rs | 1 + src/servers/src/otlp/logs.rs | 506 ++++++++++++++++++ src/servers/src/query_handler.rs | 13 +- tests-integration/tests/http.rs | 58 +- 24 files changed, 1158 insertions(+), 130 deletions(-) create mode 100644 src/servers/src/otlp/logs.rs diff --git a/Cargo.lock b/Cargo.lock index c3736b2d9fd6..bf76687e720f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7549,6 +7549,7 @@ dependencies = [ "ordered-float 4.3.0", "percent-encoding", "rand", + "serde_json", "thiserror", ] @@ -10737,6 +10738,7 @@ dependencies = [ name = "servers" version = "0.9.4" dependencies = [ + "ahash 0.8.11", "aide", "api", "arrow", diff --git a/Cargo.toml b/Cargo.toml index 133db91819fa..484cdcf55dea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -138,6 +138,7 @@ opentelemetry-proto = { version = "0.5", features = [ "metrics", "trace", "with-serde", + "logs", ] } parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] } paste = "1.0" diff --git a/src/datatypes/src/value.rs b/src/datatypes/src/value.rs index b973a3156b03..76b59a8db790 100644 --- a/src/datatypes/src/value.rs +++ b/src/datatypes/src/value.rs @@ -38,6 +38,7 @@ use snafu::{ensure, ResultExt}; use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu}; use crate::prelude::*; +use crate::schema::ColumnSchema; use crate::type_id::LogicalTypeId; use crate::types::{IntervalType, ListType}; use crate::vectors::ListVector; @@ -1286,39 +1287,52 @@ impl<'a> From>> for ValueRef<'a> { } } -impl<'a> TryFrom> for serde_json::Value { - type Error = serde_json::Error; - - fn try_from(value: ValueRef<'a>) -> serde_json::Result { - let json_value = match value { - ValueRef::Null => serde_json::Value::Null, - ValueRef::Boolean(v) => serde_json::Value::Bool(v), - ValueRef::UInt8(v) => serde_json::Value::from(v), - ValueRef::UInt16(v) => serde_json::Value::from(v), - ValueRef::UInt32(v) => serde_json::Value::from(v), - ValueRef::UInt64(v) => serde_json::Value::from(v), - ValueRef::Int8(v) => serde_json::Value::from(v), - ValueRef::Int16(v) => serde_json::Value::from(v), - ValueRef::Int32(v) => serde_json::Value::from(v), - ValueRef::Int64(v) => serde_json::Value::from(v), - ValueRef::Float32(v) => serde_json::Value::from(v.0), - ValueRef::Float64(v) => serde_json::Value::from(v.0), - ValueRef::String(bytes) => serde_json::Value::String(bytes.to_string()), - ValueRef::Binary(bytes) => serde_json::to_value(bytes)?, - ValueRef::Date(v) => serde_json::Value::Number(v.val().into()), - ValueRef::DateTime(v) => serde_json::Value::Number(v.val().into()), - ValueRef::List(v) => serde_json::to_value(v)?, - ValueRef::Timestamp(v) => serde_json::to_value(v.value())?, - ValueRef::Time(v) => serde_json::to_value(v.value())?, - ValueRef::IntervalYearMonth(v) => serde_json::Value::from(v), - ValueRef::IntervalDayTime(v) => serde_json::Value::from(v), - ValueRef::IntervalMonthDayNano(v) => serde_json::Value::from(v), - ValueRef::Duration(v) => serde_json::to_value(v.value())?, - ValueRef::Decimal128(v) => serde_json::to_value(v.to_string())?, - }; +/// transform a [ValueRef] to a [serde_json::Value]. +/// The json type will be handled specially +pub fn transform_value_ref_to_json_value<'a>( + value: ValueRef<'a>, + schema: &'a ColumnSchema, +) -> serde_json::Result { + let json_value = match value { + ValueRef::Null => serde_json::Value::Null, + ValueRef::Boolean(v) => serde_json::Value::Bool(v), + ValueRef::UInt8(v) => serde_json::Value::from(v), + ValueRef::UInt16(v) => serde_json::Value::from(v), + ValueRef::UInt32(v) => serde_json::Value::from(v), + ValueRef::UInt64(v) => serde_json::Value::from(v), + ValueRef::Int8(v) => serde_json::Value::from(v), + ValueRef::Int16(v) => serde_json::Value::from(v), + ValueRef::Int32(v) => serde_json::Value::from(v), + ValueRef::Int64(v) => serde_json::Value::from(v), + ValueRef::Float32(v) => serde_json::Value::from(v.0), + ValueRef::Float64(v) => serde_json::Value::from(v.0), + ValueRef::String(bytes) => serde_json::Value::String(bytes.to_string()), + ValueRef::Binary(bytes) => { + if let ConcreteDataType::Json(_) = schema.data_type { + match jsonb::from_slice(bytes) { + Ok(json) => json.into(), + Err(e) => { + error!(e; "Failed to parse jsonb"); + serde_json::Value::Null + } + } + } else { + serde_json::to_value(bytes)? + } + } + ValueRef::Date(v) => serde_json::Value::Number(v.val().into()), + ValueRef::DateTime(v) => serde_json::Value::Number(v.val().into()), + ValueRef::List(v) => serde_json::to_value(v)?, + ValueRef::Timestamp(v) => serde_json::to_value(v.value())?, + ValueRef::Time(v) => serde_json::to_value(v.value())?, + ValueRef::IntervalYearMonth(v) => serde_json::Value::from(v), + ValueRef::IntervalDayTime(v) => serde_json::Value::from(v), + ValueRef::IntervalMonthDayNano(v) => serde_json::Value::from(v), + ValueRef::Duration(v) => serde_json::to_value(v.value())?, + ValueRef::Decimal128(v) => serde_json::to_value(v.to_string())?, + }; - Ok(json_value) - } + Ok(json_value) } /// Reference to a [ListValue]. diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 09335af0804e..7ed16097152f 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -17,8 +17,10 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::Output; use common_error::ext::BoxedError; use common_telemetry::tracing; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use pipeline::PipelineWay; use servers::error::{self, AuthSnafu, Result as ServerResult}; use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; @@ -28,7 +30,7 @@ use session::context::QueryContextRef; use snafu::ResultExt; use crate::instance::Instance; -use crate::metrics::{OTLP_METRICS_ROWS, OTLP_TRACES_ROWS}; +use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS}; #[async_trait] impl OpenTelemetryProtocolHandler for Instance { @@ -92,4 +94,31 @@ impl OpenTelemetryProtocolHandler for Instance { .map_err(BoxedError::new) .context(error::ExecuteGrpcQuerySnafu) } + + #[tracing::instrument(skip_all)] + async fn logs( + &self, + request: ExportLogsServiceRequest, + pipeline: PipelineWay, + table_name: String, + ctx: QueryContextRef, + ) -> ServerResult { + self.plugins + .get::() + .as_ref() + .check_permission(ctx.current_user(), PermissionReq::Otlp) + .context(AuthSnafu)?; + + let interceptor_ref = self + .plugins + .get::>(); + interceptor_ref.pre_execute(ctx.clone())?; + let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?; + + self.handle_log_inserts(requests, ctx) + .await + .inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64)) + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu) + } } diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs index 33580c550e8b..a8bf4eb76eb5 100644 --- a/src/frontend/src/metrics.rs +++ b/src/frontend/src/metrics.rs @@ -41,16 +41,28 @@ lazy_static! { .with_label_values(&["insert"]); pub static ref EXECUTE_SCRIPT_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED .with_label_values(&["execute"]); + + /// The number of OpenTelemetry metrics send by frontend node. pub static ref OTLP_METRICS_ROWS: IntCounter = register_int_counter!( "greptime_frontend_otlp_metrics_rows", "frontend otlp metrics rows" ) .unwrap(); + + /// The number of OpenTelemetry traces send by frontend node. pub static ref OTLP_TRACES_ROWS: IntCounter = register_int_counter!( "greptime_frontend_otlp_traces_rows", "frontend otlp traces rows" ) .unwrap(); + + /// The number of OpenTelemetry logs send by frontend node. + pub static ref OTLP_LOGS_ROWS: IntCounter = register_int_counter!( + "greptime_frontend_otlp_logs_rows", + "frontend otlp logs rows" + ) + .unwrap(); + /// The number of heartbeats send by frontend node. pub static ref HEARTBEAT_SENT_COUNT: IntCounter = register_int_counter!( "greptime_frontend_heartbeat_send_count", diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index f29032e4f8a2..bec06cab8bfb 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -210,6 +210,37 @@ where self.transformer.transform_mut(val) } + pub fn prepare_pipeline_value(&self, val: Value, result: &mut [Value]) -> Result<()> { + match val { + Value::Map(map) => { + let mut search_from = 0; + // because of the key in the json map is ordered + for (payload_key, payload_value) in map.values.into_iter() { + if search_from >= self.required_keys.len() { + break; + } + + // because of map key is ordered, required_keys is ordered too + if let Some(pos) = self.required_keys[search_from..] + .iter() + .position(|k| k == &payload_key) + { + result[search_from + pos] = payload_value; + // next search from is always after the current key + search_from += pos; + } + } + } + Value::String(_) => { + result[0] = val; + } + _ => { + return PrepareValueMustBeObjectSnafu.fail(); + } + } + Ok(()) + } + pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<()> { match val { serde_json::Value::Object(map) => { @@ -286,6 +317,11 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str .context(IntermediateKeyIndexSnafu { kind, key }) } +pub enum PipelineWay { + Identity, + Custom(std::sync::Arc>), +} + #[cfg(test)] mod tests { diff --git a/src/pipeline/src/etl/error.rs b/src/pipeline/src/etl/error.rs index 08654c2c825c..9fe0af7ceffb 100644 --- a/src/pipeline/src/etl/error.rs +++ b/src/pipeline/src/etl/error.rs @@ -438,18 +438,26 @@ pub enum Error { #[snafu(implicit)] location: Location, }, - #[snafu(display("failed to coerce complex value, not supported"))] - CoerceComplexType { + #[snafu(display("Can not coerce json type to {ty}"))] + CoerceJsonTypeTo { + ty: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( + "Can not coerce {ty} to json type. we only consider object and array to be json types." + ))] + CoerceTypeToJson { + ty: String, #[snafu(implicit)] location: Location, }, - #[snafu(display("failed to coerce value: {msg}"))] + #[snafu(display("Failed to coerce value: {msg}"))] CoerceIncompatibleTypes { msg: String, #[snafu(implicit)] location: Location, }, - #[snafu(display( "Invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}" ))] diff --git a/src/pipeline/src/etl/processor/cmcd.rs b/src/pipeline/src/etl/processor/cmcd.rs index f43186b94aa0..06cfeb7c6905 100644 --- a/src/pipeline/src/etl/processor/cmcd.rs +++ b/src/pipeline/src/etl/processor/cmcd.rs @@ -402,7 +402,8 @@ impl Processor for CmcdProcessor { #[cfg(test)] mod tests { - use ahash::HashMap; + use std::collections::BTreeMap; + use urlencoding::decode; use super::{CmcdProcessorBuilder, CMCD_KEYS}; @@ -563,14 +564,14 @@ mod tests { let values = vec .into_iter() .map(|(k, v)| (k.to_string(), v)) - .collect::>(); + .collect::>(); let expected = Map { values }; let actual = processor.parse(0, &decoded).unwrap(); let actual = actual .into_iter() .map(|(index, value)| (intermediate_keys[index].clone(), value)) - .collect::>(); + .collect::>(); let actual = Map { values: actual }; assert_eq!(actual, expected); } diff --git a/src/pipeline/src/etl/processor/regex.rs b/src/pipeline/src/etl/processor/regex.rs index a74c19140c19..de25195f99ab 100644 --- a/src/pipeline/src/etl/processor/regex.rs +++ b/src/pipeline/src/etl/processor/regex.rs @@ -383,6 +383,8 @@ impl Processor for RegexProcessor { } #[cfg(test)] mod tests { + use std::collections::BTreeMap; + use ahash::{HashMap, HashMapExt}; use itertools::Itertools; @@ -475,14 +477,14 @@ ignore_missing: false"#; .map(|k| k.to_string()) .collect_vec(); let processor = builder.build(&intermediate_keys).unwrap(); - let mut result = HashMap::new(); + let mut result = BTreeMap::new(); for (index, pattern) in processor.patterns.iter().enumerate() { let r = processor .process(&breadcrumbs_str, pattern, (0, index)) .unwrap() .into_iter() .map(|(k, v)| (intermediate_keys[k].clone(), v)) - .collect::>(); + .collect::>(); result.extend(r); } let map = Map { values: result }; diff --git a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs index 8d07b34d462e..a0a96b356216 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime/coerce.rs @@ -12,16 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::column_data_type_extension::TypeExt; use api::v1::column_def::options_from_fulltext; -use api::v1::ColumnOptions; +use api::v1::{ColumnDataTypeExtension, ColumnOptions, JsonTypeExtension}; use datatypes::schema::FulltextOptions; use greptime_proto::v1::value::ValueData; use greptime_proto::v1::{ColumnDataType, ColumnSchema, SemanticType}; use snafu::ResultExt; use crate::etl::error::{ - CoerceComplexTypeSnafu, CoerceIncompatibleTypesSnafu, CoerceStringToTypeSnafu, - CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu, + CoerceIncompatibleTypesSnafu, CoerceJsonTypeToSnafu, CoerceStringToTypeSnafu, + CoerceTypeToJsonSnafu, CoerceUnsupportedEpochTypeSnafu, CoerceUnsupportedNullTypeSnafu, CoerceUnsupportedNullTypeToSnafu, ColumnOptionsSnafu, Error, Result, }; use crate::etl::transform::index::Index; @@ -62,7 +63,10 @@ impl TryFrom for ValueData { } Value::Timestamp(Timestamp::Second(s)) => Ok(ValueData::TimestampSecondValue(s)), - Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(), + Value::Array(_) | Value::Map(_) => { + let data: jsonb::Value = value.into(); + Ok(ValueData::BinaryValue(data.to_vec())) + } } } } @@ -74,15 +78,15 @@ pub(crate) fn coerce_columns(transform: &Transform) -> Result> for field in transform.real_fields.iter() { let column_name = field.output_name().to_string(); - let datatype = coerce_type(transform)? as i32; + let (datatype, datatype_extension) = coerce_type(transform)?; let semantic_type = coerce_semantic_type(transform) as i32; let column = ColumnSchema { column_name, - datatype, + datatype: datatype as i32, semantic_type, - datatype_extension: None, + datatype_extension, options: coerce_options(transform)?, }; columns.push(column); @@ -111,30 +115,41 @@ fn coerce_options(transform: &Transform) -> Result> { } } -fn coerce_type(transform: &Transform) -> Result { +fn coerce_type(transform: &Transform) -> Result<(ColumnDataType, Option)> { match transform.type_ { - Value::Int8(_) => Ok(ColumnDataType::Int8), - Value::Int16(_) => Ok(ColumnDataType::Int16), - Value::Int32(_) => Ok(ColumnDataType::Int32), - Value::Int64(_) => Ok(ColumnDataType::Int64), + Value::Int8(_) => Ok((ColumnDataType::Int8, None)), + Value::Int16(_) => Ok((ColumnDataType::Int16, None)), + Value::Int32(_) => Ok((ColumnDataType::Int32, None)), + Value::Int64(_) => Ok((ColumnDataType::Int64, None)), - Value::Uint8(_) => Ok(ColumnDataType::Uint8), - Value::Uint16(_) => Ok(ColumnDataType::Uint16), - Value::Uint32(_) => Ok(ColumnDataType::Uint32), - Value::Uint64(_) => Ok(ColumnDataType::Uint64), + Value::Uint8(_) => Ok((ColumnDataType::Uint8, None)), + Value::Uint16(_) => Ok((ColumnDataType::Uint16, None)), + Value::Uint32(_) => Ok((ColumnDataType::Uint32, None)), + Value::Uint64(_) => Ok((ColumnDataType::Uint64, None)), - Value::Float32(_) => Ok(ColumnDataType::Float32), - Value::Float64(_) => Ok(ColumnDataType::Float64), + Value::Float32(_) => Ok((ColumnDataType::Float32, None)), + Value::Float64(_) => Ok((ColumnDataType::Float64, None)), - Value::Boolean(_) => Ok(ColumnDataType::Boolean), - Value::String(_) => Ok(ColumnDataType::String), + Value::Boolean(_) => Ok((ColumnDataType::Boolean, None)), + Value::String(_) => Ok((ColumnDataType::String, None)), - Value::Timestamp(Timestamp::Nanosecond(_)) => Ok(ColumnDataType::TimestampNanosecond), - Value::Timestamp(Timestamp::Microsecond(_)) => Ok(ColumnDataType::TimestampMicrosecond), - Value::Timestamp(Timestamp::Millisecond(_)) => Ok(ColumnDataType::TimestampMillisecond), - Value::Timestamp(Timestamp::Second(_)) => Ok(ColumnDataType::TimestampSecond), + Value::Timestamp(Timestamp::Nanosecond(_)) => { + Ok((ColumnDataType::TimestampNanosecond, None)) + } + Value::Timestamp(Timestamp::Microsecond(_)) => { + Ok((ColumnDataType::TimestampMicrosecond, None)) + } + Value::Timestamp(Timestamp::Millisecond(_)) => { + Ok((ColumnDataType::TimestampMillisecond, None)) + } + Value::Timestamp(Timestamp::Second(_)) => Ok((ColumnDataType::TimestampSecond, None)), - Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(), + Value::Array(_) | Value::Map(_) => Ok(( + ColumnDataType::Binary, + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + )), Value::Null => CoerceUnsupportedNullTypeToSnafu { ty: transform.type_.to_str_type(), @@ -191,12 +206,12 @@ pub(crate) fn coerce_value(val: &Value, transform: &Transform) -> Result CoerceIncompatibleTypesSnafu { - msg: "Timestamp can only be coerced to another timestamp", + msg: "Timestamp can only be coerced to another type", } .fail(), }, - Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(), + Value::Array(_) | Value::Map(_) => coerce_json_value(val, transform), } } @@ -228,7 +243,12 @@ fn coerce_bool_value(b: bool, transform: &Transform) -> Result } }, - Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(), + Value::Array(_) | Value::Map(_) => { + return CoerceJsonTypeToSnafu { + ty: transform.type_.to_str_type(), + } + .fail() + } Value::Null => return Ok(None), }; @@ -264,7 +284,12 @@ fn coerce_i64_value(n: i64, transform: &Transform) -> Result> } }, - Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(), + Value::Array(_) | Value::Map(_) => { + return CoerceJsonTypeToSnafu { + ty: transform.type_.to_str_type(), + } + .fail() + } Value::Null => return Ok(None), }; @@ -300,7 +325,12 @@ fn coerce_u64_value(n: u64, transform: &Transform) -> Result> } }, - Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(), + Value::Array(_) | Value::Map(_) => { + return CoerceJsonTypeToSnafu { + ty: transform.type_.to_str_type(), + } + .fail() + } Value::Null => return Ok(None), }; @@ -336,7 +366,12 @@ fn coerce_f64_value(n: f64, transform: &Transform) -> Result> } }, - Value::Array(_) | Value::Map(_) => return CoerceComplexTypeSnafu.fail(), + Value::Array(_) | Value::Map(_) => { + return CoerceJsonTypeToSnafu { + ty: transform.type_.to_str_type(), + } + .fail() + } Value::Null => return Ok(None), }; @@ -411,12 +446,43 @@ fn coerce_string_value(s: &String, transform: &Transform) -> Result CoerceUnsupportedEpochTypeSnafu { ty: "String" }.fail(), }, - Value::Array(_) | Value::Map(_) => CoerceComplexTypeSnafu.fail(), + Value::Array(_) | Value::Map(_) => { + return CoerceJsonTypeToSnafu { + ty: transform.type_.to_str_type(), + } + .fail() + } Value::Null => Ok(None), } } +fn coerce_json_value(v: &Value, transform: &Transform) -> Result> { + match &transform.type_ { + Value::Array(_) | Value::Map(_) => (), + t => { + return CoerceTypeToJsonSnafu { + ty: t.to_str_type(), + } + .fail(); + } + } + match v { + Value::Map(_) => { + let data: jsonb::Value = v.into(); + Ok(Some(ValueData::BinaryValue(data.to_vec()))) + } + Value::Array(_) => { + let data: jsonb::Value = v.into(); + Ok(Some(ValueData::BinaryValue(data.to_vec()))) + } + _ => CoerceTypeToJsonSnafu { + ty: v.to_str_type(), + } + .fail(), + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/pipeline/src/etl/value.rs b/src/pipeline/src/etl/value.rs index 3adde2514b9e..531600025665 100644 --- a/src/pipeline/src/etl/value.rs +++ b/src/pipeline/src/etl/value.rs @@ -16,8 +16,10 @@ pub mod array; pub mod map; pub mod time; -use ahash::{HashMap, HashMapExt}; +use std::collections::BTreeMap; + pub use array::Array; +use jsonb::{Number as JsonbNumber, Object as JsonbObject, Value as JsonbValue}; pub use map::Map; use snafu::{OptionExt, ResultExt}; pub use time::Timestamp; @@ -57,6 +59,7 @@ pub enum Value { Timestamp(Timestamp), + /// We only consider object and array to be json types. Array(Array), Map(Map), } @@ -110,8 +113,9 @@ impl Value { _ => Ok(Value::Timestamp(Timestamp::Nanosecond(0))), }, - "array" => Ok(Value::Array(Array::default())), - "map" => Ok(Value::Map(Map::default())), + // We only consider object and array to be json types. and use Map to represent json + // TODO(qtang): Needs to be defined with better semantics + "json" => Ok(Value::Map(Map::default())), _ => ValueParseTypeSnafu { t }.fail(), } @@ -221,8 +225,7 @@ impl Value { Value::Timestamp(_) => "epoch", - Value::Array(_) => "array", - Value::Map(_) => "map", + Value::Array(_) | Value::Map(_) => "json", Value::Null => "null", } @@ -287,7 +290,7 @@ impl TryFrom for Value { Ok(Value::Array(Array { values })) } serde_json::Value::Object(v) => { - let mut values = HashMap::with_capacity(v.len()); + let mut values = BTreeMap::new(); for (k, v) in v { values.insert(k, Value::try_from(v)?); } @@ -318,7 +321,7 @@ impl TryFrom<&yaml_rust::Yaml> for Value { Ok(Value::Array(Array { values })) } yaml_rust::Yaml::Hash(v) => { - let mut values = HashMap::new(); + let mut values = BTreeMap::new(); for (k, v) in v { let key = k .as_str() @@ -331,3 +334,79 @@ impl TryFrom<&yaml_rust::Yaml> for Value { } } } + +impl<'a> From<&Value> for JsonbValue<'a> { + fn from(value: &Value) -> Self { + match value { + Value::Null => JsonbValue::Null, + Value::Boolean(v) => JsonbValue::Bool(*v), + + Value::Int8(v) => JsonbValue::Number(JsonbNumber::Int64(*v as i64)), + Value::Int16(v) => JsonbValue::Number(JsonbNumber::Int64(*v as i64)), + Value::Int32(v) => JsonbValue::Number(JsonbNumber::Int64(*v as i64)), + Value::Int64(v) => JsonbValue::Number(JsonbNumber::Int64(*v)), + + Value::Uint8(v) => JsonbValue::Number(JsonbNumber::UInt64(*v as u64)), + Value::Uint16(v) => JsonbValue::Number(JsonbNumber::UInt64(*v as u64)), + Value::Uint32(v) => JsonbValue::Number(JsonbNumber::UInt64(*v as u64)), + Value::Uint64(v) => JsonbValue::Number(JsonbNumber::UInt64(*v)), + Value::Float32(v) => JsonbValue::Number(JsonbNumber::Float64(*v as f64)), + Value::Float64(v) => JsonbValue::Number(JsonbNumber::Float64(*v)), + Value::String(v) => JsonbValue::String(v.clone().into()), + Value::Timestamp(v) => JsonbValue::String(v.to_string().into()), + Value::Array(arr) => { + let mut vals: Vec = Vec::with_capacity(arr.len()); + for val in arr.iter() { + vals.push(val.into()); + } + JsonbValue::Array(vals) + } + Value::Map(obj) => { + let mut map = JsonbObject::new(); + for (k, v) in obj.iter() { + let val: JsonbValue = v.into(); + map.insert(k.to_string(), val); + } + JsonbValue::Object(map) + } + } + } +} + +impl<'a> From for JsonbValue<'a> { + fn from(value: Value) -> Self { + match value { + Value::Null => JsonbValue::Null, + Value::Boolean(v) => JsonbValue::Bool(v), + + Value::Int8(v) => JsonbValue::Number(JsonbNumber::Int64(v as i64)), + Value::Int16(v) => JsonbValue::Number(JsonbNumber::Int64(v as i64)), + Value::Int32(v) => JsonbValue::Number(JsonbNumber::Int64(v as i64)), + Value::Int64(v) => JsonbValue::Number(JsonbNumber::Int64(v)), + + Value::Uint8(v) => JsonbValue::Number(JsonbNumber::UInt64(v as u64)), + Value::Uint16(v) => JsonbValue::Number(JsonbNumber::UInt64(v as u64)), + Value::Uint32(v) => JsonbValue::Number(JsonbNumber::UInt64(v as u64)), + Value::Uint64(v) => JsonbValue::Number(JsonbNumber::UInt64(v)), + Value::Float32(v) => JsonbValue::Number(JsonbNumber::Float64(v as f64)), + Value::Float64(v) => JsonbValue::Number(JsonbNumber::Float64(v)), + Value::String(v) => JsonbValue::String(v.into()), + Value::Timestamp(v) => JsonbValue::String(v.to_string().into()), + Value::Array(arr) => { + let mut vals: Vec = Vec::with_capacity(arr.len()); + for val in arr.into_iter() { + vals.push(val.into()); + } + JsonbValue::Array(vals) + } + Value::Map(obj) => { + let mut map = JsonbObject::new(); + for (k, v) in obj.into_iter() { + let val: JsonbValue = v.into(); + map.insert(k, val); + } + JsonbValue::Object(map) + } + } + } +} diff --git a/src/pipeline/src/etl/value/map.rs b/src/pipeline/src/etl/value/map.rs index b8b81da7563b..004a617b0f9c 100644 --- a/src/pipeline/src/etl/value/map.rs +++ b/src/pipeline/src/etl/value/map.rs @@ -12,21 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use ahash::{HashMap, HashMapExt}; +use std::collections::BTreeMap; + +use ahash::HashMap; use crate::etl::value::Value; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Default)] pub struct Map { - pub values: HashMap, -} - -impl Default for Map { - fn default() -> Self { - Self { - values: HashMap::with_capacity(30), - } - } + pub values: BTreeMap, } impl Map { @@ -47,12 +41,16 @@ impl Map { impl From> for Map { fn from(values: HashMap) -> Self { - Map { values } + let mut map = Map::default(); + for (k, v) in values.into_iter() { + map.insert(k, v); + } + map } } impl std::ops::Deref for Map { - type Target = HashMap; + type Target = BTreeMap; fn deref(&self) -> &Self::Target { &self.values @@ -65,6 +63,16 @@ impl std::ops::DerefMut for Map { } } +impl std::iter::IntoIterator for Map { + type Item = (String, Value); + + type IntoIter = std::collections::btree_map::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.values.into_iter() + } +} + impl std::fmt::Display for Map { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { let values = self diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index 36ef3023f6cb..1043006b73bd 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -21,7 +21,7 @@ pub use etl::processor::Processor; pub use etl::transform::transformer::identity_pipeline; pub use etl::transform::{GreptimeTransformer, Transformer}; pub use etl::value::{Array, Map, Value}; -pub use etl::{parse, Content, Pipeline}; +pub use etl::{error as etl_error, parse, Content, Pipeline, PipelineWay}; pub use manager::{ error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef, PipelineVersion, diff --git a/src/pipeline/tests/pipeline.rs b/src/pipeline/tests/pipeline.rs index d5712eaedd0b..c5b8bfc201da 100644 --- a/src/pipeline/tests/pipeline.rs +++ b/src/pipeline/tests/pipeline.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::value::ValueData; use api::v1::Rows; use common_telemetry::tracing::info; use greptime_proto::v1::value::ValueData::{ @@ -466,6 +467,57 @@ transform: } } +#[test] +fn test_json_type() { + let input_value_str = r#" +{ + "product_object": {"hello":"world"}, + "product_array": ["hello", "world"] +} +"#; + let input_value = serde_json::from_str::(input_value_str).unwrap(); + + let pipeline_yaml = r#" +processors: + +transform: + - fields: + - product_object + - product_array + type: json +"#; + + let yaml_content = Content::Yaml(pipeline_yaml.into()); + let pipeline: Pipeline = parse(&yaml_content).unwrap(); + + let mut status = pipeline.init_intermediate_state(); + pipeline.prepare(input_value, &mut status).unwrap(); + let row = pipeline.exec_mut(&mut status).unwrap(); + let r = row + .values + .into_iter() + .map(|v| v.value_data.unwrap()) + .collect::>(); + + let product_object = r[0].clone(); + let product_array = r[1].clone(); + match product_object { + ValueData::BinaryValue(data) => { + let jsonb = jsonb::from_slice(&data).unwrap().to_string(); + assert_eq!(r#"{"hello":"world"}"#, jsonb); + } + _ => panic!("unexpected value"), + } + + match product_array { + ValueData::BinaryValue(data) => { + let jsonb = jsonb::from_slice(&data).unwrap().to_string(); + assert_eq!(r#"["hello","world"]"#, jsonb); + } + _ => panic!("unexpected value"), + } +} + #[test] fn test_simple_data() { let input_value_str = r#" diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index df02a4485512..354bdf642f5c 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -14,6 +14,7 @@ testing = [] workspace = true [dependencies] +ahash = "0.8" aide = { version = "0.9", features = ["axum"] } api.workspace = true arrow.workspace = true diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index a796b895213c..9d7b2f9f8a03 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -531,6 +531,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("OpenTelemetry log error"))] + OpenTelemetryLog { + source: pipeline::etl_error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -595,7 +602,8 @@ impl ErrorExt for Error { | MysqlValueConversion { .. } | ParseJson { .. } | UnsupportedContentType { .. } - | TimestampOverflow { .. } => StatusCode::InvalidArguments, + | TimestampOverflow { .. } + | OpenTelemetryLog { .. } => StatusCode::InvalidArguments, Catalog { source, .. } => source.status_code(), RowWriter { source, .. } => source.status_code(), diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 67309b7244c5..b0c4843a1b85 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -36,6 +36,7 @@ use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datatypes::data_type::DataType; use datatypes::schema::SchemaRef; +use datatypes::value::transform_value_ref_to_json_value; use event::{LogState, LogValidatorRef}; use futures::FutureExt; use schemars::JsonSchema; @@ -241,14 +242,18 @@ impl HttpRecordsOutput { } else { let num_rows = recordbatches.iter().map(|r| r.num_rows()).sum::(); let mut rows = Vec::with_capacity(num_rows); + let schemas = schema.column_schemas(); let num_cols = schema.column_schemas().len(); rows.resize_with(num_rows, || Vec::with_capacity(num_cols)); let mut finished_row_cursor = 0; for recordbatch in recordbatches { - for col in recordbatch.columns() { + for (col_idx, col) in recordbatch.columns().iter().enumerate() { + // safety here: schemas length is equal to the number of columns in the recordbatch + let schema = &schemas[col_idx]; for row_idx in 0..recordbatch.num_rows() { - let value = Value::try_from(col.get_ref(row_idx)).context(ToJsonSnafu)?; + let value = transform_value_ref_to_json_value(col.get_ref(row_idx), schema) + .context(ToJsonSnafu)?; rows[row_idx + finished_row_cursor].push(value); } } @@ -882,6 +887,7 @@ impl HttpServer { Router::new() .route("/v1/metrics", routing::post(otlp::metrics)) .route("/v1/traces", routing::post(otlp::traces)) + .route("/v1/logs", routing::post(otlp::logs)) .layer( ServiceBuilder::new() .layer(HandleErrorLayer::new(handle_error)) diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index f9ff1485efd6..2d621db9e7aa 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -44,6 +44,9 @@ pub mod constants { pub const GREPTIME_DB_HEADER_NAME: &str = "x-greptime-db-name"; pub const GREPTIME_TIMEZONE_HEADER_NAME: &str = "x-greptime-timezone"; pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = common_error::GREPTIME_DB_HEADER_ERROR_CODE; + pub const GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-log-pipeline-name"; + pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version"; + pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name"; } pub static GREPTIME_DB_HEADER_FORMAT: HeaderName = diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index 3efdaeec96d4..ddd318acc76c 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -12,26 +12,39 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::str; +use std::result::Result as StdResult; use std::sync::Arc; -use axum::extract::State; -use axum::http::header; +use axum::extract::{FromRequestParts, State}; +use axum::http::header::HeaderValue; +use axum::http::request::Parts; +use axum::http::{header, StatusCode}; use axum::response::IntoResponse; -use axum::Extension; +use axum::{async_trait, Extension}; use bytes::Bytes; use common_telemetry::tracing; +use opentelemetry_proto::tonic::collector::logs::v1::{ + ExportLogsServiceRequest, ExportLogsServiceResponse, +}; use opentelemetry_proto::tonic::collector::metrics::v1::{ ExportMetricsServiceRequest, ExportMetricsServiceResponse, }; use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTraceServiceRequest, ExportTraceServiceResponse, }; +use pipeline::util::to_pipeline_version; +use pipeline::PipelineWay; use prost::Message; use session::context::{Channel, QueryContext}; use snafu::prelude::*; use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF}; use crate::error::{self, Result}; +use crate::http::header::constants::{ + GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, + GREPTIME_LOG_TABLE_NAME_HEADER_NAME, +}; use crate::query_handler::OpenTelemetryProtocolHandlerRef; #[axum_macros::debug_handler] @@ -39,8 +52,9 @@ use crate::query_handler::OpenTelemetryProtocolHandlerRef; pub async fn metrics( State(handler): State, Extension(mut query_ctx): Extension, + bytes: Bytes, -) -> Result { +) -> Result> { let db = query_ctx.get_db_string(); query_ctx.set_channel(Channel::Otlp); let query_ctx = Arc::new(query_ctx); @@ -53,7 +67,7 @@ pub async fn metrics( handler .metrics(request, query_ctx) .await - .map(|o| OtlpMetricsResponse { + .map(|o| OtlpResponse { resp_body: ExportMetricsServiceResponse { partial_success: None, }, @@ -61,27 +75,13 @@ pub async fn metrics( }) } -pub struct OtlpMetricsResponse { - resp_body: ExportMetricsServiceResponse, - write_cost: usize, -} - -impl IntoResponse for OtlpMetricsResponse { - fn into_response(self) -> axum::response::Response { - let mut header_map = write_cost_header_map(self.write_cost); - header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone()); - - (header_map, self.resp_body.encode_to_vec()).into_response() - } -} - #[axum_macros::debug_handler] #[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))] pub async fn traces( State(handler): State, Extension(mut query_ctx): Extension, bytes: Bytes, -) -> Result { +) -> Result> { let db = query_ctx.get_db_string(); query_ctx.set_channel(Channel::Otlp); let query_ctx = Arc::new(query_ctx); @@ -93,7 +93,7 @@ pub async fn traces( handler .traces(request, query_ctx) .await - .map(|o| OtlpTracesResponse { + .map(|o| OtlpResponse { resp_body: ExportTraceServiceResponse { partial_success: None, }, @@ -101,12 +101,143 @@ pub async fn traces( }) } -pub struct OtlpTracesResponse { - resp_body: ExportTraceServiceResponse, +pub struct PipelineInfo { + pub pipeline_name: Option, + pub pipeline_version: Option, +} + +fn pipeline_header_error( + header: &HeaderValue, + key: &str, +) -> StdResult { + let header_utf8 = str::from_utf8(header.as_bytes()); + match header_utf8 { + Ok(s) => Ok(s.to_string()), + Err(_) => Err(( + StatusCode::BAD_REQUEST, + format!("`{}` header is not valid UTF-8 string type.", key), + )), + } +} + +#[async_trait] +impl FromRequestParts for PipelineInfo +where + S: Send + Sync, +{ + type Rejection = (StatusCode, String); + + async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { + let pipeline_name = parts.headers.get(GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME); + let pipeline_version = parts.headers.get(GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME); + match (pipeline_name, pipeline_version) { + (Some(name), Some(version)) => Ok(PipelineInfo { + pipeline_name: Some(pipeline_header_error( + name, + GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, + )?), + pipeline_version: Some(pipeline_header_error( + version, + GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, + )?), + }), + (None, _) => Ok(PipelineInfo { + pipeline_name: None, + pipeline_version: None, + }), + (Some(name), None) => Ok(PipelineInfo { + pipeline_name: Some(pipeline_header_error( + name, + GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME, + )?), + pipeline_version: None, + }), + } + } +} + +pub struct TableInfo { + table_name: String, +} + +#[async_trait] +impl FromRequestParts for TableInfo +where + S: Send + Sync, +{ + type Rejection = (StatusCode, String); + + async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { + let table_name = parts.headers.get(GREPTIME_LOG_TABLE_NAME_HEADER_NAME); + + match table_name { + Some(name) => Ok(TableInfo { + table_name: pipeline_header_error(name, GREPTIME_LOG_TABLE_NAME_HEADER_NAME)?, + }), + None => Ok(TableInfo { + table_name: "opentelemetry_logs".to_string(), + }), + } + } +} + +#[axum_macros::debug_handler] +#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))] +pub async fn logs( + State(handler): State, + Extension(mut query_ctx): Extension, + pipeline_info: PipelineInfo, + table_info: TableInfo, + bytes: Bytes, +) -> Result> { + let db = query_ctx.get_db_string(); + query_ctx.set_channel(Channel::Otlp); + let query_ctx = Arc::new(query_ctx); + let _timer = crate::metrics::METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED + .with_label_values(&[db.as_str()]) + .start_timer(); + let request = ExportLogsServiceRequest::decode(bytes).context(error::DecodeOtlpRequestSnafu)?; + + let pipeline_way; + if let Some(pipeline_name) = &pipeline_info.pipeline_name { + let pipeline_version = + to_pipeline_version(pipeline_info.pipeline_version).map_err(|_| { + error::InvalidParameterSnafu { + reason: GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME, + } + .build() + })?; + let pipeline = match handler + .get_pipeline(pipeline_name, pipeline_version, query_ctx.clone()) + .await + { + Ok(p) => p, + Err(e) => { + return Err(e); + } + }; + pipeline_way = PipelineWay::Custom(pipeline); + } else { + pipeline_way = PipelineWay::Identity; + } + + handler + .logs(request, pipeline_way, table_info.table_name, query_ctx) + .await + .map(|o| OtlpResponse { + resp_body: ExportLogsServiceResponse { + partial_success: None, + }, + write_cost: o.meta.cost, + }) +} + +pub struct OtlpResponse { + resp_body: T, write_cost: usize, } -impl IntoResponse for OtlpTracesResponse { +impl IntoResponse for OtlpResponse { fn into_response(self) -> axum::response::Response { let mut header_map = write_cost_header_map(self.write_cost); header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone()); diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index cdf927536f04..ead86f3ad88b 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -141,6 +141,13 @@ lazy_static! { &[METRIC_DB_LABEL] ) .unwrap(); + pub static ref METRIC_HTTP_OPENTELEMETRY_LOGS_ELAPSED: HistogramVec = + register_histogram_vec!( + "greptime_servers_http_otlp_logs_elapsed", + "servers http otlp logs elapsed", + &[METRIC_DB_LABEL] + ) + .unwrap(); pub static ref METRIC_HTTP_LOGS_INGESTION_COUNTER: IntCounterVec = register_int_counter_vec!( "greptime_servers_http_logs_ingestion_counter", "servers http logs ingestion counter", diff --git a/src/servers/src/otlp.rs b/src/servers/src/otlp.rs index 5b92d12cb629..cc1321ac7020 100644 --- a/src/servers/src/otlp.rs +++ b/src/servers/src/otlp.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod logs; pub mod metrics; pub mod plugin; pub mod trace; diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs new file mode 100644 index 000000000000..4c47da29c873 --- /dev/null +++ b/src/servers/src/otlp/logs.rs @@ -0,0 +1,506 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, HashMap}; + +use api::v1::column_data_type_extension::TypeExt; +use api::v1::value::ValueData; +use api::v1::{ + ColumnDataType, ColumnDataTypeExtension, ColumnOptions, ColumnSchema, JsonTypeExtension, Row, + RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value as GreptimeValue, +}; +use jsonb::{Number as JsonbNumber, Value as JsonbValue}; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; +use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue}; +use opentelemetry_proto::tonic::logs::v1::LogRecord; +use pipeline::{Array, Map, PipelineWay, Value as PipelineValue}; +use snafu::ResultExt; + +use super::trace::attributes::OtlpAnyValue; +use crate::error::{OpenTelemetryLogSnafu, Result}; +use crate::otlp::trace::span::bytes_to_hex_string; + +/// Convert OpenTelemetry metrics to GreptimeDB insert requests +/// +/// See +/// +/// for data structure of OTLP metrics. +/// +/// Returns `InsertRequests` and total number of rows to ingest +pub fn to_grpc_insert_requests( + request: ExportLogsServiceRequest, + pipeline: PipelineWay, + table_name: String, +) -> Result<(RowInsertRequests, usize)> { + match pipeline { + PipelineWay::Identity => { + let rows = parse_export_logs_service_request_to_rows(request); + let len = rows.rows.len(); + let insert_request = RowInsertRequest { + rows: Some(rows), + table_name, + }; + Ok(( + RowInsertRequests { + inserts: vec![insert_request], + }, + len, + )) + } + PipelineWay::Custom(p) => { + let request = parse_export_logs_service_request(request); + let mut result = Vec::new(); + let mut intermediate_state = p.init_intermediate_state(); + for v in request { + p.prepare_pipeline_value(v, &mut intermediate_state) + .context(OpenTelemetryLogSnafu)?; + let r = p + .exec_mut(&mut intermediate_state) + .context(OpenTelemetryLogSnafu)?; + result.push(r); + } + let len = result.len(); + let rows = Rows { + schema: p.schemas().clone(), + rows: result, + }; + let insert_request = RowInsertRequest { + rows: Some(rows), + table_name, + }; + let insert_requests = RowInsertRequests { + inserts: vec![insert_request], + }; + Ok((insert_requests, len)) + } + } +} + +fn scope_to_pipeline_value( + scope: Option, +) -> (PipelineValue, PipelineValue, PipelineValue) { + scope + .map(|x| { + ( + PipelineValue::Map(Map { + values: key_value_to_map(x.attributes), + }), + PipelineValue::String(x.version), + PipelineValue::String(x.name), + ) + }) + .unwrap_or(( + PipelineValue::Null, + PipelineValue::Null, + PipelineValue::Null, + )) +} + +fn scope_to_jsonb( + scope: Option, +) -> (JsonbValue<'static>, Option, Option) { + scope + .map(|x| { + ( + key_value_to_jsonb(x.attributes), + Some(x.version), + Some(x.name), + ) + }) + .unwrap_or((JsonbValue::Null, None, None)) +} + +fn log_to_pipeline_value( + log: LogRecord, + resource_schema_url: PipelineValue, + resource_attr: PipelineValue, + scope_schema_url: PipelineValue, + scope_name: PipelineValue, + scope_version: PipelineValue, + scope_attrs: PipelineValue, +) -> PipelineValue { + let log_attrs = PipelineValue::Map(Map { + values: key_value_to_map(log.attributes), + }); + let mut map = BTreeMap::new(); + map.insert( + "Timestamp".to_string(), + PipelineValue::Uint64(log.time_unix_nano), + ); + map.insert( + "ObservedTimestamp".to_string(), + PipelineValue::Uint64(log.observed_time_unix_nano), + ); + + // need to be convert to string + map.insert( + "TraceId".to_string(), + PipelineValue::String(bytes_to_hex_string(&log.trace_id)), + ); + map.insert( + "SpanId".to_string(), + PipelineValue::String(bytes_to_hex_string(&log.span_id)), + ); + map.insert("TraceFlags".to_string(), PipelineValue::Uint32(log.flags)); + map.insert( + "SeverityText".to_string(), + PipelineValue::String(log.severity_text), + ); + map.insert( + "SeverityNumber".to_string(), + PipelineValue::Int32(log.severity_number), + ); + // need to be convert to string + map.insert( + "Body".to_string(), + log.body + .as_ref() + .map(|x| PipelineValue::String(log_body_to_string(x))) + .unwrap_or(PipelineValue::Null), + ); + map.insert("ResourceSchemaUrl".to_string(), resource_schema_url); + + map.insert("ResourceAttributes".to_string(), resource_attr); + map.insert("ScopeSchemaUrl".to_string(), scope_schema_url); + map.insert("ScopeName".to_string(), scope_name); + map.insert("ScopeVersion".to_string(), scope_version); + map.insert("ScopeAttributes".to_string(), scope_attrs); + map.insert("LogAttributes".to_string(), log_attrs); + PipelineValue::Map(Map { values: map }) +} + +fn build_otlp_logs_identity_schema() -> Vec { + [ + ( + "scope_name", + ColumnDataType::String, + SemanticType::Tag, + None, + None, + ), + ( + "scope_version", + ColumnDataType::String, + SemanticType::Field, + None, + None, + ), + ( + "scope_attributes", + ColumnDataType::Binary, + SemanticType::Field, + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + None, + ), + ( + "resource_attributes", + ColumnDataType::Binary, + SemanticType::Field, + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + None, + ), + ( + "log_attributes", + ColumnDataType::Binary, + SemanticType::Field, + Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }), + None, + ), + ( + "timestamp", + ColumnDataType::TimestampNanosecond, + SemanticType::Timestamp, + None, + None, + ), + ( + "observed_timestamp", + ColumnDataType::TimestampNanosecond, + SemanticType::Field, + None, + None, + ), + ( + "trace_id", + ColumnDataType::String, + SemanticType::Tag, + None, + None, + ), + ( + "span_id", + ColumnDataType::String, + SemanticType::Tag, + None, + None, + ), + ( + "trace_flags", + ColumnDataType::Uint32, + SemanticType::Field, + None, + None, + ), + ( + "severity_text", + ColumnDataType::String, + SemanticType::Field, + None, + None, + ), + ( + "severity_number", + ColumnDataType::Int32, + SemanticType::Field, + None, + None, + ), + ( + "body", + ColumnDataType::String, + SemanticType::Field, + None, + Some(ColumnOptions { + options: HashMap::from([( + "fulltext".to_string(), + r#"{"enable":true}"#.to_string(), + )]), + }), + ), + ] + .into_iter() + .map( + |(field_name, column_type, semantic_type, datatype_extension, options)| ColumnSchema { + column_name: field_name.to_string(), + datatype: column_type as i32, + semantic_type: semantic_type as i32, + datatype_extension, + options, + }, + ) + .collect::>() +} + +fn build_identity_row( + log: LogRecord, + resource_attr: JsonbValue<'_>, + scope_name: Option, + scope_version: Option, + scope_attrs: JsonbValue<'_>, +) -> Row { + let row = vec![ + GreptimeValue { + value_data: scope_name.map(ValueData::StringValue), + }, + GreptimeValue { + value_data: scope_version.map(ValueData::StringValue), + }, + GreptimeValue { + value_data: Some(ValueData::BinaryValue(scope_attrs.to_vec())), + }, + GreptimeValue { + value_data: Some(ValueData::BinaryValue(resource_attr.to_vec())), + }, + GreptimeValue { + value_data: Some(ValueData::BinaryValue( + key_value_to_jsonb(log.attributes).to_vec(), + )), + }, + GreptimeValue { + value_data: Some(ValueData::TimestampNanosecondValue( + log.time_unix_nano as i64, + )), + }, + GreptimeValue { + value_data: Some(ValueData::TimestampNanosecondValue( + log.observed_time_unix_nano as i64, + )), + }, + GreptimeValue { + value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.trace_id))), + }, + GreptimeValue { + value_data: Some(ValueData::StringValue(bytes_to_hex_string(&log.span_id))), + }, + GreptimeValue { + value_data: Some(ValueData::U32Value(log.flags)), + }, + GreptimeValue { + value_data: Some(ValueData::StringValue(log.severity_text)), + }, + GreptimeValue { + value_data: Some(ValueData::I32Value(log.severity_number)), + }, + GreptimeValue { + value_data: log + .body + .as_ref() + .map(|x| ValueData::StringValue(log_body_to_string(x))), + }, + ]; + + Row { values: row } +} + +fn parse_export_logs_service_request_to_rows(request: ExportLogsServiceRequest) -> Rows { + let mut result = Vec::new(); + for r in request.resource_logs { + let resource_attr = r + .resource + .map(|x| key_value_to_jsonb(x.attributes)) + .unwrap_or(JsonbValue::Null); + for scope_logs in r.scope_logs { + let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope); + for log in scope_logs.log_records { + let value = build_identity_row( + log, + resource_attr.clone(), + scope_name.clone(), + scope_version.clone(), + scope_attrs.clone(), + ); + result.push(value); + } + } + } + Rows { + schema: build_otlp_logs_identity_schema(), + rows: result, + } +} + +/// transform otlp logs request to pipeline value +/// https://opentelemetry.io/docs/concepts/signals/logs/ +fn parse_export_logs_service_request(request: ExportLogsServiceRequest) -> Vec { + let mut result = Vec::new(); + for r in request.resource_logs { + let resource_attr = r + .resource + .map(|x| { + PipelineValue::Map(Map { + values: key_value_to_map(x.attributes), + }) + }) + .unwrap_or(PipelineValue::Null); + let resource_schema_url = PipelineValue::String(r.schema_url); + for scope_logs in r.scope_logs { + let (scope_attrs, scope_version, scope_name) = + scope_to_pipeline_value(scope_logs.scope); + let scope_schema_url = PipelineValue::String(scope_logs.schema_url); + for log in scope_logs.log_records { + let value = log_to_pipeline_value( + log, + resource_schema_url.clone(), + resource_attr.clone(), + scope_schema_url.clone(), + scope_name.clone(), + scope_version.clone(), + scope_attrs.clone(), + ); + result.push(value); + } + } + } + result +} + +// convert AnyValue to pipeline value +fn any_value_to_pipeline_value(value: any_value::Value) -> PipelineValue { + match value { + any_value::Value::StringValue(s) => PipelineValue::String(s), + any_value::Value::IntValue(i) => PipelineValue::Int64(i), + any_value::Value::DoubleValue(d) => PipelineValue::Float64(d), + any_value::Value::BoolValue(b) => PipelineValue::Boolean(b), + any_value::Value::ArrayValue(a) => { + let values = a + .values + .into_iter() + .map(|v| match v.value { + Some(value) => any_value_to_pipeline_value(value), + None => PipelineValue::Null, + }) + .collect(); + PipelineValue::Array(Array { values }) + } + any_value::Value::KvlistValue(kv) => { + let value = key_value_to_map(kv.values); + PipelineValue::Map(Map { values: value }) + } + any_value::Value::BytesValue(b) => PipelineValue::String(bytes_to_hex_string(&b)), + } +} + +// convert otlp keyValue vec to map +fn key_value_to_map(key_values: Vec) -> BTreeMap { + let mut map = BTreeMap::new(); + for kv in key_values { + let value = match kv.value { + Some(value) => match value.value { + Some(value) => any_value_to_pipeline_value(value), + None => PipelineValue::Null, + }, + None => PipelineValue::Null, + }; + map.insert(kv.key.clone(), value); + } + map +} + +fn any_value_to_jsonb(value: any_value::Value) -> JsonbValue<'static> { + match value { + any_value::Value::StringValue(s) => JsonbValue::String(s.into()), + any_value::Value::IntValue(i) => JsonbValue::Number(JsonbNumber::Int64(i)), + any_value::Value::DoubleValue(d) => JsonbValue::Number(JsonbNumber::Float64(d)), + any_value::Value::BoolValue(b) => JsonbValue::Bool(b), + any_value::Value::ArrayValue(a) => { + let values = a + .values + .into_iter() + .map(|v| match v.value { + Some(value) => any_value_to_jsonb(value), + None => JsonbValue::Null, + }) + .collect(); + JsonbValue::Array(values) + } + any_value::Value::KvlistValue(kv) => key_value_to_jsonb(kv.values), + any_value::Value::BytesValue(b) => JsonbValue::String(bytes_to_hex_string(&b).into()), + } +} + +fn key_value_to_jsonb(key_values: Vec) -> JsonbValue<'static> { + let mut map = BTreeMap::new(); + for kv in key_values { + let value = match kv.value { + Some(value) => match value.value { + Some(value) => any_value_to_jsonb(value), + None => JsonbValue::Null, + }, + None => JsonbValue::Null, + }; + map.insert(kv.key.clone(), value); + } + JsonbValue::Object(map) +} + +fn log_body_to_string(body: &AnyValue) -> String { + let otlp_value = OtlpAnyValue::from(body); + otlp_value.to_string() +} diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 1fe64e652265..a1ad9997ba7b 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -33,9 +33,10 @@ use api::v1::RowInsertRequests; use async_trait::async_trait; use common_query::Output; use headers::HeaderValue; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; -use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion}; +use pipeline::{GreptimeTransformer, Pipeline, PipelineInfo, PipelineVersion, PipelineWay}; use serde_json::Value; use session::context::QueryContextRef; @@ -105,7 +106,7 @@ pub trait PromStoreProtocolHandler { } #[async_trait] -pub trait OpenTelemetryProtocolHandler { +pub trait OpenTelemetryProtocolHandler: LogHandler { /// Handling opentelemetry metrics request async fn metrics( &self, @@ -119,6 +120,14 @@ pub trait OpenTelemetryProtocolHandler { request: ExportTraceServiceRequest, ctx: QueryContextRef, ) -> Result; + + async fn logs( + &self, + request: ExportLogsServiceRequest, + pipeline: PipelineWay, + table_name: String, + ctx: QueryContextRef, + ) -> Result; } /// LogHandler is responsible for handling log related requests. diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 7a030aad5696..8541adc37a71 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -17,10 +17,11 @@ use std::io::Write; use api::prom_store::remote::WriteRequest; use auth::user_provider_from_option; -use axum::http::{HeaderName, StatusCode}; +use axum::http::{HeaderName, HeaderValue, StatusCode}; use common_error::status_code::StatusCode as ErrorCode; use flate2::write::GzEncoder; use flate2::Compression; +use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use opentelemetry_proto::tonic::metrics::v1::ResourceMetrics; @@ -90,6 +91,7 @@ macro_rules! http_tests { test_otlp_metrics, test_otlp_traces, + test_otlp_logs, ); )* }; @@ -1520,7 +1522,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) { let client = TestClient::new(app); // write metrics data - let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), false).await; + let res = send_req(&client, vec![], "/v1/otlp/v1/metrics", body.clone(), false).await; assert_eq!(StatusCode::OK, res.status()); // select metrics data @@ -1532,7 +1534,7 @@ pub async fn test_otlp_metrics(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); // write metrics data with gzip - let res = send_req(&client, "/v1/otlp/v1/metrics", body.clone(), true).await; + let res = send_req(&client, vec![], "/v1/otlp/v1/metrics", body.clone(), true).await; assert_eq!(StatusCode::OK, res.status()); // select metrics data again @@ -1557,7 +1559,7 @@ pub async fn test_otlp_traces(store_type: StorageType) { let client = TestClient::new(app); // write traces data - let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), false).await; + let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), false).await; assert_eq!(StatusCode::OK, res.status()); // select traces data @@ -1572,7 +1574,7 @@ pub async fn test_otlp_traces(store_type: StorageType) { assert_eq!(res.status(), StatusCode::OK); // write metrics data with gzip - let res = send_req(&client, "/v1/otlp/v1/traces", body.clone(), true).await; + let res = send_req(&client, vec![], "/v1/otlp/v1/traces", body.clone(), true).await; assert_eq!(StatusCode::OK, res.status()); // select metrics data again @@ -1581,6 +1583,40 @@ pub async fn test_otlp_traces(store_type: StorageType) { guard.remove_all().await; } +pub async fn test_otlp_logs(store_type: StorageType) { + common_telemetry::init_default_ut_logging(); + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await; + + let content = r#" +{"resourceLogs":[{"resource":{"attributes":[{"key":"resource-attr","value":{"stringValue":"resource-attr-val-1"}}]},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/resourceLogs","scopeLogs":[{"scope":{},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/scopeLogs","logRecords":[{"flags":1,"timeUnixNano":1581452773000009875,"observedTimeUnixNano":1581452773000009875,"severityNumber":9,"severityText":"Info","body":{"value":{"stringValue":"This is a log message"}},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"instance_num","value":{"intValue":1}}],"droppedAttributesCount":1,"traceId":[48,56,48,52,48,50,48,49,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48],"spanId":[48,49,48,50,48,52,48,56,48,48,48,48,48,48,48,48]},{"flags":1,"timeUnixNano":1581452773000000789,"observedTimeUnixNano":1581452773000000789,"severityNumber":9,"severityText":"Info","body":{"value":{"stringValue":"something happened"}},"attributes":[{"key":"customer","value":{"stringValue":"acme"}},{"key":"env","value":{"stringValue":"dev"}}],"droppedAttributesCount":1,"traceId":[48],"spanId":[48]}]}]}]} +"#; + + let req: ExportLogsServiceRequest = serde_json::from_str(content).unwrap(); + let body = req.encode_to_vec(); + + // handshake + let client = TestClient::new(app); + + // write traces data + let res = send_req( + &client, + vec![( + HeaderName::from_static("x-greptime-log-table-name"), + HeaderValue::from_static("logs"), + )], + "/v1/otlp/v1/logs?db=public", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + + let expected = r#"[["","",{},{"resource-attr":"resource-attr-val-1"},{"customer":"acme","env":"dev"},1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened"],["","",{},{"resource-attr":"resource-attr-val-1"},{"app":"server","instance_num":1},1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message"]]"#; + validate_data(&client, "select * from logs;", expected).await; + + guard.remove_all().await; +} + async fn validate_data(client: &TestClient, sql: &str, expected: &str) { let res = client .get(format!("/v1/sql?sql={sql}").as_str()) @@ -1593,11 +1629,21 @@ async fn validate_data(client: &TestClient, sql: &str, expected: &str) { assert_eq!(v, expected); } -async fn send_req(client: &TestClient, path: &str, body: Vec, with_gzip: bool) -> TestResponse { +async fn send_req( + client: &TestClient, + headers: Vec<(HeaderName, HeaderValue)>, + path: &str, + body: Vec, + with_gzip: bool, +) -> TestResponse { let mut req = client .post(path) .header("content-type", "application/x-protobuf"); + for (k, v) in headers { + req = req.header(k, v); + } + let mut len = body.len(); if with_gzip {