Skip to content

Commit

Permalink
improve sweep object to use scan.
Browse files Browse the repository at this point in the history
- Previously, object_metas returned _all_ keys, which is not scalable.
  Change it to use scan for scalability.
- Fix scan to stop once it finds data with timestamp > max_timestamp.
  • Loading branch information
iwanbk authored and LeeSmet committed Dec 6, 2024
1 parent cbe5c4b commit 10a663d
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 51 deletions.
60 changes: 59 additions & 1 deletion zstor/src/actors/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ pub struct IsReplaced {
pub ci: ZdbConnectionInfo,
}

/// Message for scanning [`MetaData`] objects in a [`MetaStore`] managed by a [`MetaStoreActor`].
///
/// A scan is incremental: the reply carries a backend index and an optional cursor
/// which can be fed back into a follow-up [`ScanMeta`] to continue where the
/// previous one stopped, together with the `(key, MetaData)` pairs that were found.
#[derive(Message)]
#[rtype(result = "Result<(usize, Option<Vec<u8>>, Vec<(String, MetaData)>), MetaStoreError>")]
pub struct ScanMeta {
    /// Cursor to start scanning from.
    /// If `None`, the scan starts from the beginning.
    pub cursor: Option<Vec<u8>>,

    /// Index of the backend to scan.
    /// If `None`, the backend with the most keys is used.
    pub backend_idx: Option<usize>,

    /// Maximum timestamp to scan until.
    pub max_timestamp: Option<u64>,
}

#[derive(Message)]
#[rtype(result = "Result<Vec<(String, MetaData)>, MetaStoreError>")]
/// Message for retrieving all [`MetaData`] objects in a [`MetaStore`] managed by a [`MetaStoreActor`].
Expand Down Expand Up @@ -208,6 +223,46 @@ impl Handler<IsReplaced> for MetaStoreActor {
}
}

impl Handler<ScanMeta> for MetaStoreActor {
    type Result =
        ResponseFuture<Result<(usize, Option<Vec<u8>>, Vec<(String, MetaData)>), MetaStoreError>>;

    /// Scans one batch of metadata keys and loads the [`MetaData`] stored under each key.
    ///
    /// On success the reply is `(backend index, next cursor, (key, metadata) pairs)`,
    /// in the same order as the tuple returned by `scan_meta_keys`.
    ///
    /// # Errors
    ///
    /// Fails only if the key scan itself fails. A key whose metadata can't be loaded
    /// is logged and left out of the result, so one bad entry does not abort the scan.
    fn handle(&mut self, msg: ScanMeta, ctx: &mut Self::Context) -> Self::Result {
        let meta_store = self.meta_store.clone();
        let addr = ctx.address();

        Box::pin(async move {
            // `scan_meta_keys` yields (backend index, next cursor, keys), in that order.
            let (backend_idx, new_cursor, keys) = meta_store
                .scan_meta_keys(msg.cursor, msg.backend_idx, msg.max_timestamp)
                .await?;

            let mut metas = Vec::with_capacity(keys.len());

            for key in keys {
                let meta: MetaData = match addr.send(LoadMetaByKey { key: key.clone() }).await {
                    Ok(Ok(Some(m))) => m,
                    // No metadata under this key (presumably it was removed between the
                    // scan and the load); skip it rather than panicking inside the actor.
                    Ok(Ok(None)) => {
                        log::warn!("No metadata found for scanned key:{}", key);
                        continue;
                    }
                    // The metastore reported an error for this key.
                    Ok(Err(e)) => {
                        log::error!("Error loading meta by key:{} - {}", key, e);
                        continue;
                    }
                    // The message could not be delivered to the actor.
                    Err(e) => {
                        log::error!("Error loading meta by key:{} - {}", key, e);
                        continue;
                    }
                };
                metas.push((key, meta));
            }

            Ok((backend_idx, new_cursor, metas))
        })
    }
}

impl Handler<ObjectMetas> for MetaStoreActor {
type Result = ResponseFuture<Result<Vec<(String, MetaData)>, MetaStoreError>>;

Expand Down Expand Up @@ -336,7 +391,10 @@ impl Handler<RebuildAllMeta> for MetaStoreActor {
}
}
}
cursor = Some(new_cursor);
if cursor.is_none() {
break;
}
cursor = new_cursor;
backend_idx = Some(idx);
}
})
Expand Down
99 changes: 64 additions & 35 deletions zstor/src/actors/repairer.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::actors::{
backends::{BackendManagerActor, RequestBackends, StateInterest},
meta::{MetaStoreActor, ObjectMetas},
meta::{MetaStoreActor, ScanMeta},
zstor::{Rebuild, ZstorActor},
};
use actix::prelude::*;
use log::{error, warn};
use log::{debug, error, warn};
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};

