Skip to content

Commit

Permalink
Create guard for rebuild meta.
Browse files Browse the repository at this point in the history
The guard will cancel old rebuild process,
because the new one will have latest metastore configuration.
So, there is no point to continue the old process.
  • Loading branch information
iwanbk authored and LeeSmet committed Dec 23, 2024
1 parent 1ba9a17 commit aff44f4
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
5 changes: 5 additions & 0 deletions zstor/src/actors/backends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,11 @@ impl Handler<CheckBackends> for BackendManagerActor {
info!("Refreshing metadata cluster");
if let Err(err) = actor_addr.try_send(RefreshMeta {
backends,
// we don't to rebuild the metadata because:
// - If it is from writeable to not writeable, we can't rebuild anyway
// - If it is from not writeable to writeable, we don't need to rebuild
// because there was no write during not-writeable status
// - if it is new metadata backend, already rebuild by hot reload handler
rebuild_meta: false,
}) {
error!("Failed to send MyReplaceMeta message: {}", err);
Expand Down
35 changes: 35 additions & 0 deletions zstor/src/actors/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::{
zdb::ZdbConnectionInfo,
};
use actix::prelude::*;
use std::sync::atomic::{AtomicU64, Ordering};
use std::{path::PathBuf, sync::Arc};

#[derive(Message)]
Expand Down Expand Up @@ -134,6 +135,7 @@ pub struct ReplaceMetaStore {
pub struct MetaStoreActor {
meta_store: Arc<dyn MetaStore>,
writeable: bool,
rebuild_all_meta_counter: Arc<AtomicU64>,
}

impl MetaStoreActor {
Expand All @@ -143,6 +145,23 @@ impl MetaStoreActor {
Self {
meta_store: Arc::from(meta_store),
writeable,
rebuild_all_meta_counter: Arc::new(AtomicU64::new(0)),
}
}

/// create a guard for the rebuild meta operation.
/// This guard is used to check if a newer rebuild operation has started
/// and the current one should be stopped.
/// We stop the current one because the newer one will have the latest meta store configuration,
/// so there is no point to continue the current one.
fn create_rebuild_meta_guard(&self) -> RebuildAllMetaGuard {
let new_gen = self
.rebuild_all_meta_counter
.fetch_add(1, Ordering::Relaxed)
+ 1;
RebuildAllMetaGuard {
generation: new_gen,
current_gen: self.rebuild_all_meta_counter.clone(),
}
}
}
Expand Down Expand Up @@ -322,6 +341,16 @@ impl Handler<GetFailures> for MetaStoreActor {
}
}

struct RebuildAllMetaGuard {
generation: u64,
current_gen: Arc<AtomicU64>,
}

impl RebuildAllMetaGuard {
fn is_current(&self) -> bool {
self.generation == self.current_gen.load(Ordering::SeqCst)
}
}
/// Rebuild all meta data in the metastore:
/// - scan all keys in the metastore before current timestamp
/// - load meta by key
Expand All @@ -333,6 +362,8 @@ impl Handler<RebuildAllMeta> for MetaStoreActor {
let metastore = self.meta_store.clone();
let addr = ctx.address();
log::info!("Rebuilding all meta handler");
let rebuild_guard = self.create_rebuild_meta_guard();

Box::pin(async move {
let mut cursor = None;
let mut backend_idx = None;
Expand Down Expand Up @@ -361,6 +392,10 @@ impl Handler<RebuildAllMeta> for MetaStoreActor {
};

for key in keys {
if !rebuild_guard.is_current() {
log::info!("Newer rebuild started, stopping current one");
break;
}
log::info!("Rebuilding meta key: {}", key);
let meta: MetaData = match addr.send(LoadMetaByKey { key: key.clone() }).await {
Ok(Ok(m)) => m.unwrap(),
Expand Down

0 comments on commit aff44f4

Please sign in to comment.