Commit

feat: Add functionality to the Opentelemetry write interface to extract fields from attr to top-level data. (#4859)

* chore: add otlp select

* chore: change otlp select

* chore: remove json path

* chore: format toml

* chore: change opentelemetry extract keys header name

* chore: add some doc and remove useless code and lib

* chore: make clippy happy

* chore: fix by pr comment

* chore: fix by pr comment

* chore: change the default semantic type for some OpenTelemetry logs select key types
paomian authored Oct 24, 2024
1 parent 5d42e63 commit fcde0a4
Showing 9 changed files with 437 additions and 64 deletions.
1 change: 0 additions & 1 deletion src/frontend/src/instance/otlp.rs
@@ -114,7 +114,6 @@ impl OpenTelemetryProtocolHandler for Instance {
            .get::<OpenTelemetryProtocolInterceptorRef<servers::error::Error>>();
        interceptor_ref.pre_execute(ctx.clone())?;
        let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?;

        self.handle_log_inserts(requests, ctx)
            .await
            .inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64))
27 changes: 26 additions & 1 deletion src/pipeline/src/etl.rs
@@ -317,8 +317,33 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str
        .context(IntermediateKeyIndexSnafu { kind, key })
}

/// SelectInfo is used to store the selected keys from OpenTelemetry record attrs
#[derive(Default)]
pub struct SelectInfo {
    pub keys: Vec<String>,
}

/// Try to convert a string to SelectInfo
/// The string should be a comma-separated list of keys
/// example: "key1,key2,key3"
/// The keys will be sorted and deduplicated
impl From<String> for SelectInfo {
    fn from(value: String) -> Self {
        let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
        keys.dedup();

        SelectInfo { keys }
    }
}

impl SelectInfo {
    pub fn is_empty(&self) -> bool {
        self.keys.is_empty()
    }
}

pub enum PipelineWay {
    Identity,
    OtlpLog(Box<SelectInfo>),
    Custom(std::sync::Arc<Pipeline<crate::GreptimeTransformer>>),
}

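The SelectInfo conversion above is the core of the new header parsing: the comma-separated key list is split, sorted, and deduplicated. Below is a minimal standalone sketch of that behaviour using only the standard library (the real code uses itertools' sorted(); parse_select_keys and the sample keys are illustrative only, not part of this diff).

// Illustrative sketch only: the same split -> sort -> dedup semantics with std.
fn parse_select_keys(value: &str) -> Vec<String> {
    let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).collect();
    keys.sort();
    keys.dedup();
    keys
}

fn main() {
    // Duplicates are removed and the result is sorted.
    assert_eq!(
        parse_select_keys("service.name,host.name,service.name"),
        vec!["host.name".to_string(), "service.name".to_string()]
    );
}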
6 changes: 3 additions & 3 deletions src/pipeline/src/etl/transform/transformer/greptime.rs
@@ -207,11 +207,11 @@ impl Transformer for GreptimeTransformer {
/// As you traverse the user input JSON, this will change.
/// It will record a superset of all user input schemas.
#[derive(Debug, Default)]
struct SchemaInfo {
pub struct SchemaInfo {
    /// schema info
    schema: Vec<ColumnSchema>,
    pub schema: Vec<ColumnSchema>,
    /// index of the column name
    index: HashMap<String, usize>,
    pub index: HashMap<String, usize>,
}

fn resolve_schema(
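SchemaInfo and its fields are made public here, presumably so the new OTLP-logs path can reuse the schema bookkeeping its doc comment describes. A rough standalone sketch of that idea follows, with illustrative stand-in types rather than the crate's own.

// Rough sketch, not the crate's implementation: a superset schema is grown as
// new keys appear, with `index` mapping a column name to its position.
use std::collections::HashMap;

#[derive(Debug, Default)]
struct SchemaInfoSketch {
    schema: Vec<String>,           // stand-in for Vec<ColumnSchema>
    index: HashMap<String, usize>, // column name -> position in `schema`
}

impl SchemaInfoSketch {
    fn column_index(&mut self, name: &str) -> usize {
        if let Some(&i) = self.index.get(name) {
            return i; // column already known, reuse its slot
        }
        let i = self.schema.len();
        self.schema.push(name.to_string());
        self.index.insert(name.to_string(), i);
        i
    }
}

fn main() {
    let mut info = SchemaInfoSketch::default();
    for key in ["host", "service", "host"] {
        info.column_index(key);
    }
    assert_eq!(info.schema, vec!["host".to_string(), "service".to_string()]);
}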
3 changes: 2 additions & 1 deletion src/pipeline/src/lib.rs
@@ -18,10 +18,11 @@ mod metrics;

pub use etl::error::Result;
pub use etl::processor::Processor;
pub use etl::transform::transformer::greptime::SchemaInfo;
pub use etl::transform::transformer::identity_pipeline;
pub use etl::transform::{GreptimeTransformer, Transformer};
pub use etl::value::{Array, Map, Value};
pub use etl::{error as etl_error, parse, Content, Pipeline, PipelineWay};
pub use etl::{error as etl_error, parse, Content, Pipeline, PipelineWay, SelectInfo};
pub use manager::{
    error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef,
    PipelineVersion,
10 changes: 9 additions & 1 deletion src/servers/src/error.rs
@@ -538,6 +538,13 @@ pub enum Error {
        #[snafu(implicit)]
        location: Location,
    },
    #[snafu(display("Unsupported json data type for tag: {} {}", key, ty))]
    UnsupportedJsonDataTypeForTag {
        key: String,
        ty: String,
        #[snafu(implicit)]
        location: Location,
    },
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -603,7 +610,8 @@ impl ErrorExt for Error {
            | ParseJson { .. }
            | UnsupportedContentType { .. }
            | TimestampOverflow { .. }
            | OpenTelemetryLog { .. } => StatusCode::InvalidArguments,
            | OpenTelemetryLog { .. }
            | UnsupportedJsonDataTypeForTag { .. } => StatusCode::InvalidArguments,

            Catalog { source, .. } => source.status_code(),
            RowWriter { source, .. } => source.status_code(),
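The new UnsupportedJsonDataTypeForTag variant is mapped to StatusCode::InvalidArguments alongside the other client-side errors. Below is a hedged, standalone sketch of how such a snafu variant is typically raised; the allowed tag types and the check_tag_type helper are assumptions for illustration, only the variant name and display string come from this diff (the location field is omitted to keep the sketch small).

// Standalone sketch, not the servers crate: a snafu variant mirroring the one added above.
use snafu::prelude::*;

#[derive(Debug, Snafu)]
enum SketchError {
    #[snafu(display("Unsupported json data type for tag: {} {}", key, ty))]
    UnsupportedJsonDataTypeForTag { key: String, ty: String },
}

// Hypothetical check: which JSON value types are tag-friendly is an assumption here.
fn check_tag_type(key: &str, ty: &str) -> Result<(), SketchError> {
    ensure!(
        matches!(ty, "string" | "int" | "boolean"),
        UnsupportedJsonDataTypeForTagSnafu { key, ty }
    );
    Ok(())
}

fn main() {
    let err = check_tag_type("http.request.headers", "object").unwrap_err();
    println!("{err}"); // Unsupported json data type for tag: http.request.headers object
}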
1 change: 1 addition & 0 deletions src/servers/src/http/header.rs
@@ -47,6 +47,7 @@ pub mod constants {
    pub const GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-log-pipeline-name";
    pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version";
    pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name";
    pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys";
}

pub static GREPTIME_DB_HEADER_FORMAT: HeaderName =
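For illustration, here is a hedged sketch of a client setting the new header on an OTLP/HTTP logs export, assuming the reqwest crate with its blocking feature; the endpoint path, port, and key list are assumptions, only the header name x-greptime-log-extract-keys comes from this diff.

// Sketch only: send an (empty) OTLP logs payload with the extract-keys header.
// In real use the body would be an encoded ExportLogsServiceRequest.
use reqwest::blocking::Client;

fn main() -> Result<(), reqwest::Error> {
    let payload: Vec<u8> = Vec::new(); // placeholder protobuf body
    let resp = Client::new()
        .post("http://127.0.0.1:4000/v1/otlp/v1/logs") // assumed endpoint
        .header("content-type", "application/x-protobuf")
        .header("x-greptime-log-extract-keys", "host.name,service.name")
        .body(payload)
        .send()?;
    println!("status: {}", resp.status());
    Ok(())
}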
35 changes: 32 additions & 3 deletions src/servers/src/http/otlp.rs
@@ -34,11 +34,12 @@ use opentelemetry_proto::tonic::collector::trace::v1::{
    ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use pipeline::util::to_pipeline_version;
use pipeline::PipelineWay;
use pipeline::{PipelineWay, SelectInfo};
use prost::Message;
use session::context::{Channel, QueryContext};
use snafu::prelude::*;

use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME;
use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
use crate::error::{self, Result};
use crate::http::header::constants::{
@@ -181,13 +182,41 @@ where
    }
}

pub struct SelectInfoWrapper(SelectInfo);

#[async_trait]
impl<S> FromRequestParts<S> for SelectInfoWrapper
where
    S: Send + Sync,
{
    type Rejection = (StatusCode, String);

    async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult<Self, Self::Rejection> {
        let select = parts.headers.get(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME);

        match select {
            Some(name) => {
                let select_header =
                    pipeline_header_error(name, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?;
                if select_header.is_empty() {
                    Ok(SelectInfoWrapper(Default::default()))
                } else {
                    Ok(SelectInfoWrapper(SelectInfo::from(select_header)))
                }
            }
            None => Ok(SelectInfoWrapper(Default::default())),
        }
    }
}

#[axum_macros::debug_handler]
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))]
#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "logs"))]
pub async fn logs(
    State(handler): State<OpenTelemetryProtocolHandlerRef>,
    Extension(mut query_ctx): Extension<QueryContext>,
    pipeline_info: PipelineInfo,
    table_info: TableInfo,
    SelectInfoWrapper(select_info): SelectInfoWrapper,
    bytes: Bytes,
) -> Result<OtlpResponse<ExportLogsServiceResponse>> {
    let db = query_ctx.get_db_string();
@@ -218,7 +247,7 @@ pub async fn logs(
        };
        pipeline_way = PipelineWay::Custom(pipeline);
    } else {
        pipeline_way = PipelineWay::Identity;
        pipeline_way = PipelineWay::OtlpLog(Box::new(select_info));
    }

    handler
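This last hunk is the heart of the feature: when no pipeline name is supplied, OTLP logs are now routed through PipelineWay::OtlpLog carrying the extracted keys instead of the plain identity path. A minimal sketch of that dispatch follows, with illustrative stand-in types rather than the crate's own.

// Stand-ins for the real SelectInfo/PipelineWay types, for illustration only.
struct SelectInfoSketch {
    keys: Vec<String>,
}

enum PipelineWaySketch {
    OtlpLog(Box<SelectInfoSketch>),
    Custom(String), // stand-in for Arc<Pipeline<GreptimeTransformer>>
}

fn choose_pipeline(pipeline_name: Option<String>, select_info: SelectInfoSketch) -> PipelineWaySketch {
    match pipeline_name {
        // An explicit pipeline name still selects a custom pipeline.
        Some(name) => PipelineWaySketch::Custom(name),
        // Otherwise the OTLP-logs path carries the keys to extract from attrs.
        None => PipelineWaySketch::OtlpLog(Box::new(select_info)),
    }
}

fn main() {
    let info = SelectInfoSketch { keys: vec!["host.name".into(), "service.name".into()] };
    match choose_pipeline(None, info) {
        PipelineWaySketch::OtlpLog(select) => println!("extracting keys: {:?}", select.keys),
        PipelineWaySketch::Custom(name) => println!("using custom pipeline: {name}"),
    }
}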