Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support gRPC cancellation #4092

Merged
merged 5 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,9 +454,11 @@ impl StartCommand {
);
let flownode = Arc::new(flow_builder.build().await);

let builder =
DatanodeBuilder::new(dn_opts, fe_plugins.clone()).with_kv_backend(kv_backend.clone());
let datanode = builder.build().await.context(StartDatanodeSnafu)?;
let datanode = DatanodeBuilder::new(dn_opts, fe_plugins.clone())
.with_kv_backend(kv_backend.clone())
.build()
.await
.context(StartDatanodeSnafu)?;

let node_manager = Arc::new(StandaloneDatanodeManager {
region_server: datanode.region_server(),
Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;

use auth::UserProviderRef;
use common_base::Plugins;
use common_config::Configurable;
use common_config::{Configurable, Mode};
use common_runtime::Builder as RuntimeBuilder;
use servers::grpc::builder::GrpcServerBuilder;
use servers::grpc::greptime_handler::GreptimeRequestHandler;
Expand Down Expand Up @@ -140,11 +140,15 @@ where
};

let user_provider = self.plugins.get::<UserProviderRef>();
let runtime = match opts.mode {
Mode::Standalone => Some(builder.runtime().clone()),
_ => None,
};

let greptime_request_handler = GreptimeRequestHandler::new(
ServerGrpcQueryHandlerAdapter::arc(self.instance.clone()),
user_provider.clone(),
builder.runtime().clone(),
runtime,
);

let grpc_server = builder
Expand Down
1 change: 1 addition & 0 deletions src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ table.workspace = true
tokio.workspace = true
tokio-rustls = "0.25"
tokio-stream = { workspace = true, features = ["net"] }
tokio-util.workspace = true
tonic.workspace = true
tonic-reflection = "0.11"
tower = { workspace = true, features = ["full"] }
Expand Down
1 change: 1 addition & 0 deletions src/servers/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

mod authorize;
pub mod builder;
mod cancellation;
mod database;
pub mod flight;
pub mod greptime_handler;
Expand Down
90 changes: 90 additions & 0 deletions src/servers/src/grpc/cancellation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use tokio::select;
use tokio_util::sync::CancellationToken;

type Result<T> = std::result::Result<tonic::Response<T>, tonic::Status>;

pub(crate) async fn with_cancellation_handler<Request, Cancellation, Response>(
request: Request,
cancellation: Cancellation,
) -> Result<Response>
where
Request: Future<Output = Result<Response>> + Send + 'static,
Cancellation: Future<Output = Result<Response>> + Send + 'static,
Response: Send + 'static,
{
let token = CancellationToken::new();
// Will call token.cancel() when the future is dropped, such as when the client cancels the request
let _drop_guard = token.clone().drop_guard();
let select_task = tokio::spawn(async move {
// Can select on token cancellation on any cancellable future while handling the request,
// allowing for custom cleanup code or monitoring
select! {
res = request => res,
_ = token.cancelled() => cancellation.await,
}
waynexia marked this conversation as resolved.
Show resolved Hide resolved
});

select_task.await.unwrap()
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use tokio::sync::mpsc;
    use tokio::time;
    use tonic::Response;

    use super::*;

    /// A request that resolves immediately must be returned, even though a (slow)
    /// cancellation future was supplied.
    #[tokio::test]
    async fn test_request_completes_first() {
        let fast_request = async { Ok(Response::new("Request Completed")) };
        let slow_cancellation = async {
            time::sleep(Duration::from_secs(1)).await;
            Ok(Response::new("Cancelled"))
        };

        let response = with_cancellation_handler(fast_request, slow_cancellation)
            .await
            .unwrap();
        assert_eq!(response.into_inner(), "Request Completed");
    }

    /// Dropping the handler future before the request finishes must run the
    /// cancellation future, observed here through a channel message.
    #[tokio::test]
    async fn test_cancellation_when_dropped() {
        let (cancel_tx, mut events) = mpsc::channel(2);
        let request_tx = cancel_tx.clone();

        let slow_request = async move {
            time::sleep(Duration::from_secs(1)).await;
            request_tx.send("Request Completed").await.unwrap();
            Ok(Response::new("Completed"))
        };
        let on_cancel = async move {
            cancel_tx.send("Request Cancelled").await.unwrap();
            Ok(Response::new("Cancelled"))
        };

        // The timeout fires long before the request's 1s sleep ends; the handler future is
        // then dropped, which triggers the `cancellation` future in the spawned task.
        let outcome = time::timeout(
            Duration::from_millis(50),
            with_cancellation_handler(slow_request, on_cancel),
        )
        .await;

        assert!(outcome.is_err(), "Expected timeout error");
        let first_event = events.recv().await.unwrap();
        assert_eq!("Request Cancelled", first_event);
    }
}
113 changes: 75 additions & 38 deletions src/servers/src/grpc/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ use api::v1::{AffectedRows, GreptimeRequest, GreptimeResponse, ResponseHeader};
use async_trait::async_trait;
use common_error::status_code::StatusCode;
use common_query::OutputData;
use common_telemetry::warn;
use futures::StreamExt;
use tonic::{Request, Response, Status, Streaming};

use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::TonicResult;
use crate::grpc::{cancellation, TonicResult};

pub(crate) struct DatabaseService {
handler: GreptimeRequestHandler,
Expand All @@ -40,55 +41,91 @@ impl GreptimeDatabase for DatabaseService {
&self,
request: Request<GreptimeRequest>,
) -> TonicResult<Response<GreptimeResponse>> {
let request = request.into_inner();
let output = self.handler.handle_request(request).await?;
let message = match output.data {
OutputData::AffectedRows(rows) => GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
let remote_addr = request.remote_addr();
let handler = self.handler.clone();
let request_future = async move {
let request = request.into_inner();
let output = handler.handle_request(request).await?;
let message = match output.data {
OutputData::AffectedRows(rows) => GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
}),
response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
},
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
}
response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
},
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
}
};

