Skip to content

Commit

Permalink
add retry mechanism on data loading when rebuild the data.
Browse files Browse the repository at this point in the history
We only retry on temporary errors like timeout.
retry also only enabled on data rebuild, not retrieve,
because latency is not crucial during data rebuild.
  • Loading branch information
iwanbk authored and LeeSmet committed Dec 23, 2024
1 parent 7101627 commit 1ba9a17
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
9 changes: 5 additions & 4 deletions zstor/src/actors/zstor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl Handler<Retrieve> for ZstorActor {
)
})?;

let shards = load_data(&metadata).await?;
let shards = load_data(&metadata, 1).await?;

pipeline
.send(RecoverFile {
Expand Down Expand Up @@ -335,7 +335,7 @@ impl Handler<Rebuild> for ZstorActor {
};

// load the data from the storage backends
let input = load_data(&old_metadata).await?;
let input = load_data(&old_metadata, 3).await?;
let existing_data = input.clone();

// rebuild the data (in memory only)
Expand Down Expand Up @@ -454,7 +454,8 @@ impl Handler<ReloadConfig> for ZstorActor {
}
}

async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
/// load data from the storage backends
async fn load_data(metadata: &MetaData, max_attempts: u64) -> ZstorResult<Vec<Option<Vec<u8>>>> {
// attempt to retrieve all shards
let mut shard_loads: Vec<JoinHandle<(usize, Result<(_, _), ZstorError>)>> =
Vec::with_capacity(metadata.shards().len());
Expand All @@ -468,7 +469,7 @@ async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
Ok(ok) => ok,
Err(e) => return (idx, Err(e.into())),
};
match db.get(&key).await {
match db.get_with_retry(&key, max_attempts).await {
Ok(potential_shard) => match potential_shard {
Some(shard) => (idx, Ok((shard, chksum))),
None => (
Expand Down
35 changes: 34 additions & 1 deletion zstor/src/zdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,39 @@ impl SequentialZdb {
Ok(Some(data))
}

/// get data from the zdb with a retry mechanism.
/// The retry will only happen at temporary errors,
/// currently only timeouts.
pub async fn get_with_retry(
&self,
keys: &[Key],
max_attempts: u64,
) -> ZdbResult<Option<Vec<u8>>> {
if max_attempts < 2 {
return self.get(keys).await;
}

let mut last_error = None;

for attempt in 0..max_attempts {
match self.get(keys).await {
Ok(result) => return Ok(result),
Err(e) => {
if e.internal == ErrorCause::Timeout {
last_error = Some(e);
if attempt < max_attempts - 1 {
debug!("timeout error on attempt {}, retrying", attempt + 1);
}
continue;
}
return Err(e);
}
}
}

Err(last_error.unwrap())
}

/// Returns the [`ZdbConnectionInfo`] object used to connect to this db.
#[inline]
pub fn connection_info(&self) -> &ZdbConnectionInfo {
Expand Down Expand Up @@ -1037,7 +1070,7 @@ impl ZdbError {
}

/// The cause of a zero db error.
#[derive(Debug)]
#[derive(Debug, PartialEq)]
enum ErrorCause {
Redis(redis::RedisError),
Other(String),
Expand Down

0 comments on commit 1ba9a17

Please sign in to comment.