Skip to content

Commit

Permalink
fix rmb calls timeout (#202)
Browse files Browse the repository at this point in the history
* feat: app kill on listener thread panic with backoff strategy

Signed-off-by: nabil salah <[email protected]>

* fix: linting

Signed-off-by: nabil salah <[email protected]>

* fix: backoff reset on success

Signed-off-by: nabil salah <[email protected]>

---------

Signed-off-by: nabil salah <[email protected]>
  • Loading branch information
Nabil-Salah authored Dec 11, 2024
1 parent 11b1c05 commit 0ef17e7
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 9 deletions.
62 changes: 54 additions & 8 deletions src/bins/rmb-relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use rmb::relay::{
limiter::{FixedWindowOptions, Limiters},
};
use rmb::twin::SubstrateTwinDB;
use tokio::sync::oneshot;

/// A peer requires only which relay to connect to, and
/// which identity (mnemonics)
Expand Down Expand Up @@ -92,7 +93,7 @@ fn set_limits() -> Result<()> {
Ok(())
}

async fn app(args: Args) -> Result<()> {
async fn app(args: Args, tx: oneshot::Sender<()>) -> Result<()> {
if args.workers == 0 {
anyhow::bail!("number of workers cannot be zero");
}
Expand Down Expand Up @@ -172,10 +173,42 @@ async fn app(args: Args) -> Result<()> {

let mut l = events::Listener::new(args.substrate, redis_cache).await?;
tokio::spawn(async move {
l.listen()
.await
.context("failed to listen to chain events")
.unwrap();
let max_retries = 9; // backoff doubles from 1s: longest single wait is 2^8 = 256 seconds (~4 minutes), ~511 seconds (~8.5 minutes) in total
let mut attempt = 0;
let mut backoff = Duration::from_secs(1);
let mut got_hit = false;

loop {
match l
.listen(&mut got_hit)
.await
.context("failed to listen to chain events")
{
Ok(_) => break,
Err(e) => {
if got_hit {
log::warn!("Listener got a hit, but failed to listen to chain events before no attempts will be reset");
got_hit = false;
attempt = 0;
backoff = Duration::from_secs(1);
}
attempt += 1;
if attempt > max_retries {
log::error!("Listener failed after {} attempts: {:?}", attempt - 1, e);
let _ = tx.send(());
break;
}
log::warn!(
"Listener failed on attempt {}: {:?}. Retrying in {:?}...",
attempt,
e,
backoff
);
tokio::time::sleep(backoff).await;
backoff *= 2;
}
}
}
});

r.start(&args.listen).await.unwrap();
Expand All @@ -185,8 +218,21 @@ async fn app(args: Args) -> Result<()> {
#[tokio::main]
async fn main() {
let args = Args::parse();
if let Err(e) = app(args).await {
eprintln!("{:#}", e);
std::process::exit(1);
let (tx, rx) = oneshot::channel();
let app_handle = tokio::spawn(async move {
if let Err(e) = app(args, tx).await {
eprintln!("{:#}", e);
std::process::exit(1);
}
});

tokio::select! {
_ = app_handle => {
log::info!("Application is closing successfully.");
}
_ = rx => {
log::error!("Listener shutdown signal received. Exiting application.");
std::process::exit(1);
}
}
}
4 changes: 3 additions & 1 deletion src/events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ where
anyhow::bail!("failed to connect to substrate using the provided urls")
}

pub async fn listen(&mut self) -> Result<()> {
pub async fn listen(&mut self, got_hit: &mut bool) -> Result<()> {
loop {
// always flush in case some blocks were finalized before reconnecting
if let Err(err) = self.cache.flush().await {
Expand All @@ -73,6 +73,8 @@ where
if let Some(subxt::Error::Rpc(_)) = err.downcast_ref::<subxt::Error>() {
self.api = Self::connect(&mut self.substrate_urls).await?;
}
} else {
*got_hit = true
}
}
}
Expand Down

0 comments on commit 0ef17e7

Please sign in to comment.