feat: add API to write OpenTelemetry logs to GreptimeDB (#4755)
* chore: otlp logs api

* feat: add API to write OpenTelemetry logs to GreptimeDB

* chore: fix test data schema error

* chore: modify the underlying data structure of the pipeline value map type from hashmap to btreemap to keep key order

* chore: fix by pr comment

* chore: resolve conflicts and add some test

* chore: remove useless error

* chore: change otlp header name

* chore: fmt code

* chore: fix integration test for otlp log write api

* chore: fix by pr comment

* chore: set otlp body with fulltext default
paomian authored Oct 16, 2024
1 parent 59ec902 commit 4622412
Showing 24 changed files with 1,158 additions and 130 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -138,6 +138,7 @@ opentelemetry-proto = { version = "0.5", features = [
"metrics",
"trace",
"with-serde",
"logs",
] }
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
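The new `logs` feature compiles opentelemetry-proto's log protobuf definitions; it is what makes the `ExportLogsServiceRequest` type used in src/frontend/src/instance/otlp.rs below available. A minimal sketch of the type it unlocks, assuming the tonic module layout of opentelemetry-proto 0.5 as pinned above:

use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;

// Smallest well-formed request: no resource logs at all.
fn empty_logs_request() -> ExportLogsServiceRequest {
    ExportLogsServiceRequest { resource_logs: Vec::new() }
}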
78 changes: 46 additions & 32 deletions src/datatypes/src/value.rs
@@ -38,6 +38,7 @@ use snafu::{ensure, ResultExt};

use crate::error::{self, ConvertArrowArrayToScalarsSnafu, Error, Result, TryFromValueSnafu};
use crate::prelude::*;
use crate::schema::ColumnSchema;
use crate::type_id::LogicalTypeId;
use crate::types::{IntervalType, ListType};
use crate::vectors::ListVector;
@@ -1286,39 +1287,52 @@ impl<'a> From<Option<ListValueRef<'a>>> for ValueRef<'a> {
}
}

impl<'a> TryFrom<ValueRef<'a>> for serde_json::Value {
type Error = serde_json::Error;

fn try_from(value: ValueRef<'a>) -> serde_json::Result<serde_json::Value> {
let json_value = match value {
ValueRef::Null => serde_json::Value::Null,
ValueRef::Boolean(v) => serde_json::Value::Bool(v),
ValueRef::UInt8(v) => serde_json::Value::from(v),
ValueRef::UInt16(v) => serde_json::Value::from(v),
ValueRef::UInt32(v) => serde_json::Value::from(v),
ValueRef::UInt64(v) => serde_json::Value::from(v),
ValueRef::Int8(v) => serde_json::Value::from(v),
ValueRef::Int16(v) => serde_json::Value::from(v),
ValueRef::Int32(v) => serde_json::Value::from(v),
ValueRef::Int64(v) => serde_json::Value::from(v),
ValueRef::Float32(v) => serde_json::Value::from(v.0),
ValueRef::Float64(v) => serde_json::Value::from(v.0),
ValueRef::String(bytes) => serde_json::Value::String(bytes.to_string()),
ValueRef::Binary(bytes) => serde_json::to_value(bytes)?,
ValueRef::Date(v) => serde_json::Value::Number(v.val().into()),
ValueRef::DateTime(v) => serde_json::Value::Number(v.val().into()),
ValueRef::List(v) => serde_json::to_value(v)?,
ValueRef::Timestamp(v) => serde_json::to_value(v.value())?,
ValueRef::Time(v) => serde_json::to_value(v.value())?,
ValueRef::IntervalYearMonth(v) => serde_json::Value::from(v),
ValueRef::IntervalDayTime(v) => serde_json::Value::from(v),
ValueRef::IntervalMonthDayNano(v) => serde_json::Value::from(v),
ValueRef::Duration(v) => serde_json::to_value(v.value())?,
ValueRef::Decimal128(v) => serde_json::to_value(v.to_string())?,
};
/// Transforms a [ValueRef] into a [serde_json::Value].
/// Json-typed columns are handled specially.
pub fn transform_value_ref_to_json_value<'a>(
value: ValueRef<'a>,
schema: &'a ColumnSchema,
) -> serde_json::Result<serde_json::Value> {
let json_value = match value {
ValueRef::Null => serde_json::Value::Null,
ValueRef::Boolean(v) => serde_json::Value::Bool(v),
ValueRef::UInt8(v) => serde_json::Value::from(v),
ValueRef::UInt16(v) => serde_json::Value::from(v),
ValueRef::UInt32(v) => serde_json::Value::from(v),
ValueRef::UInt64(v) => serde_json::Value::from(v),
ValueRef::Int8(v) => serde_json::Value::from(v),
ValueRef::Int16(v) => serde_json::Value::from(v),
ValueRef::Int32(v) => serde_json::Value::from(v),
ValueRef::Int64(v) => serde_json::Value::from(v),
ValueRef::Float32(v) => serde_json::Value::from(v.0),
ValueRef::Float64(v) => serde_json::Value::from(v.0),
ValueRef::String(bytes) => serde_json::Value::String(bytes.to_string()),
ValueRef::Binary(bytes) => {
if let ConcreteDataType::Json(_) = schema.data_type {
match jsonb::from_slice(bytes) {
Ok(json) => json.into(),
Err(e) => {
error!(e; "Failed to parse jsonb");
serde_json::Value::Null
}
}
} else {
serde_json::to_value(bytes)?
}
}
ValueRef::Date(v) => serde_json::Value::Number(v.val().into()),
ValueRef::DateTime(v) => serde_json::Value::Number(v.val().into()),
ValueRef::List(v) => serde_json::to_value(v)?,
ValueRef::Timestamp(v) => serde_json::to_value(v.value())?,
ValueRef::Time(v) => serde_json::to_value(v.value())?,
ValueRef::IntervalYearMonth(v) => serde_json::Value::from(v),
ValueRef::IntervalDayTime(v) => serde_json::Value::from(v),
ValueRef::IntervalMonthDayNano(v) => serde_json::Value::from(v),
ValueRef::Duration(v) => serde_json::to_value(v.value())?,
ValueRef::Decimal128(v) => serde_json::to_value(v.to_string())?,
};

Ok(json_value)
}
Ok(json_value)
}
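The `schema` parameter is the point of this change: a `Binary` value under a Json-typed column is decoded from its jsonb encoding, while the same bytes under an ordinary binary column fall through to serde_json's default byte-array serialization. A hedged sketch of both paths, assuming the crate's `ColumnSchema::new` and `ConcreteDataType::json_datatype()`/`binary_datatype()` constructors and the jsonb crate's `parse_value`:

use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::{transform_value_ref_to_json_value, ValueRef};

fn demo() -> serde_json::Result<()> {
    // jsonb-encode a small document (hypothetical input).
    let bytes = jsonb::parse_value(br#"{"k":"v"}"#).unwrap().to_vec();

    // Under a Json column the bytes decode back into structured JSON.
    let json_col = ColumnSchema::new("attrs", ConcreteDataType::json_datatype(), true);
    assert_eq!(
        transform_value_ref_to_json_value(ValueRef::Binary(&bytes), &json_col)?,
        serde_json::json!({"k": "v"})
    );

    // Under a plain Binary column they serialize as an array of numbers.
    let bin_col = ColumnSchema::new("raw", ConcreteDataType::binary_datatype(), true);
    assert!(transform_value_ref_to_json_value(ValueRef::Binary(&bytes), &bin_col)?.is_array());
    Ok(())
}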

/// Reference to a [ListValue].
31 changes: 30 additions & 1 deletion src/frontend/src/instance/otlp.rs
@@ -17,8 +17,10 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use pipeline::PipelineWay;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
@@ -28,7 +30,7 @@ use session::context::QueryContextRef;
use snafu::ResultExt;

use crate::instance::Instance;
use crate::metrics::{OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};
use crate::metrics::{OTLP_LOGS_ROWS, OTLP_METRICS_ROWS, OTLP_TRACES_ROWS};

#[async_trait]
impl OpenTelemetryProtocolHandler for Instance {
@@ -92,4 +94,31 @@ impl OpenTelemetryProtocolHandler for Instance {
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
}

#[tracing::instrument(skip_all)]
async fn logs(
&self,
request: ExportLogsServiceRequest,
pipeline: PipelineWay,
table_name: String,
ctx: QueryContextRef,
) -> ServerResult<Output> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::Otlp)
.context(AuthSnafu)?;

let interceptor_ref = self
.plugins
.get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_execute(ctx.clone())?;
let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;

self.handle_log_inserts(requests, ctx)
.await
.inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
}
}
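The new handler mirrors the metrics and traces paths: permission check, interceptor hook, then a pure conversion into gRPC insert requests. A sketch of feeding it, using opentelemetry-proto 0.5's generated types; the table name and field values are placeholders:

use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs};

fn sample_request() -> ExportLogsServiceRequest {
    // One resource, one scope, one record; all other fields keep
    // their protobuf defaults.
    ExportLogsServiceRequest {
        resource_logs: vec![ResourceLogs {
            scope_logs: vec![ScopeLogs {
                log_records: vec![LogRecord {
                    severity_text: "INFO".to_string(),
                    ..Default::default()
                }],
                ..Default::default()
            }],
            ..Default::default()
        }],
    }
}

// Conversion as invoked by the handler above (sketch; `PipelineWay::Identity`
// selects the built-in identity handling added in src/pipeline below):
// let (requests, rows) = otlp::logs::to_grpc_insert_requests(
//     sample_request(), PipelineWay::Identity, "opentelemetry_logs".to_string())?;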
12 changes: 12 additions & 0 deletions src/frontend/src/metrics.rs
@@ -41,16 +41,28 @@ lazy_static! {
.with_label_values(&["insert"]);
pub static ref EXECUTE_SCRIPT_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED
.with_label_values(&["execute"]);

/// The number of OpenTelemetry metrics sent by the frontend node.
pub static ref OTLP_METRICS_ROWS: IntCounter = register_int_counter!(
"greptime_frontend_otlp_metrics_rows",
"frontend otlp metrics rows"
)
.unwrap();

/// The number of OpenTelemetry traces sent by the frontend node.
pub static ref OTLP_TRACES_ROWS: IntCounter = register_int_counter!(
"greptime_frontend_otlp_traces_rows",
"frontend otlp traces rows"
)
.unwrap();

/// The number of OpenTelemetry logs sent by the frontend node.
pub static ref OTLP_LOGS_ROWS: IntCounter = register_int_counter!(
"greptime_frontend_otlp_logs_rows",
"frontend otlp logs rows"
)
.unwrap();

/// The number of heartbeats sent by the frontend node.
pub static ref HEARTBEAT_SENT_COUNT: IntCounter = register_int_counter!(
"greptime_frontend_heartbeat_send_count",
36 changes: 36 additions & 0 deletions src/pipeline/src/etl.rs
@@ -210,6 +210,37 @@
self.transformer.transform_mut(val)
}

pub fn prepare_pipeline_value(&self, val: Value, result: &mut [Value]) -> Result<()> {
match val {
Value::Map(map) => {
let mut search_from = 0;
// the keys in the json map are ordered
for (payload_key, payload_value) in map.values.into_iter() {
if search_from >= self.required_keys.len() {
break;
}

// since map keys are ordered, required_keys is ordered too
if let Some(pos) = self.required_keys[search_from..]
.iter()
.position(|k| k == &payload_key)
{
result[search_from + pos] = payload_value;
// the next search never needs to look before the current match
search_from += pos;
}
}
}
Value::String(_) => {
result[0] = val;
}
_ => {
return PrepareValueMustBeObjectSnafu.fail();
}
}
Ok(())
}

pub fn prepare(&self, val: serde_json::Value, result: &mut [Value]) -> Result<()> {
match val {
serde_json::Value::Object(map) => {
@@ -286,6 +317,11 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
.context(IntermediateKeyIndexSnafu { kind, key })
}

pub enum PipelineWay {
Identity,
Custom(std::sync::Arc<Pipeline<crate::GreptimeTransformer>>),
}

#[cfg(test)]
mod tests {

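`prepare_pipeline_value` exploits the fact that both the payload map (a BTreeMap after this commit) and `required_keys` iterate in sorted order, so every key is matched by a forward-only scan rather than a per-key hash lookup. A self-contained illustration of the same two-pointer idea:

// Both key sequences are sorted, so after a match at `search_from + pos`
// the scan never has to revisit earlier required keys.
fn fill_ordered(required: &[&str], payload: &[(&str, i32)], out: &mut [Option<i32>]) {
    let mut search_from = 0;
    for (key, value) in payload {
        if search_from >= required.len() {
            break;
        }
        if let Some(pos) = required[search_from..].iter().position(|k| k == key) {
            out[search_from + pos] = Some(*value);
            search_from += pos;
        }
    }
}

fn main() {
    let required = ["a", "c", "d"];
    let payload = [("a", 1), ("b", 2), ("c", 3), ("d", 4)]; // sorted keys
    let mut out = [None; 3];
    fill_ordered(&required, &payload, &mut out);
    assert_eq!(out, [Some(1), Some(3), Some(4)]);
}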
16 changes: 12 additions & 4 deletions src/pipeline/src/etl/error.rs
@@ -438,18 +438,26 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("failed to coerce complex value, not supported"))]
CoerceComplexType {
#[snafu(display("Can not coerce json type to {ty}"))]
CoerceJsonTypeTo {
ty: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display(
"Can not coerce {ty} to json type. we only consider object and array to be json types."
))]
CoerceTypeToJson {
ty: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("failed to coerce value: {msg}"))]
#[snafu(display("Failed to coerce value: {msg}"))]
CoerceIncompatibleTypes {
msg: String,
#[snafu(implicit)]
location: Location,
},

#[snafu(display(
"Invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}"
))]
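snafu generates a context selector for each variant, so the new errors are raised the same way as `PrepareValueMustBeObjectSnafu.fail()` in the etl.rs hunk above. A hypothetical call site, assuming the module's `Error` type and generated selectors are in scope:

// Hedged sketch: reject coercing a json column to a non-json type.
fn ensure_json_coercible(ty: &str, coercible: bool) -> Result<(), Error> {
    if !coercible {
        // `ty` converts via Into<String>; `location` is filled implicitly.
        return CoerceJsonTypeToSnafu { ty }.fail();
    }
    Ok(())
}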
7 changes: 4 additions & 3 deletions src/pipeline/src/etl/processor/cmcd.rs
@@ -402,7 +402,8 @@ impl Processor for CmcdProcessor {

#[cfg(test)]
mod tests {
use ahash::HashMap;
use std::collections::BTreeMap;

use urlencoding::decode;

use super::{CmcdProcessorBuilder, CMCD_KEYS};
@@ -563,14 +564,14 @@ mod tests {
let values = vec
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect::<HashMap<String, Value>>();
.collect::<BTreeMap<String, Value>>();
let expected = Map { values };

let actual = processor.parse(0, &decoded).unwrap();
let actual = actual
.into_iter()
.map(|(index, value)| (intermediate_keys[index].clone(), value))
.collect::<HashMap<String, Value>>();
.collect::<BTreeMap<String, Value>>();
let actual = Map { values: actual };
assert_eq!(actual, expected);
}
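This test change (and the matching one in regex.rs below) follows the HashMap-to-BTreeMap switch in the pipeline's Map type: a BTreeMap iterates and compares in sorted key order, which keeps the expected and actual maps aligned deterministically. A standalone illustration:

use std::collections::BTreeMap;

fn main() {
    let map: BTreeMap<String, i32> =
        [("b".to_string(), 2), ("a".to_string(), 1)].into_iter().collect();
    // Iteration order is sorted by key, independent of insertion order.
    let keys: Vec<String> = map.keys().cloned().collect();
    assert_eq!(keys, vec!["a".to_string(), "b".to_string()]);
}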
6 changes: 4 additions & 2 deletions src/pipeline/src/etl/processor/regex.rs
@@ -383,6 +383,8 @@ impl Processor for RegexProcessor {
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

use ahash::{HashMap, HashMapExt};
use itertools::Itertools;

@@ -475,14 +477,14 @@ ignore_missing: false"#;
.map(|k| k.to_string())
.collect_vec();
let processor = builder.build(&intermediate_keys).unwrap();
let mut result = HashMap::new();
let mut result = BTreeMap::new();
for (index, pattern) in processor.patterns.iter().enumerate() {
let r = processor
.process(&breadcrumbs_str, pattern, (0, index))
.unwrap()
.into_iter()
.map(|(k, v)| (intermediate_keys[k].clone(), v))
.collect::<HashMap<_, _>>();
.collect::<BTreeMap<_, _>>();
result.extend(r);
}
let map = Map { values: result };