Skip to content

Commit

Permalink
sweep objects singleton: prevent concurrent running handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
iwanbk committed Dec 18, 2024
1 parent 1532396 commit 540324c
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions zstor/src/actors/repairer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use crate::actors::{
};
use actix::prelude::*;
use log::{debug, error, warn};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};

Expand All @@ -23,6 +25,7 @@ pub struct RepairActor {
meta: Addr<MetaStoreActor>,
backend_manager: Addr<BackendManagerActor>,
zstor: Addr<ZstorActor>,
handling_sweep_objects: Arc<AtomicBool>,
}

impl RepairActor {
Expand All @@ -37,6 +40,7 @@ impl RepairActor {
meta,
backend_manager,
zstor,
handling_sweep_objects: Arc::new(AtomicBool::new(false)),
}
}

Expand All @@ -57,15 +61,36 @@ impl Actor for RepairActor {
}
}

struct SweepGuard {
flag: Arc<AtomicBool>,
}

impl Drop for SweepGuard {
fn drop(&mut self) {
self.flag.store(false, Ordering::Release);
}
}

impl Handler<SweepObjects> for RepairActor {
type Result = ResponseFuture<()>;

fn handle(&mut self, _: SweepObjects, _: &mut Self::Context) -> Self::Result {
if self.handling_sweep_objects.swap(true, Ordering::Acquire) {
log::info!("Dropping SweepObjects message - still processing");
return Box::pin(async {});
}

let meta = self.meta.clone();
let backend_manager = self.backend_manager.clone();
let zstor = self.zstor.clone();
let handling_sweep_objects = Arc::clone(&self.handling_sweep_objects);

log::info!("Starting SweepObjects");
Box::pin(async move {
let _guard = SweepGuard {
flag: Arc::clone(&handling_sweep_objects),
};

let start_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
Expand Down

0 comments on commit 540324c

Please sign in to comment.