diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 7ed16097152f..a2245d8fb84a 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -114,7 +114,6 @@ impl OpenTelemetryProtocolHandler for Instance { .get::>(); interceptor_ref.pre_execute(ctx.clone())?; let (requests, rows) = otlp::logs::to_grpc_insert_requests(request, pipeline, table_name)?; - self.handle_log_inserts(requests, ctx) .await .inspect(|_| OTLP_LOGS_ROWS.inc_by(rows as u64)) diff --git a/src/pipeline/src/etl.rs b/src/pipeline/src/etl.rs index bec06cab8bfb..9bd47a899ec6 100644 --- a/src/pipeline/src/etl.rs +++ b/src/pipeline/src/etl.rs @@ -317,8 +317,33 @@ pub(crate) fn find_key_index(intermediate_keys: &[String], key: &str, kind: &str .context(IntermediateKeyIndexSnafu { kind, key }) } +/// SelectInfo is used to store the selected keys from OpenTelemetry record attrs +#[derive(Default)] +pub struct SelectInfo { + pub keys: Vec, +} + +/// Try to convert a string to SelectInfo +/// The string should be a comma-separated list of keys +/// example: "key1,key2,key3" +/// The keys will be sorted and deduplicated +impl From for SelectInfo { + fn from(value: String) -> Self { + let mut keys: Vec = value.split(',').map(|s| s.to_string()).sorted().collect(); + keys.dedup(); + + SelectInfo { keys } + } +} + +impl SelectInfo { + pub fn is_empty(&self) -> bool { + self.keys.is_empty() + } +} + pub enum PipelineWay { - Identity, + OtlpLog(Box), Custom(std::sync::Arc>), } diff --git a/src/pipeline/src/etl/transform/transformer/greptime.rs b/src/pipeline/src/etl/transform/transformer/greptime.rs index 11d63c45032a..3b43696b5ab7 100644 --- a/src/pipeline/src/etl/transform/transformer/greptime.rs +++ b/src/pipeline/src/etl/transform/transformer/greptime.rs @@ -207,11 +207,11 @@ impl Transformer for GreptimeTransformer { /// As you traverse the user input JSON, this will change. /// It will record a superset of all user input schemas. #[derive(Debug, Default)] -struct SchemaInfo { +pub struct SchemaInfo { /// schema info - schema: Vec, + pub schema: Vec, /// index of the column name - index: HashMap, + pub index: HashMap, } fn resolve_schema( diff --git a/src/pipeline/src/lib.rs b/src/pipeline/src/lib.rs index 1043006b73bd..e9082de83ca1 100644 --- a/src/pipeline/src/lib.rs +++ b/src/pipeline/src/lib.rs @@ -18,10 +18,11 @@ mod metrics; pub use etl::error::Result; pub use etl::processor::Processor; +pub use etl::transform::transformer::greptime::SchemaInfo; pub use etl::transform::transformer::identity_pipeline; pub use etl::transform::{GreptimeTransformer, Transformer}; pub use etl::value::{Array, Map, Value}; -pub use etl::{error as etl_error, parse, Content, Pipeline, PipelineWay}; +pub use etl::{error as etl_error, parse, Content, Pipeline, PipelineWay, SelectInfo}; pub use manager::{ error, pipeline_operator, table, util, PipelineInfo, PipelineRef, PipelineTableRef, PipelineVersion, diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 9d7b2f9f8a03..9e4ef0976bef 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -538,6 +538,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display("Unsupported json data type for tag: {} {}", key, ty))] + UnsupportedJsonDataTypeForTag { + key: String, + ty: String, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -603,7 +610,8 @@ impl ErrorExt for Error { | ParseJson { .. } | UnsupportedContentType { .. } | TimestampOverflow { .. } - | OpenTelemetryLog { .. } => StatusCode::InvalidArguments, + | OpenTelemetryLog { .. } + | UnsupportedJsonDataTypeForTag { .. } => StatusCode::InvalidArguments, Catalog { source, .. } => source.status_code(), RowWriter { source, .. } => source.status_code(), diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index 2d621db9e7aa..f5b833f6739f 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -47,6 +47,7 @@ pub mod constants { pub const GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-log-pipeline-name"; pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version"; pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name"; + pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys"; } pub static GREPTIME_DB_HEADER_FORMAT: HeaderName = diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index ddd318acc76c..51f2683e1b6b 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -34,11 +34,12 @@ use opentelemetry_proto::tonic::collector::trace::v1::{ ExportTraceServiceRequest, ExportTraceServiceResponse, }; use pipeline::util::to_pipeline_version; -use pipeline::PipelineWay; +use pipeline::{PipelineWay, SelectInfo}; use prost::Message; use session::context::{Channel, QueryContext}; use snafu::prelude::*; +use super::header::constants::GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME; use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF}; use crate::error::{self, Result}; use crate::http::header::constants::{ @@ -181,13 +182,41 @@ where } } +pub struct SelectInfoWrapper(SelectInfo); + +#[async_trait] +impl FromRequestParts for SelectInfoWrapper +where + S: Send + Sync, +{ + type Rejection = (StatusCode, String); + + async fn from_request_parts(parts: &mut Parts, _state: &S) -> StdResult { + let select = parts.headers.get(GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME); + + match select { + Some(name) => { + let select_header = + pipeline_header_error(name, GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME)?; + if select_header.is_empty() { + Ok(SelectInfoWrapper(Default::default())) + } else { + Ok(SelectInfoWrapper(SelectInfo::from(select_header))) + } + } + None => Ok(SelectInfoWrapper(Default::default())), + } + } +} + #[axum_macros::debug_handler] -#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "traces"))] +#[tracing::instrument(skip_all, fields(protocol = "otlp", request_type = "logs"))] pub async fn logs( State(handler): State, Extension(mut query_ctx): Extension, pipeline_info: PipelineInfo, table_info: TableInfo, + SelectInfoWrapper(select_info): SelectInfoWrapper, bytes: Bytes, ) -> Result> { let db = query_ctx.get_db_string(); @@ -218,7 +247,7 @@ pub async fn logs( }; pipeline_way = PipelineWay::Custom(pipeline); } else { - pipeline_way = PipelineWay::Identity; + pipeline_way = PipelineWay::OtlpLog(Box::new(select_info)); } handler diff --git a/src/servers/src/otlp/logs.rs b/src/servers/src/otlp/logs.rs index 4c47da29c873..5faaced461ef 100644 --- a/src/servers/src/otlp/logs.rs +++ b/src/servers/src/otlp/logs.rs @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap as StdHashMap}; +use std::mem; use api::v1::column_data_type_extension::TypeExt; use api::v1::value::ValueData; @@ -23,12 +24,14 @@ use api::v1::{ use jsonb::{Number as JsonbNumber, Value as JsonbValue}; use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::common::v1::{any_value, AnyValue, InstrumentationScope, KeyValue}; -use opentelemetry_proto::tonic::logs::v1::LogRecord; -use pipeline::{Array, Map, PipelineWay, Value as PipelineValue}; -use snafu::ResultExt; +use opentelemetry_proto::tonic::logs::v1::{LogRecord, ResourceLogs, ScopeLogs}; +use pipeline::{Array, Map, PipelineWay, SchemaInfo, SelectInfo, Value as PipelineValue}; +use snafu::{ensure, ResultExt}; use super::trace::attributes::OtlpAnyValue; -use crate::error::{OpenTelemetryLogSnafu, Result}; +use crate::error::{ + IncompatibleSchemaSnafu, OpenTelemetryLogSnafu, Result, UnsupportedJsonDataTypeForTagSnafu, +}; use crate::otlp::trace::span::bytes_to_hex_string; /// Convert OpenTelemetry metrics to GreptimeDB insert requests @@ -44,8 +47,8 @@ pub fn to_grpc_insert_requests( table_name: String, ) -> Result<(RowInsertRequests, usize)> { match pipeline { - PipelineWay::Identity => { - let rows = parse_export_logs_service_request_to_rows(request); + PipelineWay::OtlpLog(select_info) => { + let rows = parse_export_logs_service_request_to_rows(request, select_info)?; let len = rows.rows.len(); let insert_request = RowInsertRequest { rows: Some(rows), @@ -278,7 +281,7 @@ fn build_otlp_logs_identity_schema() -> Vec { SemanticType::Field, None, Some(ColumnOptions { - options: HashMap::from([( + options: StdHashMap::from([( "fulltext".to_string(), r#"{"enable":true}"#.to_string(), )]), @@ -298,13 +301,14 @@ fn build_otlp_logs_identity_schema() -> Vec { .collect::>() } -fn build_identity_row( +fn build_otlp_build_in_row( log: LogRecord, - resource_attr: JsonbValue<'_>, + resource_attr: JsonbValue<'static>, scope_name: Option, scope_version: Option, - scope_attrs: JsonbValue<'_>, + scope_attrs: JsonbValue<'static>, ) -> Row { + let log_attr = key_value_to_jsonb(log.attributes); let row = vec![ GreptimeValue { value_data: scope_name.map(ValueData::StringValue), @@ -319,9 +323,7 @@ fn build_identity_row( value_data: Some(ValueData::BinaryValue(resource_attr.to_vec())), }, GreptimeValue { - value_data: Some(ValueData::BinaryValue( - key_value_to_jsonb(log.attributes).to_vec(), - )), + value_data: Some(ValueData::BinaryValue(log_attr.to_vec())), }, GreptimeValue { value_data: Some(ValueData::TimestampNanosecondValue( @@ -355,35 +357,320 @@ fn build_identity_row( .map(|x| ValueData::StringValue(log_body_to_string(x))), }, ]; - Row { values: row } } -fn parse_export_logs_service_request_to_rows(request: ExportLogsServiceRequest) -> Rows { - let mut result = Vec::new(); - for r in request.resource_logs { +fn extract_field_from_attr_and_combine_schema( + schema_info: &mut SchemaInfo, + log_select: &SelectInfo, + jsonb: &jsonb::Value<'static>, +) -> Result> { + if log_select.keys.is_empty() { + return Ok(Vec::new()); + } + let mut append_value = Vec::with_capacity(schema_info.schema.len()); + for _ in schema_info.schema.iter() { + append_value.push(GreptimeValue { value_data: None }); + } + for k in &log_select.keys { + let index = schema_info.index.get(k).copied(); + if let Some(value) = jsonb.get_by_name_ignore_case(k).cloned() { + if let Some((schema, value)) = decide_column_schema(k, value)? { + if let Some(index) = index { + let column_schema = &schema_info.schema[index]; + ensure!( + column_schema.datatype == schema.datatype, + IncompatibleSchemaSnafu { + column_name: k.clone(), + datatype: column_schema.datatype().as_str_name(), + expected: column_schema.datatype, + actual: schema.datatype, + } + ); + append_value[index] = value; + } else { + let key = k.clone(); + schema_info.schema.push(schema); + schema_info.index.insert(key, schema_info.schema.len() - 1); + append_value.push(value); + } + } + } + } + Ok(append_value) +} + +fn decide_column_schema( + column_name: &str, + value: JsonbValue, +) -> Result> { + let column_info = match value { + JsonbValue::String(s) => Ok(Some(( + GreptimeValue { + value_data: Some(ValueData::StringValue(s.into())), + }, + ColumnDataType::String, + SemanticType::Tag, + None, + ))), + JsonbValue::Number(n) => match n { + JsonbNumber::Int64(i) => Ok(Some(( + GreptimeValue { + value_data: Some(ValueData::I64Value(i)), + }, + ColumnDataType::Int64, + SemanticType::Tag, + None, + ))), + JsonbNumber::Float64(_) => UnsupportedJsonDataTypeForTagSnafu { + ty: "FLOAT".to_string(), + key: column_name, + } + .fail(), + JsonbNumber::UInt64(u) => Ok(Some(( + GreptimeValue { + value_data: Some(ValueData::U64Value(u)), + }, + ColumnDataType::Uint64, + SemanticType::Tag, + None, + ))), + }, + JsonbValue::Bool(b) => Ok(Some(( + GreptimeValue { + value_data: Some(ValueData::BoolValue(b)), + }, + ColumnDataType::Boolean, + SemanticType::Tag, + None, + ))), + JsonbValue::Array(_) | JsonbValue::Object(_) => UnsupportedJsonDataTypeForTagSnafu { + ty: "Json".to_string(), + key: column_name, + } + .fail(), + JsonbValue::Null => Ok(None), + }; + column_info.map(|c| { + c.map(|(value, column_type, semantic_type, datatype_extension)| { + ( + ColumnSchema { + column_name: column_name.to_string(), + datatype: column_type as i32, + semantic_type: semantic_type as i32, + datatype_extension, + options: None, + }, + value, + ) + }) + }) +} + +#[derive(Debug, Clone, Copy)] +enum OpenTelemetryLogRecordAttrType { + Resource, + Scope, + Log, +} + +fn merge_schema( + input_schemas: Vec<(&SchemaInfo, OpenTelemetryLogRecordAttrType)>, +) -> BTreeMap<&String, (OpenTelemetryLogRecordAttrType, usize, &ColumnSchema)> { + let mut schemas = BTreeMap::new(); + input_schemas + .into_iter() + .for_each(|(schema_info, attr_type)| { + for (key, index) in schema_info.index.iter() { + if let Some(col_schema) = schema_info.schema.get(*index) { + schemas.insert(key, (attr_type, *index, col_schema)); + } + } + }); + schemas +} + +fn parse_export_logs_service_request_to_rows( + request: ExportLogsServiceRequest, + select_info: Box, +) -> Result { + let mut schemas = build_otlp_logs_identity_schema(); + let mut extra_resource_schema = SchemaInfo::default(); + let mut extra_scope_schema = SchemaInfo::default(); + let mut extra_log_schema = SchemaInfo::default(); + let parse_infos = parse_resource( + &select_info, + &mut extra_resource_schema, + &mut extra_scope_schema, + &mut extra_log_schema, + request.resource_logs, + )?; + + // order of schema is important + // resource < scope < log + // do not change the order + let final_extra_schema_info = merge_schema(vec![ + ( + &extra_resource_schema, + OpenTelemetryLogRecordAttrType::Resource, + ), + (&extra_scope_schema, OpenTelemetryLogRecordAttrType::Scope), + (&extra_log_schema, OpenTelemetryLogRecordAttrType::Log), + ]); + + let final_extra_schema = final_extra_schema_info + .iter() + .map(|(_, (_, _, v))| (*v).clone()) + .collect::>(); + + let extra_schema_len = final_extra_schema.len(); + schemas.extend(final_extra_schema); + + let mut results = Vec::with_capacity(parse_infos.len()); + for parse_info in parse_infos.into_iter() { + let mut row = parse_info.values; + let mut resource_values = parse_info.resource_extracted_values; + let mut scope_values = parse_info.scope_extracted_values; + let mut log_values = parse_info.log_extracted_values; + + let mut final_extra_values = vec![GreptimeValue { value_data: None }; extra_schema_len]; + for (idx, (_, (attr_type, index, _))) in final_extra_schema_info.iter().enumerate() { + let value = match attr_type { + OpenTelemetryLogRecordAttrType::Resource => resource_values.get_mut(*index), + OpenTelemetryLogRecordAttrType::Scope => scope_values.get_mut(*index), + OpenTelemetryLogRecordAttrType::Log => log_values.get_mut(*index), + }; + if let Some(value) = value { + // swap value to final_extra_values + mem::swap(&mut final_extra_values[idx], value); + } + } + + row.values.extend(final_extra_values); + results.push(row); + } + Ok(Rows { + schema: schemas, + rows: results, + }) +} + +fn parse_resource( + select_info: &SelectInfo, + extra_resource_schema: &mut SchemaInfo, + extra_scope_schema: &mut SchemaInfo, + extra_log_schema: &mut SchemaInfo, + resource_logs_vec: Vec, +) -> Result> { + let mut results = Vec::new(); + for r in resource_logs_vec { let resource_attr = r .resource - .map(|x| key_value_to_jsonb(x.attributes)) + .map(|resource| key_value_to_jsonb(resource.attributes)) .unwrap_or(JsonbValue::Null); - for scope_logs in r.scope_logs { - let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope); - for log in scope_logs.log_records { - let value = build_identity_row( - log, - resource_attr.clone(), - scope_name.clone(), - scope_version.clone(), - scope_attrs.clone(), - ); - result.push(value); - } - } + let resource_extracted_values = extract_field_from_attr_and_combine_schema( + extra_resource_schema, + select_info, + &resource_attr, + )?; + let rows = parse_scope( + extra_scope_schema, + extra_log_schema, + select_info, + r.scope_logs, + resource_attr, + resource_extracted_values, + )?; + results.extend(rows); } - Rows { - schema: build_otlp_logs_identity_schema(), - rows: result, + Ok(results) +} + +struct ScopeInfo { + scope_name: Option, + scope_version: Option, + scope_attrs: JsonbValue<'static>, +} + +fn parse_scope( + extra_scope_schema: &mut SchemaInfo, + extra_log_schema: &mut SchemaInfo, + select_info: &SelectInfo, + scopes_log_vec: Vec, + resource_attr: JsonbValue<'static>, + resource_extracted_values: Vec, +) -> Result> { + let mut results = Vec::new(); + for scope_logs in scopes_log_vec { + let (scope_attrs, scope_version, scope_name) = scope_to_jsonb(scope_logs.scope); + let scope_extracted_values = extract_field_from_attr_and_combine_schema( + extra_scope_schema, + select_info, + &scope_attrs, + )?; + let rows = parse_log( + extra_log_schema, + select_info, + scope_logs.log_records, + &resource_attr, + ScopeInfo { + scope_name, + scope_version, + scope_attrs, + }, + &resource_extracted_values, + &scope_extracted_values, + )?; + results.extend(rows); } + Ok(results) +} + +fn parse_log( + extra_log_schema: &mut SchemaInfo, + select_info: &SelectInfo, + log_records: Vec, + resource_attr: &JsonbValue<'static>, + ScopeInfo { + scope_name, + scope_version, + scope_attrs, + }: ScopeInfo, + resource_extracted_values: &[GreptimeValue], + scope_extracted_values: &[GreptimeValue], +) -> Result> { + let mut result = Vec::with_capacity(log_records.len()); + + for log in log_records { + let log_attr = key_value_to_jsonb(log.attributes.clone()); + + let row = build_otlp_build_in_row( + log, + resource_attr.clone(), + scope_name.clone(), + scope_version.clone(), + scope_attrs.clone(), + ); + + let log_extracted_values = + extract_field_from_attr_and_combine_schema(extra_log_schema, select_info, &log_attr)?; + + let parse_info = ParseInfo { + values: row, + resource_extracted_values: resource_extracted_values.to_vec(), + scope_extracted_values: scope_extracted_values.to_vec(), + log_extracted_values, + }; + result.push(parse_info); + } + Ok(result) +} + +struct ParseInfo { + values: Row, + resource_extracted_values: Vec, + scope_extracted_values: Vec, + log_extracted_values: Vec, } /// transform otlp logs request to pipeline value diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 8541adc37a71..131ba8363f61 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -1585,34 +1585,57 @@ pub async fn test_otlp_traces(store_type: StorageType) { pub async fn test_otlp_logs(store_type: StorageType) { common_telemetry::init_default_ut_logging(); - let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_traces").await; + let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "test_otlp_logs").await; + let client = TestClient::new(app); let content = r#" {"resourceLogs":[{"resource":{"attributes":[{"key":"resource-attr","value":{"stringValue":"resource-attr-val-1"}}]},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/resourceLogs","scopeLogs":[{"scope":{},"schemaUrl":"https://opentelemetry.io/schemas/1.0.0/scopeLogs","logRecords":[{"flags":1,"timeUnixNano":1581452773000009875,"observedTimeUnixNano":1581452773000009875,"severityNumber":9,"severityText":"Info","body":{"value":{"stringValue":"This is a log message"}},"attributes":[{"key":"app","value":{"stringValue":"server"}},{"key":"instance_num","value":{"intValue":1}}],"droppedAttributesCount":1,"traceId":[48,56,48,52,48,50,48,49,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48,48],"spanId":[48,49,48,50,48,52,48,56,48,48,48,48,48,48,48,48]},{"flags":1,"timeUnixNano":1581452773000000789,"observedTimeUnixNano":1581452773000000789,"severityNumber":9,"severityText":"Info","body":{"value":{"stringValue":"something happened"}},"attributes":[{"key":"customer","value":{"stringValue":"acme"}},{"key":"env","value":{"stringValue":"dev"}}],"droppedAttributesCount":1,"traceId":[48],"spanId":[48]}]}]}]} "#; let req: ExportLogsServiceRequest = serde_json::from_str(content).unwrap(); let body = req.encode_to_vec(); + { + // write log data + let res = send_req( + &client, + vec![ + ( + HeaderName::from_static("x-greptime-log-table-name"), + HeaderValue::from_static("logs"), + ), + ( + HeaderName::from_static("x-greptime-log-extract-keys"), + HeaderValue::from_static("resource-attr,instance_num,app,not-exist"), + ), + ], + "/v1/otlp/v1/logs?db=public", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); - // handshake - let client = TestClient::new(app); - - // write traces data - let res = send_req( - &client, - vec![( - HeaderName::from_static("x-greptime-log-table-name"), - HeaderValue::from_static("logs"), - )], - "/v1/otlp/v1/logs?db=public", - body.clone(), - false, - ) - .await; - assert_eq!(StatusCode::OK, res.status()); + let expected = r#"[["","",{},{"resource-attr":"resource-attr-val-1"},{"customer":"acme","env":"dev"},1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened",null,null,"resource-attr-val-1"],["","",{},{"resource-attr":"resource-attr-val-1"},{"app":"server","instance_num":1},1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message","server",1,"resource-attr-val-1"]]"#; + validate_data(&client, "select * from logs;", expected).await; + } - let expected = r#"[["","",{},{"resource-attr":"resource-attr-val-1"},{"customer":"acme","env":"dev"},1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened"],["","",{},{"resource-attr":"resource-attr-val-1"},{"app":"server","instance_num":1},1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message"]]"#; - validate_data(&client, "select * from logs;", expected).await; + { + // write log data + let res = send_req( + &client, + vec![( + HeaderName::from_static("x-greptime-log-table-name"), + HeaderValue::from_static("logs1"), + )], + "/v1/otlp/v1/logs?db=public", + body.clone(), + false, + ) + .await; + assert_eq!(StatusCode::OK, res.status()); + let expected = r#"[["","",{},{"resource-attr":"resource-attr-val-1"},{"customer":"acme","env":"dev"},1581452773000000789,1581452773000000789,"30","30",1,"Info",9,"something happened"],["","",{},{"resource-attr":"resource-attr-val-1"},{"app":"server","instance_num":1},1581452773000009875,1581452773000009875,"3038303430323031303030303030303030303030303030303030303030303030","30313032303430383030303030303030",1,"Info",9,"This is a log message"]]"#; + validate_data(&client, "select * from logs1;", expected).await; + } guard.remove_all().await; }