Skip to content

Commit

Permalink
chore(server): Use hyper_util::server::graceful::GracefulShutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
tottoto committed Jul 24, 2024
1 parent 255a885 commit 1b8ce06
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 34 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ members = [
"tests/default_stubs",
]
resolver = "2"

[patch.crates-io]
hyper-util = { git = "https://github.com/hyperium/hyper-util.git", rev = "refs/pull/136/head" }
2 changes: 1 addition & 1 deletion tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ server = [
"dep:async-stream",
"dep:h2",
"dep:hyper", "hyper?/server",
"dep:hyper-util", "hyper-util?/service", "hyper-util?/server-auto",
"dep:hyper-util", "hyper-util?/service", "hyper-util?/server-auto", "hyper-util?/server-graceful",
"dep:socket2",
"dep:tokio", "tokio?/macros", "tokio?/net", "tokio?/time",
"tokio-stream/net",
Expand Down
48 changes: 15 additions & 33 deletions tonic/src/transport/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::service::Routes;
pub use conn::{Connected, TcpConnectInfo};
use hyper_util::{
rt::{TokioExecutor, TokioIo, TokioTimer},
server::graceful::GracefulShutdown,
service::TowerToHyperService,
};
#[cfg(feature = "tls")]
Expand Down Expand Up @@ -561,10 +562,7 @@ impl<L> Server<L> {
builder
};

let (signal_tx, signal_rx) = tokio::sync::watch::channel(());
let signal_tx = Arc::new(signal_tx);

let graceful = signal.is_some();
let graceful = signal.is_some().then(GracefulShutdown::new);
let mut sig = pin!(Fuse { inner: signal });
let mut incoming = pin!(incoming);

Expand Down Expand Up @@ -601,21 +599,13 @@ impl<L> Server<L> {
let hyper_io = TokioIo::new(io);
let hyper_svc = TowerToHyperService::new(req_svc);

serve_connection(hyper_io, hyper_svc, server.clone(), graceful.then(|| signal_rx.clone()));
serve_connection(hyper_io, hyper_svc, server.clone(), graceful.clone());
}
}
}

if graceful {
let _ = signal_tx.send(());
drop(signal_rx);
trace!(
"waiting for {} connections to close",
signal_tx.receiver_count()
);

// Wait for all connections to close
signal_tx.closed().await;
if let Some(graceful) = graceful {
graceful.shutdown().await;
}

Ok(())
Expand All @@ -628,7 +618,7 @@ fn serve_connection<IO, S>(
hyper_io: IO,
hyper_svc: S,
builder: ConnectionBuilder,
mut watcher: Option<tokio::sync::watch::Receiver<()>>,
graceful: Option<GracefulShutdown>,
) where
IO: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
S: HyperService<Request<Incoming>, Response = Response<BoxBody>> + Clone + Send + 'static,
Expand All @@ -637,28 +627,20 @@ fn serve_connection<IO, S>(
{
tokio::spawn(async move {
{
let mut sig = pin!(Fuse {
inner: watcher.as_mut().map(|w| w.changed()),
});
let conn = builder.serve_connection(hyper_io, hyper_svc);

let mut conn = pin!(builder.serve_connection(hyper_io, hyper_svc));
let result = if let Some(graceful) = graceful {
let conn = graceful.watch(conn);
conn.await
} else {
conn.await
};

loop {
tokio::select! {
rv = &mut conn => {
if let Err(err) = rv {
debug!("failed serving connection: {:#}", err);
}
break;
},
_ = &mut sig => {
conn.as_mut().graceful_shutdown();
}
}
if let Err(err) = result {
debug!("failed serving connection: {:#}", err);
}
}

drop(watcher);
trace!("connection closed");
});
}
Expand Down

0 comments on commit 1b8ce06

Please sign in to comment.