Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(launcher): more controlled shutdown #1129

Open
wants to merge 4 commits into
base: bragov4ik/launcher-shutdown-signals
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion libs/blockscout-service-launcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ actix-cors = { version = "0.6.4", optional = true }
actix-web-prom = { version = "0.6", optional = true }
anyhow = { version = "1.0", optional = true }
config = { version = "0.13", optional = true }
either = { version = "1.13", optional = true }
futures = { version = "0.3", optional = true }
cfg-if = { version = "1.0.0", optional = true }
keccak-hash = { version = "0.10.0", optional = true }
Expand All @@ -27,7 +28,7 @@ reqwest = { version = "0.11", features = ["json"], optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = {version = "1", optional = true }
tokio = { version = "1", optional = true }
tokio-util = { version = "0.7.12", optional = true }
tokio-util = { version = "0.7.12", features = ["rt"], optional = true }
tonic = { version = "0.8", optional = true }
tracing = { version = "0.1", optional = true }
tracing-actix-web = { package = "blockscout-tracing-actix-web", version = "0.8.0", optional = true }
Expand Down Expand Up @@ -64,6 +65,7 @@ launcher = [
"dep:actix-cors",
"dep:anyhow",
"dep:config",
"dep:either",
"dep:futures",
"dep:once_cell",
"dep:prometheus",
Expand Down
145 changes: 118 additions & 27 deletions libs/blockscout-service-launcher/src/launcher/launch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use super::{
};
use actix_web::{middleware::Condition, App, HttpServer};
use actix_web_prom::PrometheusMetrics;
use std::net::SocketAddr;
use tokio_util::sync::CancellationToken;
use std::{future::Future, net::SocketAddr, time::Duration};
use tokio::{task::JoinSet, time::timeout};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing_actix_web::TracingLogger;

pub(crate) const SHUTDOWN_TIMEOUT_SEC: u64 = 10;
Expand All @@ -19,11 +20,71 @@ pub struct LaunchSettings {
pub metrics: MetricsSettings,
}

#[derive(Clone)]
pub(crate) struct TaskTrackers {
pub local: TaskTracker,
pub external: Option<TaskTracker>,
}

impl TaskTrackers {
pub fn new(external: Option<TaskTracker>) -> Self {
Self {
local: TaskTracker::new(),
external,
}
}

pub fn close(&self) {
self.local.close();
if let Some(t) = &self.external {
t.close();
}
}

/// Should be cancel-safe, just like `TaskTracker::wait()`
pub async fn wait(&self) {
self.local.wait().await;
if let Some(t) = &self.external {
t.wait().await;
}
}

pub fn track_future<F>(&self, future: F) -> impl Future<Output = F::Output>
where
F: Future,
{
let future = self.local.track_future(future);
if let Some(t) = &self.external {
either::Left(t.track_future(future))
} else {
either::Right(future)
}
}
}

async fn spawn_and_track<F>(
futures: &mut JoinSet<F::Output>,
trackers: &TaskTrackers,
future: F,
) -> tokio::task::AbortHandle
where
F: Future,
F: Send + 'static,
F::Output: Send,
{
if let Some(t) = &trackers.external {
futures.spawn(trackers.local.track_future(t.track_future(future)))
} else {
futures.spawn(trackers.local.track_future(future))
}
}

pub async fn launch<R>(
settings: &LaunchSettings,
http: R,
grpc: tonic::transport::server::Router,
shutdown: Option<CancellationToken>,
task_tracker: Option<TaskTracker>,
) -> Result<(), anyhow::Error>
where
R: HttpRouter + Send + Sync + Clone + 'static,
Expand All @@ -33,43 +94,68 @@ where
.enabled
.then(|| Metrics::new(&settings.service_name, &settings.metrics.route));

let mut futures = vec![];
let mut futures = JoinSet::new();
let trackers = TaskTrackers::new(task_tracker);

if settings.server.http.enabled {
let http_server = {
let http_server_future = http_serve(
http,
metrics
.as_ref()
.map(|metrics| metrics.http_middleware().clone()),
&settings.server.http,
shutdown.clone(),
);
tokio::spawn(async move { http_server_future.await.map_err(anyhow::Error::msg) })
};
futures.push(http_server)
let http_server = http_serve(
http,
metrics
.as_ref()
.map(|metrics| metrics.http_middleware().clone()),
&settings.server.http,
shutdown.clone(),
&trackers,
);
spawn_and_track(&mut futures, &trackers, async move {
http_server.await.map_err(anyhow::Error::msg)
})
.await;
}

if settings.server.grpc.enabled {
let grpc_server = {
let grpc_server_future = grpc_serve(grpc, settings.server.grpc.addr, shutdown.clone());
tokio::spawn(async move { grpc_server_future.await.map_err(anyhow::Error::msg) })
};
futures.push(grpc_server)
let grpc_server = grpc_serve(grpc, settings.server.grpc.addr, shutdown.clone());
spawn_and_track(&mut futures, &trackers, async move {
grpc_server.await.map_err(anyhow::Error::msg)
})
.await;
}

if let Some(metrics) = metrics {
let addr = settings.metrics.addr;
futures.push(tokio::spawn(async move {
metrics.run_server(addr, shutdown).await?;
let shutdown = shutdown.clone();
let trackers_ = trackers.clone();
spawn_and_track(&mut futures, &trackers, async move {
metrics.run_server(addr, shutdown, &trackers_).await?;
Ok(())
}));
})
.await;
}
if let Some(ref shutdown) = shutdown {
let shutdown = shutdown.clone();
spawn_and_track(&mut futures, &trackers, async move {
shutdown.cancelled().await;
Ok(())
})
.await;
}

let (res, _, others) = futures::future::select_all(futures).await;
for future in others.into_iter() {
future.abort()
let res = futures.join_next().await.expect("future set is not empty");
trackers.close();
if let Some(shutdown) = shutdown {
shutdown.cancel();
if timeout(Duration::from_secs(SHUTDOWN_TIMEOUT_SEC), trackers.wait())
.await
.is_err()
{
// timed out; fallback to simple task abort
futures.abort_all();
}
} else {
futures.abort_all();
}
trackers.wait().await;
futures.join_all().await;
res?
}

Expand All @@ -96,6 +182,7 @@ fn http_serve<R>(
metrics: Option<PrometheusMetrics>,
settings: &HttpServerSettings,
shutdown: Option<CancellationToken>,
task_trackers: &TaskTrackers,
) -> actix_web::dev::Server
where
R: HttpRouter + Send + Sync + Clone + 'static,
Expand Down Expand Up @@ -137,7 +224,11 @@ where
.run()
};
if let Some(shutdown) = shutdown {
tokio::spawn(stop_actix_server_on_cancel(server.handle(), shutdown, true));
tokio::spawn(task_trackers.track_future(stop_actix_server_on_cancel(
server.handle(),
shutdown,
true,
)));
}
server
}
Expand Down
9 changes: 8 additions & 1 deletion libs/blockscout-service-launcher/src/launcher/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use tokio_util::sync::CancellationToken;

use crate::launcher::launch::{stop_actix_server_on_cancel, SHUTDOWN_TIMEOUT_SEC};

use super::launch::TaskTrackers;

#[derive(Clone)]
pub struct Metrics {
metrics_middleware: PrometheusMetrics,
Expand Down Expand Up @@ -40,6 +42,7 @@ impl Metrics {
self,
addr: SocketAddr,
shutdown: Option<CancellationToken>,
task_trackers: &TaskTrackers,
) -> actix_web::dev::Server {
tracing::info!(addr = ?addr, "starting metrics server");
let server = HttpServer::new(move || App::new().wrap(self.metrics_middleware.clone()))
Expand All @@ -48,7 +51,11 @@ impl Metrics {
.unwrap()
.run();
if let Some(shutdown) = shutdown {
tokio::spawn(stop_actix_server_on_cancel(server.handle(), shutdown, true));
tokio::spawn(task_trackers.track_future(stop_actix_server_on_cancel(
server.handle(),
shutdown,
true,
)));
}
server
}
Expand Down
Loading