/// Amount of time between starting a new sweep of the backend objects.
const OBJECT_SWEEP_INTERVAL_SECONDS: u64 = 60 * 10;
Expand Down Expand Up @@ -65,52 +66,80 @@ impl Handler<SweepObjects> for RepairActor {
let zstor = self.zstor.clone();

Box::pin(async move {
let obj_metas = match meta.send(ObjectMetas).await {
Err(e) => {
error!("Could not request object metas from metastore: {}", e);
return;
}
Ok(om) => match om {
Err(e) => {
error!("Could not get object metas from metastore: {}", e);
return;
}
Ok(om) => om,
},
};
let start_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();

for (key, metadata) in obj_metas.into_iter() {
let backend_requests = metadata
.shards()
.iter()
.map(|shard_info| shard_info.zdb())
.cloned()
.collect::<Vec<_>>();
let backends = match backend_manager
.send(RequestBackends {
backend_requests,
interest: StateInterest::Readable,
// start scanning from the beginning (cursor == None) and let the metastore choose the backend_id
let mut cursor = None;
let mut backend_idx = None;
loop {
// scan keys from the metastore
let (idx, new_cursor, metas) = match meta
.send(ScanMeta {
cursor: cursor.clone(),
backend_idx,
max_timestamp: Some(start_time),
})
.await
{
Err(e) => {
error!("Failed to request backends: {}", e);
error!("Could not request meta keys from metastore: {}", e);
return;
}
Ok(backends) => backends,
Ok(result) => match result {
Err(e) => {
error!("Could not get meta keys from metastore: {}", e);
return;
}
Ok(res) => res,
},
};
let must_rebuild = backends.into_iter().any(|b| !matches!(b, Ok(Some(_))));
if must_rebuild {
if let Err(e) = zstor
.send(Rebuild {
file: None,
key: Some(key),

// iterate over the keys and check if the backends are healthy
// if not, rebuild the object
for (key, metadata) in metas.into_iter() {
let backend_requests = metadata
.shards()
.iter()
.map(|shard_info| shard_info.zdb())
.cloned()
.collect::<Vec<_>>();
let backends = match backend_manager
.send(RequestBackends {
backend_requests,
interest: StateInterest::Readable,
})
.await
{
warn!("Failed to rebuild data: {}", e);
Err(e) => {
error!("Failed to request backends: {}", e);
return;
}
Ok(backends) => backends,
};
let must_rebuild = backends.into_iter().any(|b| !matches!(b, Ok(Some(_))));
if must_rebuild {
if let Err(e) = zstor
.send(Rebuild {
file: None,
key: Some(key),
})
.await
{
warn!("Failed to rebuild data: {}", e);
}
}
}

if new_cursor.is_none() {
debug!("there is no more old data to rebuild");
break;
}

cursor = new_cursor;
backend_idx = Some(idx);
}
})
}
Expand Down
5 changes: 3 additions & 2 deletions zstor/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,14 @@ pub trait MetaStore {
/// If `cursor` is `None`, the scan will start from the beginning.
/// If `backend_idx` is `None`, the scan will use backend which has the most keys.
///
/// Returns the backend index and cursor for the next scan and the keys themselves
/// Returns the backend index and cursor for the next scan and the keys themselves.
/// If a key with a timestamp greater than `max_timestamp` is encountered, the scan stops and the cursor will be `None`
async fn scan_meta_keys(
&self,
cursor: Option<Vec<u8>>,
backend_idx: Option<usize>,
max_timestamp: Option<u64>,
) -> Result<(usize, Vec<u8>, Vec<String>), MetaStoreError>;
) -> Result<(usize, Option<Vec<u8>>, Vec<String>), MetaStoreError>;

/// Get the (key, metadata) for all stored objects
async fn object_metas(&self) -> Result<Vec<(String, MetaData)>, MetaStoreError>;
Expand Down
10 changes: 5 additions & 5 deletions zstor/src/zdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -818,15 +818,15 @@ impl UserKeyZdb {
cursor: Option<Vec<u8>>,
prefix: Option<&str>,
max_timestamp: Option<u64>,
) -> ZdbResult<(Vec<u8>, Vec<String>)> {
) -> ZdbResult<(Option<Vec<u8>>, Vec<String>)> {
let (cursor, entries): (Vec<u8>, Vec<ScanEntry>) = self.internal.scan(cursor).await?;

let mut keys = Vec::new();
for entry in &entries {
// check timestamp
if let Some(ts) = max_timestamp {
if entry[0].2 > ts {
continue;
if let Some(max_timestamp) = max_timestamp {
if entry[0].2 > max_timestamp {
return Ok((None, keys));
}
}
// check prefix
Expand All @@ -841,7 +841,7 @@ impl UserKeyZdb {
}
}

Ok((cursor, keys))
Ok((Some(cursor), keys))
}

/// Get a stream which yields all the keys in the namespace.
Expand Down
12 changes: 4 additions & 8 deletions zstor/src/zdb_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ where
prefix: Option<&str>,
backend_idx: Option<usize>,
max_timestamp: Option<u64>,
) -> ZdbMetaStoreResult<(usize, Vec<u8>, Vec<String>)> {
) -> ZdbMetaStoreResult<(usize, Option<Vec<u8>>, Vec<String>)> {
let most_keys_idx = match backend_idx {
Some(idx) => idx,
None => self.get_most_keys_backend().await?,
Expand Down Expand Up @@ -613,16 +613,12 @@ where
cursor: Option<Vec<u8>>,
backend_idx: Option<usize>,
max_timestamp: Option<u64>,
) -> Result<(usize, Vec<u8>, Vec<String>), MetaStoreError> {
) -> Result<(usize, Option<Vec<u8>>, Vec<String>), MetaStoreError> {
let prefix = format!("/{}/meta/", self.prefix);

match self
.scan_keys(cursor, Some(&prefix), backend_idx, max_timestamp)
self.scan_keys(cursor, Some(&prefix), backend_idx, max_timestamp)
.await
{
Ok((backend_idx, cursor, keys)) => Ok((backend_idx, cursor, keys)),
Err(e) => Err(MetaStoreError::from(e)),
}
.map_err(MetaStoreError::from)
}

async fn object_metas(&self) -> Result<Vec<(String, MetaData)>, MetaStoreError> {
Expand Down

0 comments on commit 10a663d

Please sign in to comment.