Ok(Response::new(message))
};

let cancellation_future = async move {
warn!(
"GreptimeDatabase::Handle: request from {:?} cancelled by client",
remote_addr
);
// If this future is executed it means the request future was dropped,
// so it doesn't actually matter what is returned here
Err(Status::cancelled(
"GreptimeDatabase::Handle: request cancelled by client",
))
};
Ok(Response::new(message))
cancellation::with_cancellation_handler(request_future, cancellation_future).await
}

async fn handle_requests(
&self,
request: Request<Streaming<GreptimeRequest>>,
) -> Result<Response<GreptimeResponse>, Status> {
let mut affected_rows = 0;
let remote_addr = request.remote_addr();
let handler = self.handler.clone();
let request_future = async move {
let mut affected_rows = 0;

let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;
let output = self.handler.handle_request(request).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented(
"GreptimeDatabase::HandleRequests for query",
));
let mut stream = request.into_inner();
while let Some(request) = stream.next().await {
let request = request?;
let output = handler.handle_request(request).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
OutputData::Stream(_) | OutputData::RecordBatches(_) => {
return Err(Status::unimplemented(
"GreptimeDatabase::HandleRequests for query",
));
}
}
}
}
let message = GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
let message = GreptimeResponse {
header: Some(ResponseHeader {
status: Some(api::v1::Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
}),
response: Some(RawResponse::AffectedRows(AffectedRows {
value: affected_rows as u32,
})),
response: Some(RawResponse::AffectedRows(AffectedRows {
value: affected_rows as u32,
})),
};

Ok(Response::new(message))
};

let cancellation_future = async move {
warn!(
"GreptimeDatabase::HandleRequests: request from {:?} cancelled by client",
remote_addr
);
// If this future is executed it means the request future was dropped,
// so it doesn't actually matter what is returned here
Err(Status::cancelled(
"GreptimeDatabase::HandleRequests: request cancelled by client",
))
};
Ok(Response::new(message))
cancellation::with_cancellation_handler(request_future, cancellation_future).await
}
}
43 changes: 26 additions & 17 deletions src/servers/src/grpc/greptime_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
pub struct GreptimeRequestHandler {
handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
runtime: Option<Arc<Runtime>>,
}

impl GreptimeRequestHandler {
pub fn new(
handler: ServerGrpcQueryHandlerRef,
user_provider: Option<UserProviderRef>,
runtime: Arc<Runtime>,
runtime: Option<Arc<Runtime>>,
) -> Self {
Self {
handler,
Expand All @@ -73,16 +73,9 @@ impl GreptimeRequestHandler {
let request_type = request_type(&query).to_string();
let db = query_ctx.get_db_string();
let timer = RequestTimer::new(db.clone(), request_type);

// Executes requests in another runtime to
// 1. prevent the execution from being cancelled unexpectedly by the Tonic runtime;
// - Refer to our blog for the rationale behind it:
// https://www.greptime.com/blogs/2023-01-12-hidden-control-flow.html
// - Obtaining a `JoinHandle` to get the panic message (if there is any).
// From its docs, `JoinHandle` is cancel safe. The task keeps running even if its handle has been dropped.
// 2. avoid the handler accidentally blocking the gRPC runtime.
let tracing_context = TracingContext::from_current_span();
let handle = self.runtime.spawn(async move {

let result_future = async move {
handler
.do_query(query, query_ctx)
.trace(tracing_context.attach(tracing::info_span!(
Expand All @@ -98,12 +91,28 @@ impl GreptimeRequestHandler {
}
e
})
});

handle.await.context(JoinTaskSnafu).map_err(|e| {
timer.record(e.status_code());
e
})?
};

match &self.runtime {
Some(runtime) => {
// Executes requests in another runtime to
// 1. prevent the execution from being cancelled unexpectedly by the Tonic runtime;
// - Refer to our blog for the rationale behind it:
// https://www.greptime.com/blogs/2023-01-12-hidden-control-flow.html
// - Obtaining a `JoinHandle` to get the panic message (if there is any).
// From its docs, `JoinHandle` is cancel safe. The task keeps running even if its handle has been dropped.
// 2. avoid the handler accidentally blocking the gRPC runtime.
waynexia marked this conversation as resolved.
Show resolved Hide resolved
runtime
.spawn(result_future)
.await
.context(JoinTaskSnafu)
.map_err(|e| {
timer.record(e.status_code());
e
})?
}
None => result_future.await,
}
}
}

Expand Down
Loading