fix(dedup rebuild): if possible, use same backend during rebuild
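The rebuild path used to store every rebuilt shard to a freshly selected backend, even when the shard's data had not changed. This change compares each rebuilt shard against the data loaded from the old backends: shards whose bytes are identical keep their original backend, and only changed or missing shards are written to newly selected backends through the new save_rebuild_data path.

A minimal sketch (not part of the diff; the function name is illustrative) of the per-shard reuse decision:

    // Reuse the original backend only when the loaded bytes match the
    // rebuilt bytes exactly; a changed or missing shard needs a new backend.
    fn reuse_backend(existing: Option<&[u8]>, rebuilt: &[u8]) -> bool {
        match existing {
            Some(data) => data == rebuilt, // unchanged: keep its backend
            None => false,                 // missing: store to a new backend
        }
    }

    fn main() {
        assert!(reuse_backend(Some(b"abc".as_slice()), b"abc")); // same
        assert!(!reuse_backend(Some(b"abc".as_slice()), b"xyz")); // different
        assert!(!reuse_backend(None, b"abc")); // missing
    }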
iwanbk committed Dec 11, 2024
1 parent 10a663d commit 73d4705
Showing 1 changed file with 190 additions and 2 deletions.
zstor/src/actors/zstor.rs (190 additions, 2 deletions)
@@ -8,7 +8,7 @@ use crate::{
config::Config,
erasure::Shard,
meta::{Checksum, MetaData, ShardInfo},
-    zdb::{SequentialZdb, ZdbError, ZdbResult},
+    zdb::{Key, SequentialZdb, ZdbConnectionInfo, ZdbError, ZdbResult},
ZstorError, ZstorResult,
};
use actix::prelude::*;
@@ -324,6 +324,7 @@ impl Handler<Rebuild> for ZstorActor {
};

let input = load_data(&old_metadata).await?;
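// keep a copy of the loaded shards so they can be compared against the rebuilt shards below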
let existing_data = input.clone();
let (mut metadata, shards) = pipeline
.send(RebuildData {
input,
@@ -332,7 +333,31 @@
})
.await??;

-    save_data(&mut cfg.deref().clone(), shards, &mut metadata).await?;
// build a list of the key and the backend used for each shard
let mut used_backends = Vec::new();
for (i, data) in existing_data.iter().enumerate() {
let key = old_metadata.shards()[i].key().to_vec();
if let Some(data) = data {
if data.as_slice() == shards[i].as_ref() {
used_backends.push((key, Some(old_metadata.shards()[i].zdb().clone())));
debug!("Shard {} is the SAME", i);
} else {
used_backends.push((key, None));
warn!("Shard {} is DIFFERENT", i);
}
} else {
debug!("Shard {} is MISSING", i);
used_backends.push((key, None));
}
}

save_rebuild_data(
&mut cfg.deref().clone(),
shards,
&mut metadata,
used_backends,
)
.await?;

info!(
"Rebuild file from {} to {}",
@@ -471,6 +496,154 @@ async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
Ok(shards)
}

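/// Store rebuilt shards, reusing the original backend for every shard whose
/// data is unchanged; only changed or missing shards are written to newly
/// selected backends.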
async fn save_rebuild_data(
cfg: &mut Config,
shards: Vec<Shard>,
metadata: &mut MetaData,
used_backends: Vec<(Vec<Key>, Option<ZdbConnectionInfo>)>,
) -> ZstorResult<()> {
let shard_len = if shards.is_empty() {
0
} else {
shards[0].len()
};
let mut existing_backends_num = 0;
for (_, ci) in used_backends.iter() {
if ci.is_some() {
existing_backends_num += 1;
}
}

// possible improvements:
// - track which shards are not missing
// - track which backends are already used
// for both of the above we could use a Vec<Option<ZdbConnectionInfo>>,
// e.g. [3, x, 1, 2, x, 4]:
// x means the shard is missing; otherwise the backend is already
// used and we don't need to check it again

let dbs = loop {
debug!("Finding backend config");
// TODO: cfg.shard_stores is optimized for save_data, not for rebuilding data.
// we could possibly improve it by ignoring the already used backends
let backends = cfg.shard_stores()?;

let mut failed_shards: usize = 0;
let mut handles: Vec<JoinHandle<ZdbResult<_>>> = Vec::with_capacity(shards.len());

// spawn tokio tasks to get ns info for all backends
for backend in backends {
// skip this backend if it is already in used_backends;
// it will be added back later
if used_backends
.iter()
.any(|(_, b)| b.as_ref() == Some(&backend))
{
log::info!("skipping backend: {:?}", backend.address());
continue;
}
log::info!("processing backend: {:?}", backend.address());
handles.push(tokio::spawn(async move {
let db = SequentialZdb::new(backend.clone()).await?;
// check space in backend
let ns_info = db.ns_info().await?;
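// reject this backend if its free space cannot hold a full shard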
match ns_info.free_space() {
insufficient if (insufficient as usize) < shard_len => {
Err(ZdbError::new_storage_size(
db.connection_info().clone(),
shard_len,
ns_info.free_space() as usize,
))
}
_ => Ok(db),
}
}));
}

// collect all results
let mut new_dbs = Vec::new();
for db in join_all(handles).await {
match db? {
Err(zdbe) => {
debug!("could not connect to 0-db: {}", zdbe);
cfg.remove_shard(zdbe.remote());
failed_shards += 1;
}
Ok(db) => new_dbs.push(db), // no error so healthy db backend
}
}
// check that we found enough new backends for the shards that need one
log::info!("usable new backends: {}", new_dbs.len());
if new_dbs.len() < shards.len() - existing_backends_num {
info!(
"not enough backends found, needed: {}, found: {}",
shards.len() - existing_backends_num,
new_dbs.len()
);
continue;
}

// create the (key, connection_info, db) triple for each shard:
// - if the backend is already used, the shard does not need to be set,
//   hence the None db
// - if the backend is not yet used, the shard must be set,
//   hence the Some(db) that will be used to set the shard
let mut key_dbs = Vec::new();
for (key, ci) in used_backends.iter() {
if let Some(ci) = ci {
key_dbs.push((key.clone(), ci.clone(), None));
} else {
let db = new_dbs.pop().unwrap();
let conn_info = db.connection_info().clone();
key_dbs.push((key.clone(), conn_info, Some(db)));
}
}

// if we found a valid configuration we are good
// TODO: add some limit to prevent an infinite loop
if failed_shards == 0 {
debug!("found valid backend configuration");
break key_dbs;
}

info!("Backend config failed");
};

trace!("store shards in backends");

let mut handles: Vec<JoinHandle<ZstorResult<_>>> = Vec::with_capacity(shards.len());
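// dbs (the key_dbs built above) is in shard order, so the zip pairs
// each shard with its chosen backend and key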
for ((existing_key, existing_ci, db), (shard_idx, shard)) in
dbs.into_iter().zip(shards.into_iter().enumerate())
{
handles.push(tokio::spawn(async move {
if let Some(db) = db {
log::info!("db set for ci: {:?}", db.connection_info());
let keys = db.set(&shard).await?;
Ok(ShardInfo::new(
shard_idx,
shard.checksum(),
keys,
db.connection_info().clone(),
))
} else {
// no need to call db.set for an already used backend (the shard is not missing)
Ok(ShardInfo::new(
shard_idx,
shard.checksum(),
existing_key.clone(),
existing_ci.clone(),
))
}
}));
}

for shard_info in try_join_all(handles).await? {
metadata.add_shard(shard_info?);
}

Ok(())
}

async fn save_data(
cfg: &mut Config,
shards: Vec<Shard>,
@@ -482,14 +655,26 @@
shards[0].len()
};

// possible improvements:
// - track which shards are not missing
// - track which backends are already used
// for both of the above we could use a Vec<Option<ZdbConnectionInfo>>,
// e.g. [3, x, 1, 2, x, 4]:
// x means the shard is missing; otherwise the backend is already
// used and we don't need to check it again

let dbs = loop {
debug!("Finding backend config");
let backends = cfg.shard_stores()?;

let mut failed_shards: usize = 0;
let mut handles: Vec<JoinHandle<ZdbResult<_>>> = Vec::with_capacity(shards.len());

// spawn tokio tasks to get ns info for all backends
for backend in backends {
// NOTE: skipping already used backends and stopping once enough
// backends are found is handled by save_rebuild_data above
handles.push(tokio::spawn(async move {
let db = SequentialZdb::new(backend.clone()).await?;
// check space in backend
@@ -507,6 +692,8 @@
}));
}

// collect all results
let mut dbs = Vec::new();
for db in join_all(handles).await {
match db? {
@@ -533,6 +720,7 @@
let mut handles: Vec<JoinHandle<ZstorResult<_>>> = Vec::with_capacity(shards.len());
for (db, (shard_idx, shard)) in dbs.into_iter().zip(shards.into_iter().enumerate()) {
handles.push(tokio::spawn(async move {
// store the shard and collect the keys generated by the backend
let keys = db.set(&shard).await?;
Ok(ShardInfo::new(
shard_idx,
