diff --git a/tests-integration/src/database.rs b/tests-integration/src/database.rs index 82151ca5aea5..31254cefdcc2 100644 --- a/tests-integration/src/database.rs +++ b/tests-integration/src/database.rs @@ -18,9 +18,8 @@ use api::v1::greptime_database_client::GreptimeDatabaseClient; use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::{ - AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, DeleteRequests, DropTableExpr, - GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest, RequestHeader, - RowInsertRequests, TruncateTableExpr, + AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, GreptimeRequest, InsertRequests, + QueryRequest, RequestHeader, }; use arrow_flight::Ticket; use async_stream::stream; @@ -38,8 +37,6 @@ use prost::Message; use snafu::{ensure, ResultExt}; use tonic::transport::Channel; -use crate::stream_insert::StreamInserter; - pub const DEFAULT_LOOKBACK_STRING: &str = "5m"; #[derive(Clone, Debug, Default)] @@ -104,34 +101,14 @@ impl Database { } } - pub fn catalog(&self) -> &String { - &self.catalog - } - pub fn set_catalog(&mut self, catalog: impl Into) { self.catalog = catalog.into(); } - pub fn schema(&self) -> &String { - &self.schema - } - pub fn set_schema(&mut self, schema: impl Into) { self.schema = schema.into(); } - pub fn dbname(&self) -> &String { - &self.dbname - } - - pub fn set_dbname(&mut self, dbname: impl Into) { - self.dbname = dbname.into(); - } - - pub fn timezone(&self) -> &String { - &self.timezone - } - pub fn set_timezone(&mut self, timezone: impl Into) { self.timezone = timezone.into(); } @@ -146,34 +123,6 @@ impl Database { self.handle(Request::Inserts(requests)).await } - pub async fn row_insert(&self, requests: RowInsertRequests) -> Result { - self.handle(Request::RowInserts(requests)).await - } - - pub fn streaming_inserter(&self) -> Result { - self.streaming_inserter_with_channel_size(65536) - } - - pub fn streaming_inserter_with_channel_size( - &self, - channel_size: usize, - ) -> Result { - let client = make_database_client(&self.client)?.inner; - - let stream_inserter = StreamInserter::new( - client, - self.dbname().to_string(), - self.ctx.auth_header.clone(), - channel_size, - ); - - Ok(stream_inserter) - } - - pub async fn delete(&self, request: DeleteRequests) -> Result { - self.handle(Request::Deletes(request)).await - } - async fn handle(&self, request: Request) -> Result { let mut client = make_database_client(&self.client)?.inner; let request = self.to_rpc_request(request); @@ -207,32 +156,6 @@ impl Database { .await } - pub async fn logical_plan(&self, logical_plan: Vec) -> Result { - self.do_get(Request::Query(QueryRequest { - query: Some(Query::LogicalPlan(logical_plan)), - })) - .await - } - - pub async fn prom_range_query( - &self, - promql: &str, - start: &str, - end: &str, - step: &str, - ) -> Result { - self.do_get(Request::Query(QueryRequest { - query: Some(Query::PromRangeQuery(PromRangeQuery { - query: promql.to_string(), - start: start.to_string(), - end: end.to_string(), - step: step.to_string(), - lookback: DEFAULT_LOOKBACK_STRING.to_string(), - })), - })) - .await - } - pub async fn create(&self, expr: CreateTableExpr) -> Result { self.do_get(Request::Ddl(DdlRequest { expr: Some(DdlExpr::CreateTable(expr)), @@ -247,20 +170,6 @@ impl Database { .await } - pub async fn drop_table(&self, expr: DropTableExpr) -> Result { - self.do_get(Request::Ddl(DdlRequest { - expr: Some(DdlExpr::DropTable(expr)), - })) - .await - } - - pub async fn truncate_table(&self, expr: TruncateTableExpr) -> Result { - self.do_get(Request::Ddl(DdlRequest { - expr: Some(DdlExpr::TruncateTable(expr)), - })) - .await - } - async fn do_get(&self, request: Request) -> Result { let request = self.to_rpc_request(request); let request = Ticket { @@ -354,7 +263,7 @@ impl Database { } #[derive(Default, Debug, Clone)] -pub struct FlightContext { +struct FlightContext { auth_header: Option, } diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs index 4dcfdcc09603..e4db599fd5e7 100644 --- a/tests-integration/src/lib.rs +++ b/tests-integration/src/lib.rs @@ -22,7 +22,6 @@ mod instance; mod opentsdb; mod otlp; mod prom_store; -mod stream_insert; pub mod test_util; pub mod standalone; diff --git a/tests-integration/src/stream_insert.rs b/tests-integration/src/stream_insert.rs deleted file mode 100644 index bd35c132c98a..000000000000 --- a/tests-integration/src/stream_insert.rs +++ /dev/null @@ -1,117 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use api::v1::greptime_database_client::GreptimeDatabaseClient; -use api::v1::greptime_request::Request; -use api::v1::{ - AuthHeader, GreptimeRequest, GreptimeResponse, InsertRequest, InsertRequests, RequestHeader, - RowInsertRequest, RowInsertRequests, -}; -use client::error::{self, Result}; -use client::from_grpc_response; -use tokio::sync::mpsc; -use tokio::task::JoinHandle; -use tokio_stream::wrappers::ReceiverStream; -use tonic::transport::Channel; -use tonic::{Response, Status}; - -/// A structure that provides some methods for streaming data insert. -/// -/// [`StreamInserter`] cannot be constructed via the `StreamInserter::new` method. -/// You can use the following way to obtain [`StreamInserter`]. -/// -/// ```ignore -/// let grpc_client = Client::with_urls(vec!["127.0.0.1:4002"]); -/// let client = Database::new_with_dbname("db_name", grpc_client); -/// let stream_inserter = client.streaming_inserter().unwrap(); -/// ``` -/// -/// If you want to see a concrete usage example, please see -/// [stream_inserter.rs](https://github.com/GreptimeTeam/greptimedb/blob/main/src/client/examples/stream_ingest.rs). -pub struct StreamInserter { - sender: mpsc::Sender, - - auth_header: Option, - - dbname: String, - - join: JoinHandle, Status>>, -} - -impl StreamInserter { - pub(crate) fn new( - mut client: GreptimeDatabaseClient, - dbname: String, - auth_header: Option, - channel_size: usize, - ) -> StreamInserter { - let (send, recv) = tokio::sync::mpsc::channel(channel_size); - - let join: JoinHandle, Status>> = - tokio::spawn(async move { - let recv_stream = ReceiverStream::new(recv); - client.handle_requests(recv_stream).await - }); - - StreamInserter { - sender: send, - auth_header, - dbname, - join, - } - } - - pub async fn insert(&self, requests: Vec) -> Result<()> { - let inserts = InsertRequests { inserts: requests }; - let request = self.to_rpc_request(Request::Inserts(inserts)); - - self.sender.send(request).await.map_err(|e| { - error::ClientStreamingSnafu { - err_msg: e.to_string(), - } - .build() - }) - } - - pub async fn row_insert(&self, requests: Vec) -> Result<()> { - let inserts = RowInsertRequests { inserts: requests }; - let request = self.to_rpc_request(Request::RowInserts(inserts)); - - self.sender.send(request).await.map_err(|e| { - error::ClientStreamingSnafu { - err_msg: e.to_string(), - } - .build() - }) - } - - pub async fn finish(self) -> Result { - drop(self.sender); - - let response = self.join.await.unwrap()?; - let response = response.into_inner(); - from_grpc_response(response) - } - - fn to_rpc_request(&self, request: Request) -> GreptimeRequest { - GreptimeRequest { - header: Some(RequestHeader { - authorization: self.auth_header.clone(), - dbname: self.dbname.clone(), - ..Default::default() - }), - request: Some(request), - } - } -}