diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 1ac8fe029048..47bb940a1bb2 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -12,13 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::InsertRequests; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_error::ext::BoxedError; use servers::error as server_error; use servers::error::AuthSnafu; use servers::opentsdb::codec::DataPoint; +use servers::opentsdb::data_point_to_grpc_row_insert_requests; use servers::query_handler::OpentsdbProtocolHandler; use session::context::QueryContextRef; use snafu::prelude::*; @@ -27,23 +27,27 @@ use crate::instance::Instance; #[async_trait] impl OpentsdbProtocolHandler for Instance { - async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> server_error::Result<()> { + async fn exec( + &self, + data_points: Vec<DataPoint>, + ctx: QueryContextRef, + ) -> server_error::Result<usize> { self.plugins .get::<PermissionCheckerRef>() .as_ref() .check_permission(ctx.current_user(), PermissionReq::Opentsdb) .context(AuthSnafu)?; - let requests = InsertRequests { - inserts: vec![data_point.as_grpc_insert()], - }; - let _ = self - .handle_inserts(requests, ctx) + let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?; + let output = self + .handle_row_inserts(requests, ctx) .await .map_err(BoxedError::new) - .with_context(|_| server_error::ExecuteQuerySnafu { - query: format!("{data_point:?}"), - })?; - Ok(()) + .context(servers::error::ExecuteGrpcQuerySnafu)?; + + Ok(match output { + common_query::Output::AffectedRows(rows) => rows, + _ => unreachable!(), + }) } } diff --git a/src/servers/src/http/opentsdb.rs b/src/servers/src/http/opentsdb.rs index c5b90b42a438..054595252ad3 100644 --- a/src/servers/src/http/opentsdb.rs +++ b/src/servers/src/http/opentsdb.rs @@ -84,17 +84,19 
@@ pub async fn put( let summary = params.contains_key("summary"); let details = params.contains_key("details"); - let data_points = parse_data_points(body).await?; + let data_point_requests = parse_data_points(body).await?; + let data_points = data_point_requests + .iter() + .map(|point| point.clone().into()) + .collect::<Vec<DataPoint>>(); let response = if !summary && !details { - for data_point in data_points.into_iter() { - if let Err(e) = opentsdb_handler.exec(&data_point.into(), ctx.clone()).await { - // Not debugging purpose, failed fast. - return error::InternalSnafu { - err_msg: e.to_string(), - } - .fail(); + if let Err(e) = opentsdb_handler.exec(data_points, ctx.clone()).await { + // Not debugging purpose, failed fast. + return error::InternalSnafu { + err_msg: e.to_string(), } + .fail(); } (HttpStatusCode::NO_CONTENT, Json(OpentsdbPutResponse::Empty)) } else { @@ -108,15 +110,11 @@ pub async fn put( }, }; - for data_point in data_points.into_iter() { - let result = opentsdb_handler - .exec(&data_point.clone().into(), ctx.clone()) - .await; + for (data_point, request) in data_points.into_iter().zip(data_point_requests) { + let result = opentsdb_handler.exec(vec![data_point], ctx.clone()).await; match result { - Ok(()) => response.on_success(), - Err(e) => { - response.on_failed(data_point, e); - } + Ok(affected_rows) => response.on_success(affected_rows), + Err(e) => response.on_failed(request, e), } } ( @@ -151,8 +149,8 @@ pub struct OpentsdbDebuggingResponse { } impl OpentsdbDebuggingResponse { - fn on_success(&mut self) { - self.success += 1; + fn on_success(&mut self, affected_rows: usize) { + self.success += affected_rows as i32; } fn on_failed(&mut self, datapoint: DataPointRequest, error: impl ErrorExt) { diff --git a/src/servers/src/opentsdb.rs b/src/servers/src/opentsdb.rs index 61ed84167064..07cde1e14765 100644 --- a/src/servers/src/opentsdb.rs +++ b/src/servers/src/opentsdb.rs @@ -20,16 +20,20 @@ use std::future::Future; use std::net::SocketAddr; use 
std::sync::Arc; +use api::v1::RowInsertRequests; use async_trait::async_trait; use common_runtime::Runtime; use common_telemetry::logging::error; use futures::StreamExt; use tokio::sync::broadcast; +use self::codec::DataPoint; use crate::error::Result; use crate::opentsdb::connection::Connection; use crate::opentsdb::handler::Handler; +use crate::prom_store::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME}; use crate::query_handler::OpentsdbProtocolHandlerRef; +use crate::row_writer::{self, MultiTableData}; use crate::server::{AbortableStream, BaseTcpServer, Server}; use crate::shutdown::Shutdown; @@ -126,3 +130,38 @@ impl Server for OpentsdbServer { OPENTSDB_SERVER } } + +pub fn data_point_to_grpc_row_insert_requests( + data_points: Vec<DataPoint>, +) -> Result<(RowInsertRequests, usize)> { + let mut multi_table_data = MultiTableData::new(); + + for mut data_point in data_points { + let tags: Vec<(String, String)> = std::mem::take(data_point.tags_mut()); + let table_name = data_point.metric(); + let value = data_point.value(); + let timestamp = data_point.ts_millis(); + // length of tags + 2 extra columns for greptime_timestamp and the value + let num_columns = tags.len() + 2; + + let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 1); + let mut one_row = table_data.alloc_one_row(); + + // tags + row_writer::write_tags(table_data, tags.into_iter(), &mut one_row)?; + + // value + row_writer::write_f64(table_data, FIELD_COLUMN_NAME, value, &mut one_row)?; + // timestamp + row_writer::write_ts_millis( + table_data, + TIMESTAMP_COLUMN_NAME, + Some(timestamp), + &mut one_row, + )?; + + table_data.add_row(one_row); + } + + Ok(multi_table_data.into_row_insert_requests()) +} diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index 163e060adece..55e160460554 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -19,7 +19,7 @@ use crate::error::{self, Result}; pub const 
OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp"; pub const OPENTSDB_FIELD_COLUMN_NAME: &str = "greptime_value"; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct DataPoint { metric: String, ts_millis: i64, @@ -115,6 +115,10 @@ impl DataPoint { &self.tags } + pub fn tags_mut(&mut self) -> &mut Vec<(String, String)> { + &mut self.tags + } + pub fn ts_millis(&self) -> i64 { self.ts_millis } diff --git a/src/servers/src/opentsdb/handler.rs b/src/servers/src/opentsdb/handler.rs index 4cbe1731fe11..c260ccf1749f 100644 --- a/src/servers/src/opentsdb/handler.rs +++ b/src/servers/src/opentsdb/handler.rs @@ -94,7 +94,7 @@ impl Handler { match DataPoint::try_create(&line) { Ok(data_point) => { let _timer = timer!(crate::metrics::METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED); - let result = self.query_handler.exec(&data_point, ctx.clone()).await; + let result = self.query_handler.exec(vec![data_point], ctx.clone()).await; if let Err(e) = result { self.connection.write_line(e.output_msg()).await?; } @@ -128,8 +128,8 @@ mod tests { #[async_trait] impl OpentsdbProtocolHandler for DummyQueryHandler { - async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> { - let metric = data_point.metric(); + async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> { + let metric = data_points.first().unwrap().metric(); if metric == "should_failed" { return error::InternalSnafu { err_msg: "expected", @@ -137,7 +137,7 @@ mod tests { .fail(); } self.tx.send(metric.to_string()).await.unwrap(); - Ok(()) + Ok(data_points.len()) } } diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index ef8f74575e7c..4180c8469c8b 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -74,7 +74,7 @@ pub trait InfluxdbLineProtocolHandler { pub trait OpentsdbProtocolHandler { /// A successful request will not return a response. /// Only on error will the socket return a line of data. 
- async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> Result<()>; + async fn exec(&self, data_points: Vec<DataPoint>, ctx: QueryContextRef) -> Result<usize>; } pub struct PromStoreResponse { diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index e77143d3b3a1..552ef6ecb940 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -51,7 +51,8 @@ impl GrpcQueryHandler for DummyInstance { #[async_trait] impl OpentsdbProtocolHandler for DummyInstance { - async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> { + async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> { + let data_point = data_points.first().unwrap(); if data_point.metric() == "should_failed" { return error::InternalSnafu { err_msg: "expected", @@ -59,7 +60,7 @@ impl OpentsdbProtocolHandler for DummyInstance { .fail(); } let _ = self.tx.send(data_point.metric().to_string()).await; - Ok(()) + Ok(data_points.len()) } } @@ -172,10 +173,7 @@ async fn test_opentsdb_put() { while let Ok(s) = rx.try_recv() { metrics.push(s); } - assert_eq!( - metrics, - vec!["m1".to_string(), "m2".to_string(), "m3".to_string()] - ); + assert_eq!(metrics, vec!["m1".to_string(), "m2".to_string()]); } #[tokio::test] diff --git a/src/servers/tests/opentsdb.rs b/src/servers/tests/opentsdb.rs index 145fdc07dbe6..79ac2ba21939 100644 --- a/src/servers/tests/opentsdb.rs +++ b/src/servers/tests/opentsdb.rs @@ -37,8 +37,8 @@ struct DummyOpentsdbInstance { #[async_trait] impl OpentsdbProtocolHandler for DummyOpentsdbInstance { - async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> { - let metric = data_point.metric(); + async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> { + let metric = data_points.first().unwrap().metric(); if metric == "should_failed" { return server_error::InternalSnafu { err_msg: "expected", @@ -47,7 +47,7 @@ impl OpentsdbProtocolHandler 
for DummyOpentsdbInstance { } let i = metric.parse::<i32>().unwrap(); let _ = self.tx.send(i * i).await; - Ok(()) + Ok(data_points.len()) } } diff --git a/tests-integration/src/opentsdb.rs b/tests-integration/src/opentsdb.rs index 5d6338d94270..c5474ea4e240 100644 --- a/tests-integration/src/opentsdb.rs +++ b/tests-integration/src/opentsdb.rs @@ -46,6 +46,8 @@ mod tests { async fn test_exec(instance: &Arc<Instance>) { let ctx = QueryContext::arc(); + + // should create new table "my_metric_1" directly let data_point1 = DataPoint::new( "my_metric_1".to_string(), 1000, 1.0, vec![ ("tagk1".to_string(), "tagv1".to_string()), ("tagk2".to_string(), "tagv2".to_string()), ], ); - // should create new table "my_metric_1" directly - instance.exec(&data_point1, ctx.clone()).await.unwrap(); + // should create new column "tagk3" directly let data_point2 = DataPoint::new( "my_metric_1".to_string(), 2000, 2.0, vec![ ("tagk2".to_string(), "tagv2".to_string()), ("tagk3".to_string(), "tagv3".to_string()), ], ); - // should create new column "tagk3" directly - instance.exec(&data_point2, ctx.clone()).await.unwrap(); - let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]); // should handle null tags properly - instance.exec(&data_point3, ctx.clone()).await.unwrap(); + let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]); + + let data_points = vec![data_point1, data_point2, data_point3]; + instance.exec(data_points, ctx.clone()).await.unwrap(); let output = instance .do_query( @@ -87,13 +88,13 @@ async fn test_exec(instance: &Arc<Instance>) { let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let pretty_print = recordbatches.pretty_print().unwrap(); let expected = vec![ "+---------------------+----------------+-------+-------+-------+", "| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |", "+---------------------+----------------+-------+-------+-------+", "| 1970-01-01T00:00:01 | 1.0 | tagv1 | tagv2 | |", "| 1970-01-01T00:00:02 | 2.0 | | tagv2 | tagv3 |", "| 1970-01-01T00:00:03 | 3.0 | 
| | |", - "+---------------------+----------------+-------+-------+-------+", + "+-------+-------+----------------+---------------------+-------+", + "| tagk1 | tagk2 | greptime_value | greptime_timestamp | tagk3 |", + "+-------+-------+----------------+---------------------+-------+", + "| tagv1 | tagv2 | 1.0 | 1970-01-01T00:00:01 | |", + "| | tagv2 | 2.0 | 1970-01-01T00:00:02 | tagv3 |", + "| | | 3.0 | 1970-01-01T00:00:03 | |", + "+-------+-------+----------------+---------------------+-------+", ] .into_iter() .join("\n");