Skip to content

Commit

Permalink
refactor: simplify WeightedChoose (#4916)
Browse files Browse the repository at this point in the history
* refactor: simplify WeightedChoose

* chore: remove unused errors
  • Loading branch information
WenyXu authored Oct 31, 2024
1 parent 8b60c27 commit 758ad0a
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 195 deletions.
28 changes: 10 additions & 18 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use common_meta::DatanodeId;
use common_runtime::JoinError;
use rand::distributions::WeightedError;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::metadata::TableId;
Expand All @@ -32,6 +31,14 @@ use crate::pubsub::Message;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to choose items"))]
ChooseItems {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: rand::distributions::WeightedError,
},

#[snafu(display("Exceeded deadline, operation: {}", operation))]
ExceededDeadline {
#[snafu(implicit)]
Expand Down Expand Up @@ -643,20 +650,6 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to set weight array"))]
WeightArray {
#[snafu(source)]
error: WeightedError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Weight array is not set"))]
NotSetWeightArray {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Unexpected table route type: {}", err_msg))]
UnexpectedLogicalRouteTable {
#[snafu(implicit)]
Expand Down Expand Up @@ -759,10 +752,9 @@ impl ErrorExt for Error {
| Error::NoEnoughAvailableNode { .. }
| Error::PublishMessage { .. }
| Error::Join { .. }
| Error::WeightArray { .. }
| Error::NotSetWeightArray { .. }
| Error::PeerUnavailable { .. }
| Error::ExceededDeadline { .. } => StatusCode::Internal,
| Error::ExceededDeadline { .. }
| Error::ChooseItems { .. } => StatusCode::Internal,

Error::Unsupported { .. } => StatusCode::Unsupported,

Expand Down
76 changes: 14 additions & 62 deletions src/meta-srv/src/selector/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use common_meta::peer::Peer;
use snafu::ensure;

use super::weighted_choose::{WeightedChoose, WeightedItem};
use super::weighted_choose::WeightedChoose;
use crate::error;
use crate::error::Result;
use crate::metasrv::SelectTarget;
use crate::selector::SelectorOptions;

/// According to the `opts`, choose peers from the `weight_array` through `weighted_choose`.
pub fn choose_peers<W>(
mut weight_array: Vec<WeightedItem<Peer>>,
opts: &SelectorOptions,
weighted_choose: &mut W,
) -> Result<Vec<Peer>>
pub fn choose_peers<W>(opts: &SelectorOptions, weighted_choose: &mut W) -> Result<Vec<Peer>>
where
W: WeightedChoose<Peer>,
{
let min_required_items = opts.min_required_items;
ensure!(
!weight_array.is_empty(),
!weighted_choose.is_empty(),
error::NoEnoughAvailableNodeSnafu {
required: min_required_items,
available: 0_usize,
Expand All @@ -43,12 +37,11 @@ where
);

if opts.allow_duplication {
weighted_choose.set_weight_array(weight_array)?;
(0..min_required_items)
.map(|_| weighted_choose.choose_one())
.collect::<Result<_>>()
} else {
let weight_array_len = weight_array.len();
let weight_array_len = weighted_choose.len();

// When opts.allow_duplication is false, we need to check that the length of the weighted array is greater than
// or equal to min_required_items, otherwise it may cause an infinite loop.
Expand All @@ -61,33 +54,7 @@ where
}
);

if weight_array_len == min_required_items {
return Ok(weight_array.into_iter().map(|item| item.item).collect());
}

weighted_choose.set_weight_array(weight_array.clone())?;

// Assume min_required_items is 3, weight_array_len is 100, then we can choose 3 items from the weight array
// and return. But assume min_required_items is 99, weight_array_len is 100. It's not cheap to choose 99 items
// from the weight array. So we can reverse choose 1 item from the weight array, and return the remaining 99
// items.
if min_required_items * 2 > weight_array_len {
let select_num = weight_array_len - min_required_items;
let mut selected = HashSet::with_capacity(select_num);
while selected.len() < select_num {
let item = weighted_choose.reverse_choose_one()?;
selected.insert(item);
}
weight_array.retain(|item| !selected.contains(&item.item));
Ok(weight_array.into_iter().map(|item| item.item).collect())
} else {
let mut selected = HashSet::with_capacity(min_required_items);
while selected.len() < min_required_items {
let item = weighted_choose.choose_one()?;
selected.insert(item);
}
Ok(selected.into_iter().collect())
}
weighted_choose.choose_multiple(min_required_items)
}
}

Expand All @@ -110,39 +77,34 @@ mod tests {
addr: "127.0.0.1:3001".to_string(),
},
weight: 1,
reverse_weight: 1,
},
WeightedItem {
item: Peer {
id: 2,
addr: "127.0.0.1:3001".to_string(),
},
weight: 1,
reverse_weight: 1,
},
WeightedItem {
item: Peer {
id: 3,
addr: "127.0.0.1:3001".to_string(),
},
weight: 1,
reverse_weight: 1,
},
WeightedItem {
item: Peer {
id: 4,
addr: "127.0.0.1:3001".to_string(),
},
weight: 1,
reverse_weight: 1,
},
WeightedItem {
item: Peer {
id: 5,
addr: "127.0.0.1:3001".to_string(),
},
weight: 1,
reverse_weight: 1,
},
];

Expand All @@ -152,14 +114,11 @@ mod tests {
allow_duplication: false,
};

let selected_peers: HashSet<_> = choose_peers(
weight_array.clone(),
&opts,
&mut RandomWeightedChoose::default(),
)
.unwrap()
.into_iter()
.collect();
let selected_peers: HashSet<_> =
choose_peers(&opts, &mut RandomWeightedChoose::new(weight_array.clone()))
.unwrap()
.into_iter()
.collect();

assert_eq!(i, selected_peers.len());
}
Expand All @@ -169,11 +128,8 @@ mod tests {
allow_duplication: false,
};

let selected_result = choose_peers(
weight_array.clone(),
&opts,
&mut RandomWeightedChoose::default(),
);
let selected_result =
choose_peers(&opts, &mut RandomWeightedChoose::new(weight_array.clone()));
assert!(selected_result.is_err());

for i in 1..=50 {
Expand All @@ -182,12 +138,8 @@ mod tests {
allow_duplication: true,
};

let selected_peers = choose_peers(
weight_array.clone(),
&opts,
&mut RandomWeightedChoose::default(),
)
.unwrap();
let selected_peers =
choose_peers(&opts, &mut RandomWeightedChoose::new(weight_array.clone())).unwrap();

assert_eq!(i, selected_peers.len());
}
Expand Down
5 changes: 2 additions & 3 deletions src/meta-srv/src/selector/lease_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,12 @@ impl Selector for LeaseBasedSelector {
addr: v.node_addr.clone(),
},
weight: 1,
reverse_weight: 1,
})
.collect();

