From 76f4ef3dc858925d2befd6c3fe59d8a7379db07d Mon Sep 17 00:00:00 2001 From: jeremyhi Date: Mon, 3 Jun 2024 12:21:05 +0800 Subject: [PATCH] feat: support cancellation --- Cargo.lock | 1 + src/servers/Cargo.toml | 1 + src/servers/src/grpc.rs | 1 + src/servers/src/grpc/cancellation.rs | 29 +++++++ src/servers/src/grpc/database.rs | 113 +++++++++++++++++--------- src/servers/src/grpc/region_server.rs | 25 ++++-- 6 files changed, 126 insertions(+), 44 deletions(-) create mode 100644 src/servers/src/grpc/cancellation.rs diff --git a/Cargo.lock b/Cargo.lock index 35fabee98b6c..a5ddac0ca772 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9644,6 +9644,7 @@ dependencies = [ "tokio-rustls 0.25.0", "tokio-stream", "tokio-test", + "tokio-util", "tonic 0.11.0", "tonic-reflection", "tower", diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 369476e6e5ba..b30426d2e7ac 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -98,6 +98,7 @@ table.workspace = true tokio.workspace = true tokio-rustls = "0.25" tokio-stream = { workspace = true, features = ["net"] } +tokio-util.workspace = true tonic.workspace = true tonic-reflection = "0.11" tower = { workspace = true, features = ["full"] } diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index aaf64511bd63..eac2d874159c 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -14,6 +14,7 @@ mod authorize; pub mod builder; +mod cancellation; mod database; pub mod flight; pub mod greptime_handler; diff --git a/src/servers/src/grpc/cancellation.rs b/src/servers/src/grpc/cancellation.rs new file mode 100644 index 000000000000..f68f062b11c9 --- /dev/null +++ b/src/servers/src/grpc/cancellation.rs @@ -0,0 +1,29 @@ +use std::future::Future; + +use tokio::select; +use tokio_util::sync::CancellationToken; + +pub(crate) async fn with_cancellation_handler( + request_future: RequestF, + cancellation_future: CancellationF, +) -> Result, tonic::Status> +where + RequestF: Future, tonic::Status>> + Send + 'static, + CancellationF: + Future, tonic::Status>> + Send + 'static, + Response: Send + 'static, +{ + let token = CancellationToken::new(); + // Will call token.cancel() when the future is dropped, such as when the client cancels the request + let _drop_guard = token.clone().drop_guard(); + let select_task = tokio::spawn(async move { + // Can select on token cancellation on any cancellable future while handling the request, + // allowing for custom cleanup code or monitoring + select! { + res = request_future => res, + _ = token.cancelled() => cancellation_future.await, + } + }); + + select_task.await.unwrap() +} diff --git a/src/servers/src/grpc/database.rs b/src/servers/src/grpc/database.rs index 3e242fde1152..f8c9e298d4b5 100644 --- a/src/servers/src/grpc/database.rs +++ b/src/servers/src/grpc/database.rs @@ -18,11 +18,12 @@ use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader}; use async_trait::async_trait; use common_error::status_code::StatusCode; use common_query::OutputData; +use common_telemetry::warn; use futures::StreamExt; use tonic::{Request, Response, Status, Streaming}; use crate::grpc::greptime_handler::GreptimeRequestHandler; -use crate::grpc::TonicResult; +use crate::grpc::{cancellation, TonicResult}; pub(crate) struct DatabaseService { handler: GreptimeRequestHandler, @@ -40,55 +41,91 @@ impl GreptimeDatabase for DatabaseService { &self, request: Request, ) -> TonicResult> { - let request = request.into_inner(); - let output = self.handler.handle_request(request).await?; - let message = match output.data { - OutputData::AffectedRows(rows) => GreptimeResponse { - header: Some(ResponseHeader { - status: Some(api::v1::Status { - status_code: StatusCode::Success as _, - ..Default::default() + let remote_addr = request.remote_addr(); + let handler = self.handler.clone(); + let request_future = async move { + let request = request.into_inner(); + let output = handler.handle_request(request).await?; + let message = match output.data { + OutputData::AffectedRows(rows) => GreptimeResponse { + header: Some(ResponseHeader { + status: Some(api::v1::Status { + status_code: StatusCode::Success as _, + ..Default::default() + }), }), - }), - response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })), - }, - OutputData::Stream(_) | OutputData::RecordBatches(_) => { - return Err(Status::unimplemented("GreptimeDatabase::Handle for query")); - } + response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })), + }, + OutputData::Stream(_) | OutputData::RecordBatches(_) => { + return Err(Status::unimplemented("GreptimeDatabase::Handle for query")); + } + }; + + Ok(Response::new(message)) + }; + + let cancellation_future = async move { + warn!( + "GreptimeDatabase::Handle: request from {:?} cancelled by client", + remote_addr + ); + // If this future is executed it means the request future was dropped, + // so it doesn't actually matter what is returned here + Err(Status::cancelled( + "GreptimeDatabase::Handle: request cancelled by client", + )) }; - Ok(Response::new(message)) + cancellation::with_cancellation_handler(request_future, cancellation_future).await } async fn handle_requests( &self, request: Request>, ) -> Result, Status> { - let mut affected_rows = 0; + let remote_addr = request.remote_addr(); + let handler = self.handler.clone(); + let request_future = async move { + let mut affected_rows = 0; - let mut stream = request.into_inner(); - while let Some(request) = stream.next().await { - let request = request?; - let output = self.handler.handle_request(request).await?; - match output.data { - OutputData::AffectedRows(rows) => affected_rows += rows, - OutputData::Stream(_) | OutputData::RecordBatches(_) => { - return Err(Status::unimplemented( - "GreptimeDatabase::HandleRequests for query", - )); + let mut stream = request.into_inner(); + while let Some(request) = stream.next().await { + let request = request?; + let output = handler.handle_request(request).await?; + match output.data { + OutputData::AffectedRows(rows) => affected_rows += rows, + OutputData::Stream(_) | OutputData::RecordBatches(_) => { + return Err(Status::unimplemented( + "GreptimeDatabase::HandleRequests for query", + )); + } } } - } - let message = GreptimeResponse { - header: Some(ResponseHeader { - status: Some(api::v1::Status { - status_code: StatusCode::Success as _, - ..Default::default() + let message = GreptimeResponse { + header: Some(ResponseHeader { + status: Some(api::v1::Status { + status_code: StatusCode::Success as _, + ..Default::default() + }), }), - }), - response: Some(RawResponse::AffectedRows(AffectedRows { - value: affected_rows as u32, - })), + response: Some(RawResponse::AffectedRows(AffectedRows { + value: affected_rows as u32, + })), + }; + + Ok(Response::new(message)) + }; + + let cancellation_future = async move { + warn!( + "GreptimeDatabase::HandleRequests: request from {:?} cancelled by client", + remote_addr + ); + // If this future is executed it means the request future was dropped, + // so it doesn't actually matter what is returned here + Err(Status::cancelled( + "GreptimeDatabase::HandleRequests: request cancelled by client", + )) }; - Ok(Response::new(message)) + cancellation::with_cancellation_handler(request_future, cancellation_future).await } } diff --git a/src/servers/src/grpc/region_server.rs b/src/servers/src/grpc/region_server.rs index 30c6eb97cbe0..e25ee209f0f5 100644 --- a/src/servers/src/grpc/region_server.rs +++ b/src/servers/src/grpc/region_server.rs @@ -21,12 +21,12 @@ use common_error::ext::ErrorExt; use common_runtime::Runtime; use common_telemetry::tracing::info_span; use common_telemetry::tracing_context::{FutureExt, TracingContext}; -use common_telemetry::{debug, error}; +use common_telemetry::{debug, error, warn}; use snafu::{OptionExt, ResultExt}; -use tonic::{Request, Response}; +use tonic::{Request, Response, Status}; use crate::error::{InvalidQuerySnafu, JoinTaskSnafu, Result}; -use crate::grpc::TonicResult; +use crate::grpc::{cancellation, TonicResult}; #[async_trait] pub trait RegionServerHandler: Send + Sync { @@ -94,8 +94,21 @@ impl RegionServer for RegionServerRequestHandler { &self, request: Request, ) -> TonicResult> { - let request = request.into_inner(); - let response = self.handle(request).await?; - Ok(Response::new(response)) + let remote_addr = request.remote_addr(); + let self_cloned = self.clone(); + let request_future = async move { + let request = request.into_inner(); + let response = self_cloned.handle(request).await?; + + Ok(Response::new(response)) + }; + + let cancellation_future = async move { + warn!("Region request from {:?} cancelled by client", remote_addr); + // If this future is executed it means the request future was dropped, + // so it doesn't actually matter what is returned here + Err(Status::cancelled("Region request cancelled by client")) + }; + cancellation::with_cancellation_handler(request_future, cancellation_future).await } }