From 10a663d3389398340ed686e23b559821913759c4 Mon Sep 17 00:00:00 2001 From: Iwan BK Date: Fri, 6 Dec 2024 18:12:00 +0700 Subject: [PATCH] improve sweep object to use scan. - Previously, object_metas returned _all_ keys, which is not scalable. Change it to use scan for scalability. - Fix scan to stop once it finds data with timestamp > max_timestamp. --- zstor/src/actors/meta.rs | 60 +++++++++++++++++++++- zstor/src/actors/repairer.rs | 99 +++++++++++++++++++++++------------- zstor/src/meta.rs | 5 +- zstor/src/zdb.rs | 10 ++-- zstor/src/zdb_meta.rs | 12 ++--- 5 files changed, 135 insertions(+), 51 deletions(-) diff --git a/zstor/src/actors/meta.rs b/zstor/src/actors/meta.rs index ba48610..26e90a6 100644 --- a/zstor/src/actors/meta.rs +++ b/zstor/src/actors/meta.rs @@ -57,6 +57,21 @@ pub struct IsReplaced { pub ci: ZdbConnectionInfo, } +#[derive(Message)] +#[rtype(result = "Result<(usize, Option>, Vec<(String, MetaData)>), MetaStoreError>")] +/// Message for scanning [`MetaData`] objects in a [`MetaStore`] managed by a [`MetaStoreActor`]. +pub struct ScanMeta { + /// Cursor to start scanning from. + pub cursor: Option>, + + /// Backend index to scan from. + /// If `None`, the backend with the most keys is used. + pub backend_idx: Option, + + /// Maximum timestamp to scan until. + pub max_timestamp: Option, +} + #[derive(Message)] #[rtype(result = "Result, MetaStoreError>")] /// Message for retrieving all [`MetaData`] objects in a [`MetaStore`] managed by a [`MetaStoreActor`]. 
@@ -208,6 +223,46 @@ impl Handler for MetaStoreActor { } } +impl Handler for MetaStoreActor { + type Result = + ResponseFuture>, Vec<(String, MetaData)>), MetaStoreError>>; + + fn handle(&mut self, msg: ScanMeta, ctx: &mut Self::Context) -> Self::Result { + let meta_store = self.meta_store.clone(); + let addr = ctx.address(); + + Box::pin(async move { + let (backend_idx, new_cursor, keys) = match meta_store + .scan_meta_keys(msg.cursor, msg.backend_idx, msg.max_timestamp) + .await + { + Ok(res) => res, + Err(e) => { + return Err(e); + } + }; + + let mut metas = Vec::with_capacity(keys.len()); + + for key in keys { + let meta: MetaData = match addr.send(LoadMetaByKey { key: key.clone() }).await { + Ok(Ok(m)) => m.unwrap(), + Ok(Err(e)) => { + log::error!("Error loading meta by key:{} - {}", key, e); + continue; + } + Err(e) => { + log::error!("Error loading meta by key:{} - {}", key, e); + continue; + } + }; + metas.push((key, meta)); + } + Ok((backend_idx, new_cursor, metas)) + }) + } +} + impl Handler for MetaStoreActor { type Result = ResponseFuture, MetaStoreError>>; @@ -336,7 +391,10 @@ impl Handler for MetaStoreActor { } } } - cursor = Some(new_cursor); + if new_cursor.is_none() { + break; + } + cursor = new_cursor; backend_idx = Some(idx); } }) diff --git a/zstor/src/actors/repairer.rs b/zstor/src/actors/repairer.rs index 0957a12..116e3d4 100644 --- a/zstor/src/actors/repairer.rs +++ b/zstor/src/actors/repairer.rs @@ -1,11 +1,12 @@ use crate::actors::{ backends::{BackendManagerActor, RequestBackends, StateInterest}, - meta::{MetaStoreActor, ObjectMetas}, + meta::{MetaStoreActor, ScanMeta}, zstor::{Rebuild, ZstorActor}, }; use actix::prelude::*; -use log::{error, warn}; +use log::{debug, error, warn}; use std::time::Duration; +use std::time::{SystemTime, UNIX_EPOCH}; /// Amount of time between starting a new sweep of the backend objects. 
const OBJECT_SWEEP_INTERVAL_SECONDS: u64 = 60 * 10; @@ -65,52 +66,80 @@ impl Handler for RepairActor { let zstor = self.zstor.clone(); Box::pin(async move { - let obj_metas = match meta.send(ObjectMetas).await { - Err(e) => { - error!("Could not request object metas from metastore: {}", e); - return; - } - Ok(om) => match om { - Err(e) => { - error!("Could not get object metas from metastore: {}", e); - return; - } - Ok(om) => om, - }, - }; + let start_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(); - for (key, metadata) in obj_metas.into_iter() { - let backend_requests = metadata - .shards() - .iter() - .map(|shard_info| shard_info.zdb()) - .cloned() - .collect::>(); - let backends = match backend_manager - .send(RequestBackends { - backend_requests, - interest: StateInterest::Readable, + // start scanning from the beginning (cursor == None) and let the metastore choose the backend_id + let mut cursor = None; + let mut backend_idx = None; + loop { + // scan keys from the metastore + let (idx, new_cursor, metas) = match meta + .send(ScanMeta { + cursor: cursor.clone(), + backend_idx, + max_timestamp: Some(start_time), }) .await { Err(e) => { - error!("Failed to request backends: {}", e); + error!("Could not request meta keys from metastore: {}", e); return; } - Ok(backends) => backends, + Ok(result) => match result { + Err(e) => { + error!("Could not get meta keys from metastore: {}", e); + return; + } + Ok(res) => res, + }, }; - let must_rebuild = backends.into_iter().any(|b| !matches!(b, Ok(Some(_)))); - if must_rebuild { - if let Err(e) = zstor - .send(Rebuild { - file: None, - key: Some(key), + + // iterate over the keys and check if the backends are healthy + // if not, rebuild the object + for (key, metadata) in metas.into_iter() { + let backend_requests = metadata + .shards() + .iter() + .map(|shard_info| shard_info.zdb()) + .cloned() + .collect::>(); + let backends = match backend_manager + .send(RequestBackends { + 
backend_requests, + interest: StateInterest::Readable, }) .await { - warn!("Failed to rebuild data: {}", e); + Err(e) => { + error!("Failed to request backends: {}", e); + return; + } + Ok(backends) => backends, + }; + let must_rebuild = backends.into_iter().any(|b| !matches!(b, Ok(Some(_)))); + if must_rebuild { + if let Err(e) = zstor + .send(Rebuild { + file: None, + key: Some(key), + }) + .await + { + warn!("Failed to rebuild data: {}", e); + } } } + + if new_cursor.is_none() { + debug!("there is no more old data to rebuild"); + break; + } + + cursor = new_cursor; + backend_idx = Some(idx); } }) } diff --git a/zstor/src/meta.rs b/zstor/src/meta.rs index d219ad7..3076a42 100644 --- a/zstor/src/meta.rs +++ b/zstor/src/meta.rs @@ -213,13 +213,14 @@ pub trait MetaStore { /// If `cursor` is `None`, the scan will start from the beginning. /// If `backend_idx` is `None`, the scan will use backend which has the most keys. /// - /// Returns the backend index and cursor for the next scan and the keys themselves + /// Returns the backend index and cursor for the next scan and the keys themselves. 
+ /// If there are no more keys with timestamp <= max_timestamp, the cursor will be `None` async fn scan_meta_keys( &self, cursor: Option>, backend_idx: Option, max_timestamp: Option, - ) -> Result<(usize, Vec, Vec), MetaStoreError>; + ) -> Result<(usize, Option>, Vec), MetaStoreError>; /// Get the (key, metadata) for all stored objects async fn object_metas(&self) -> Result, MetaStoreError>; diff --git a/zstor/src/zdb.rs b/zstor/src/zdb.rs index b960e8a..604fd75 100644 --- a/zstor/src/zdb.rs +++ b/zstor/src/zdb.rs @@ -818,15 +818,15 @@ impl UserKeyZdb { cursor: Option>, prefix: Option<&str>, max_timestamp: Option, - ) -> ZdbResult<(Vec, Vec)> { + ) -> ZdbResult<(Option>, Vec)> { let (cursor, entries): (Vec, Vec) = self.internal.scan(cursor).await?; let mut keys = Vec::new(); for entry in &entries { // check timestamp - if let Some(ts) = max_timestamp { - if entry[0].2 > ts { - continue; + if let Some(max_timestamp) = max_timestamp { + if entry[0].2 > max_timestamp { + return Ok((None, keys)); } } // check prefix @@ -841,7 +841,7 @@ impl UserKeyZdb { } } - Ok((cursor, keys)) + Ok((Some(cursor), keys)) } /// Get a stream which yields all the keys in the namespace. 
diff --git a/zstor/src/zdb_meta.rs b/zstor/src/zdb_meta.rs index 1db380d..8fddac5 100644 --- a/zstor/src/zdb_meta.rs +++ b/zstor/src/zdb_meta.rs @@ -272,7 +272,7 @@ where prefix: Option<&str>, backend_idx: Option, max_timestamp: Option, - ) -> ZdbMetaStoreResult<(usize, Vec, Vec)> { + ) -> ZdbMetaStoreResult<(usize, Option>, Vec)> { let most_keys_idx = match backend_idx { Some(idx) => idx, None => self.get_most_keys_backend().await?, @@ -613,16 +613,12 @@ where cursor: Option>, backend_idx: Option, max_timestamp: Option, - ) -> Result<(usize, Vec, Vec), MetaStoreError> { + ) -> Result<(usize, Option>, Vec), MetaStoreError> { let prefix = format!("/{}/meta/", self.prefix); - match self - .scan_keys(cursor, Some(&prefix), backend_idx, max_timestamp) + self.scan_keys(cursor, Some(&prefix), backend_idx, max_timestamp) .await - { - Ok((backend_idx, cursor, keys)) => Ok((backend_idx, cursor, keys)), - Err(e) => Err(MetaStoreError::from(e)), - } + .map_err(MetaStoreError::from) } async fn object_metas(&self) -> Result, MetaStoreError> {