// 3. choose peers by weight_array.
let weighted_choose = &mut RandomWeightedChoose::default();
let selected = choose_peers(weight_array, &opts, weighted_choose)?;
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
let selected = choose_peers(&opts, &mut weighted_choose)?;

Ok(selected)
}
Expand Down
25 changes: 9 additions & 16 deletions src/meta-srv/src/selector/load_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use common_meta::key::TableMetadataManager;
use common_meta::peer::Peer;
use common_meta::rpc::router::find_leaders;
use common_telemetry::{debug, info};
use parking_lot::RwLock;
use snafu::ResultExt;
use table::metadata::TableId;

Expand All @@ -29,36 +28,30 @@ use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::common::choose_peers;
use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute};
use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedChoose};
use crate::selector::weighted_choose::RandomWeightedChoose;
use crate::selector::{Namespace, Selector, SelectorOptions};

pub struct LoadBasedSelector<W, C> {
weighted_choose: RwLock<W>,
pub struct LoadBasedSelector<C> {
weight_compute: C,
}

impl<W, C> LoadBasedSelector<W, C> {
pub fn new(weighted_choose: W, weight_compute: C) -> Self {
Self {
weighted_choose: RwLock::new(weighted_choose),
weight_compute,
}
impl<C> LoadBasedSelector<C> {
pub fn new(weight_compute: C) -> Self {
Self { weight_compute }
}
}

impl Default for LoadBasedSelector<RandomWeightedChoose<Peer>, RegionNumsBasedWeightCompute> {
impl Default for LoadBasedSelector<RegionNumsBasedWeightCompute> {
fn default() -> Self {
Self {
weighted_choose: RwLock::new(RandomWeightedChoose::default()),
weight_compute: RegionNumsBasedWeightCompute,
}
}
}

#[async_trait::async_trait]
impl<W, C> Selector for LoadBasedSelector<W, C>
impl<C> Selector for LoadBasedSelector<C>
where
W: WeightedChoose<Peer>,
C: WeightCompute<Source = HashMap<DatanodeStatKey, DatanodeStatValue>>,
{
type Context = SelectorContext;
Expand Down Expand Up @@ -100,8 +93,8 @@ where
let weight_array = self.weight_compute.compute(&stat_kvs);

// 5. choose peers by weight_array.
let mut weighted_choose = self.weighted_choose.write();
let selected = choose_peers(weight_array, &opts, &mut *weighted_choose)?;
let mut weighted_choose = RandomWeightedChoose::new(weight_array);
let selected = choose_peers(&opts, &mut weighted_choose)?;

debug!(
"LoadBasedSelector select peers: {:?}, namespace: {}, opts: {:?}.",
Expand Down
5 changes: 0 additions & 5 deletions src/meta-srv/src/selector/weight_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ impl WeightCompute for RegionNumsBasedWeightCompute {
.map(|(peer, region_num)| WeightedItem {
item: peer,
weight: (max_weight - region_num + base_weight) as usize,
reverse_weight: (region_num - min_weight + base_weight) as usize,
})
.collect()
}
Expand Down Expand Up @@ -181,10 +180,6 @@ mod tests {
},
4,
);

for weight in weight_array.iter() {
assert_eq!(weight.reverse_weight, *expected.get(&weight.item).unwrap());
}
}

fn mock_stat_1() -> Stat {
Expand Down
Loading

0 comments on commit 758ad0a

Please sign in to comment.