From 1ba9a17624aab2248b79599b993494259d964a4a Mon Sep 17 00:00:00 2001 From: Iwan BK Date: Thu, 19 Dec 2024 10:53:46 +0700 Subject: [PATCH] add retry mechanism on data loading when rebuild the data. We only retry on temporary errors like timeout. retry also only enabled on data rebuild, not retrieve, because latency is not crucial during data rebuild. --- zstor/src/actors/zstor.rs | 9 +++++---- zstor/src/zdb.rs | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/zstor/src/actors/zstor.rs b/zstor/src/actors/zstor.rs index bb5da36..2f1a17e 100644 --- a/zstor/src/actors/zstor.rs +++ b/zstor/src/actors/zstor.rs @@ -258,7 +258,7 @@ impl Handler for ZstorActor { ) })?; - let shards = load_data(&metadata).await?; + let shards = load_data(&metadata, 1).await?; pipeline .send(RecoverFile { @@ -335,7 +335,7 @@ impl Handler for ZstorActor { }; // load the data from the storage backends - let input = load_data(&old_metadata).await?; + let input = load_data(&old_metadata, 3).await?; let existing_data = input.clone(); // rebuild the data (in memory only) @@ -454,7 +454,8 @@ impl Handler for ZstorActor { } } -async fn load_data(metadata: &MetaData) -> ZstorResult>>> { +/// load data from the storage backends +async fn load_data(metadata: &MetaData, max_attempts: u64) -> ZstorResult>>> { // attempt to retrieve all shards let mut shard_loads: Vec)>> = Vec::with_capacity(metadata.shards().len()); @@ -468,7 +469,7 @@ async fn load_data(metadata: &MetaData) -> ZstorResult>>> { Ok(ok) => ok, Err(e) => return (idx, Err(e.into())), }; - match db.get(&key).await { + match db.get_with_retry(&key, max_attempts).await { Ok(potential_shard) => match potential_shard { Some(shard) => (idx, Ok((shard, chksum))), None => ( diff --git a/zstor/src/zdb.rs b/zstor/src/zdb.rs index d180a8a..6b64135 100644 --- a/zstor/src/zdb.rs +++ b/zstor/src/zdb.rs @@ -759,6 +759,39 @@ impl SequentialZdb { Ok(Some(data)) } + /// get data from the zdb with a retry mechanism. + /// The retry will only happen at temporary errors, + /// currently only timeouts. + pub async fn get_with_retry( + &self, + keys: &[Key], + max_attempts: u64, + ) -> ZdbResult>> { + if max_attempts < 2 { + return self.get(keys).await; + } + + let mut last_error = None; + + for attempt in 0..max_attempts { + match self.get(keys).await { + Ok(result) => return Ok(result), + Err(e) => { + if e.internal == ErrorCause::Timeout { + last_error = Some(e); + if attempt < max_attempts - 1 { + debug!("timeout error on attempt {}, retrying", attempt + 1); + } + continue; + } + return Err(e); + } + } + } + + Err(last_error.unwrap()) + } + /// Returns the [`ZdbConnectionInfo`] object used to connect to this db. #[inline] pub fn connection_info(&self) -> &ZdbConnectionInfo { @@ -1037,7 +1070,7 @@ impl ZdbError { } /// The cause of a zero db error. -#[derive(Debug)] +#[derive(Debug, PartialEq)] enum ErrorCause { Redis(redis::RedisError), Other(String),