Skip to content

Commit

Permalink
feat: support cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Jun 3, 2024
1 parent 4e5dd1e commit 2d2d75d
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 44 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ table.workspace = true
tokio.workspace = true
tokio-rustls = "0.25"
tokio-stream = { workspace = true, features = ["net"] }
tokio-util.workspace = true
tonic.workspace = true
tonic-reflection = "0.11"
tower = { workspace = true, features = ["full"] }
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod authorize;
pub mod builder;
mod cancellation;
mod database;
pub mod flight;
pub mod greptime_handler;
Expand Down
43 changes: 43 additions & 0 deletions src/servers/src/grpc/cancellation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use tokio::select;
use tokio_util::sync::CancellationToken;

pub(crate) async fn with_cancellation_handler<RequestF, CancellationF, Response>(
request_future: RequestF,
cancellation_future: CancellationF,
) -> Result<tonic::Response<Response>, tonic::Status>
where
RequestF: Future<Output = Result<tonic::Response<Response>, tonic::Status>> + Send + 'static,
CancellationF:
Future<Output = Result<tonic::Response<Response>, tonic::Status>> + Send + 'static,
Response: Send + 'static,
{
let token = CancellationToken::new();
// Will call token.cancel() when the future is dropped, such as when the client cancels the request
let _drop_guard = token.clone().drop_guard();
let select_task = tokio::spawn(async move {
// Can select on token cancellation on any cancellable future while handling the request,
// allowing for custom cleanup code or monitoring
select! {
res = request_future => res,
_ = token.cancelled() => cancellation_future.await,
}
});

select_task.await.unwrap()
}
113 changes: 75 additions & 38 deletions src/servers/src/grpc/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
use async_trait::async_trait;
use common_error::status_code::StatusCode;
use common_query::OutputData;
use common_telemetry::warn;
use futures::StreamExt;
use tonic::{Request, Response, Status, Streaming};

use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::TonicResult;
use crate::grpc::{cancellation, TonicResult};

pub(crate) struct DatabaseService {
handler: GreptimeRequestHandler,
Expand All @@ -40,55 +41,91 @@ impl GreptimeDatabase for DatabaseService {
&self,
request: Request<GreptimeRequest>,
) -> TonicResult<Response<GreptimeResponse>> {
let request = request.into_inner();
let output = self.handler.handle_request(request).await?;
let message = match output.data {
OutputData::AffectedRows(rows) => GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
let remote_addr = request.remote_addr();
let handler = self.handler.clone();
let request_future = async move {
let request = request.into_inner();
let output = handler.handle_request(request).await?;
let message = match output.data {
OutputData::AffectedRows(rows) => GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
}),
response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
},
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
}
response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
},
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
}
};

Ok(Response::new(message))
};

let cancellation_future = async move {
warn!(
"GreptimeDatabase::Handle: request from {:?} cancelled by client",
remote_addr
);
// If this future is executed it means the request future was dropped,
// so it doesn't actually matter what is returned here
Err(Status::cancelled(
"GreptimeDatabase::Handle: request cancelled by client",
))
};
Ok(Response::new(message))
cancellation::with_cancellation_handler(request_future, cancellation_future).await
}

async fn handle_requests(
&self,
request: Request<Streaming<GreptimeRequest>>,
) -> Result<Response<GreptimeResponse>, Status> {
let mut affected_rows = 0;
let remote_addr = request.remote_addr();
let handler = self.handler.clone();
let request_future = async move {
let mut affected_rows = 0;

let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;
let output = self.handler.handle_request(request).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented(
"GreptimeDatabase::HandleRequests for query",
));
let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;
let output = handler.handle_request(request).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented(
"GreptimeDatabase::HandleRequests for query",
));
}
}
}
}
let message = GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
let message = GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
}),
response: Some(RawResponse::AffectedRows(AffectedRows {
value: affected_rows as u32,
})),
response: Some(RawResponse::AffectedRows(AffectedRows {
value: affected_rows as u32,
})),
};

Ok(Response::new(message))
};

let cancellation_future = async move {
warn!(
"GreptimeDatabase::HandleRequests: request from {:?} cancelled by client",
remote_addr
);
// If this future is executed it means the request future was dropped,
// so it doesn't actually matter what is returned here
Err(Status::cancelled(
"GreptimeDatabase::HandleRequests: request cancelled by client",
))
};
Ok(Response::new(message))
cancellation::with_cancellation_handler(request_future, cancellation_future).await
}
}
25 changes: 19 additions & 6 deletions src/servers/src/grpc/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use common_error::ext::ErrorExt;
use common_runtime::Runtime;
use common_telemetry::tracing::info_span;
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, error};
use common_telemetry::{debug, error, warn};
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response};
use tonic::{Request, Response, Status};

use crate::error::{InvalidQuerySnafu, JoinTaskSnafu, Result};
use crate::grpc::TonicResult;
use crate::grpc::{cancellation, TonicResult};

#[async_trait]
pub trait RegionServerHandler: Send + Sync {
Expand Down Expand Up @@ -94,8 +94,21 @@ impl RegionServer for RegionServerRequestHandler {
&self,
request: Request<RegionRequest>,
) -> TonicResult<Response<RegionResponse>> {
let request = request.into_inner();
let response = self.handle(request).await?;
Ok(Response::new(response))
let remote_addr = request.remote_addr();
let self_cloned = self.clone();
let request_future = async move {
let request = request.into_inner();
let response = self_cloned.handle(request).await?;

Ok(Response::new(response))
};

let cancellation_future = async move {
warn!("Region request from {:?} cancelled by client", remote_addr);
// If this future is executed it means the request future was dropped,
// so it doesn't actually matter what is returned here
Err(Status::cancelled("Region request cancelled by client"))
};
cancellation::with_cancellation_handler(request_future, cancellation_future).await
}
}

0 comments on commit 2d2d75d

Please sign in to comment.