diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 6bcac5db2a65..d7db60e8b17b 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -18,7 +18,6 @@ use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use common_meta::DatanodeId; use common_runtime::JoinError; -use rand::distributions::WeightedError; use snafu::{Location, Snafu}; use store_api::storage::RegionId; use table::metadata::TableId; @@ -32,6 +31,14 @@ use crate::pubsub::Message; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { + #[snafu(display("Failed to choose items"))] + ChooseItems { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: rand::distributions::WeightedError, + }, + #[snafu(display("Exceeded deadline, operation: {}", operation))] ExceededDeadline { #[snafu(implicit)] @@ -643,20 +650,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to set weight array"))] - WeightArray { - #[snafu(source)] - error: WeightedError, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Weight array is not set"))] - NotSetWeightArray { - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Unexpected table route type: {}", err_msg))] UnexpectedLogicalRouteTable { #[snafu(implicit)] @@ -759,10 +752,9 @@ impl ErrorExt for Error { | Error::NoEnoughAvailableNode { .. } | Error::PublishMessage { .. } | Error::Join { .. } - | Error::WeightArray { .. } - | Error::NotSetWeightArray { .. } | Error::PeerUnavailable { .. } - | Error::ExceededDeadline { .. } => StatusCode::Internal, + | Error::ExceededDeadline { .. } + | Error::ChooseItems { .. } => StatusCode::Internal, Error::Unsupported { .. } => StatusCode::Unsupported, diff --git a/src/meta-srv/src/selector/common.rs b/src/meta-srv/src/selector/common.rs index f1d127eea011..11e5b3741a68 100644 --- a/src/meta-srv/src/selector/common.rs +++ b/src/meta-srv/src/selector/common.rs @@ -12,29 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashSet; - use common_meta::peer::Peer; use snafu::ensure; -use super::weighted_choose::{WeightedChoose, WeightedItem}; +use super::weighted_choose::WeightedChoose; use crate::error; use crate::error::Result; use crate::metasrv::SelectTarget; use crate::selector::SelectorOptions; /// According to the `opts`, choose peers from the `weight_array` through `weighted_choose`. -pub fn choose_peers( - mut weight_array: Vec>, - opts: &SelectorOptions, - weighted_choose: &mut W, -) -> Result> +pub fn choose_peers(opts: &SelectorOptions, weighted_choose: &mut W) -> Result> where W: WeightedChoose, { let min_required_items = opts.min_required_items; ensure!( - !weight_array.is_empty(), + !weighted_choose.is_empty(), error::NoEnoughAvailableNodeSnafu { required: min_required_items, available: 0_usize, @@ -43,12 +37,11 @@ where ); if opts.allow_duplication { - weighted_choose.set_weight_array(weight_array)?; (0..min_required_items) .map(|_| weighted_choose.choose_one()) .collect::>() } else { - let weight_array_len = weight_array.len(); + let weight_array_len = weighted_choose.len(); // When opts.allow_duplication is false, we need to check that the length of the weighted array is greater than // or equal to min_required_items, otherwise it may cause an infinite loop. @@ -61,33 +54,7 @@ where } ); - if weight_array_len == min_required_items { - return Ok(weight_array.into_iter().map(|item| item.item).collect()); - } - - weighted_choose.set_weight_array(weight_array.clone())?; - - // Assume min_required_items is 3, weight_array_len is 100, then we can choose 3 items from the weight array - // and return. But assume min_required_items is 99, weight_array_len is 100. It's not cheap to choose 99 items - // from the weight array. So we can reverse choose 1 item from the weight array, and return the remaining 99 - // items. - if min_required_items * 2 > weight_array_len { - let select_num = weight_array_len - min_required_items; - let mut selected = HashSet::with_capacity(select_num); - while selected.len() < select_num { - let item = weighted_choose.reverse_choose_one()?; - selected.insert(item); - } - weight_array.retain(|item| !selected.contains(&item.item)); - Ok(weight_array.into_iter().map(|item| item.item).collect()) - } else { - let mut selected = HashSet::with_capacity(min_required_items); - while selected.len() < min_required_items { - let item = weighted_choose.choose_one()?; - selected.insert(item); - } - Ok(selected.into_iter().collect()) - } + weighted_choose.choose_multiple(min_required_items) } } @@ -110,7 +77,6 @@ mod tests { addr: "127.0.0.1:3001".to_string(), }, weight: 1, - reverse_weight: 1, }, WeightedItem { item: Peer { @@ -118,7 +84,6 @@ mod tests { addr: "127.0.0.1:3001".to_string(), }, weight: 1, - reverse_weight: 1, }, WeightedItem { item: Peer { @@ -126,7 +91,6 @@ mod tests { addr: "127.0.0.1:3001".to_string(), }, weight: 1, - reverse_weight: 1, }, WeightedItem { item: Peer { @@ -134,7 +98,6 @@ mod tests { addr: "127.0.0.1:3001".to_string(), }, weight: 1, - reverse_weight: 1, }, WeightedItem { item: Peer { @@ -142,7 +105,6 @@ mod tests { addr: "127.0.0.1:3001".to_string(), }, weight: 1, - reverse_weight: 1, }, ]; @@ -152,14 +114,11 @@ mod tests { allow_duplication: false, }; - let selected_peers: HashSet<_> = choose_peers( - weight_array.clone(), - &opts, - &mut RandomWeightedChoose::default(), - ) - .unwrap() - .into_iter() - .collect(); + let selected_peers: HashSet<_> = + choose_peers(&opts, &mut RandomWeightedChoose::new(weight_array.clone())) + .unwrap() + .into_iter() + .collect(); assert_eq!(i, selected_peers.len()); } @@ -169,11 +128,8 @@ mod tests { allow_duplication: false, }; - let selected_result = choose_peers( - weight_array.clone(), - &opts, - &mut RandomWeightedChoose::default(), - ); + let selected_result = + choose_peers(&opts, &mut RandomWeightedChoose::new(weight_array.clone())); assert!(selected_result.is_err()); for i in 1..=50 { @@ -182,12 +138,8 @@ mod tests { allow_duplication: true, }; - let selected_peers = choose_peers( - weight_array.clone(), - &opts, - &mut RandomWeightedChoose::default(), - ) - .unwrap(); + let selected_peers = + choose_peers(&opts, &mut RandomWeightedChoose::new(weight_array.clone())).unwrap(); assert_eq!(i, selected_peers.len()); } diff --git a/src/meta-srv/src/selector/lease_based.rs b/src/meta-srv/src/selector/lease_based.rs index dabf5a0c8f53..3ab99eb31e6b 100644 --- a/src/meta-srv/src/selector/lease_based.rs +++ b/src/meta-srv/src/selector/lease_based.rs @@ -48,13 +48,12 @@ impl Selector for LeaseBasedSelector { addr: v.node_addr.clone(), }, weight: 1, - reverse_weight: 1, }) .collect(); // 3. choose peers by weight_array. - let weighted_choose = &mut RandomWeightedChoose::default(); - let selected = choose_peers(weight_array, &opts, weighted_choose)?; + let mut weighted_choose = RandomWeightedChoose::new(weight_array); + let selected = choose_peers(&opts, &mut weighted_choose)?; Ok(selected) } diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index d7b650deda80..f52d6f9fc38e 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -19,7 +19,6 @@ use common_meta::key::TableMetadataManager; use common_meta::peer::Peer; use common_meta::rpc::router::find_leaders; use common_telemetry::{debug, info}; -use parking_lot::RwLock; use snafu::ResultExt; use table::metadata::TableId; @@ -29,36 +28,30 @@ use crate::lease; use crate::metasrv::SelectorContext; use crate::selector::common::choose_peers; use crate::selector::weight_compute::{RegionNumsBasedWeightCompute, WeightCompute}; -use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedChoose}; +use crate::selector::weighted_choose::RandomWeightedChoose; use crate::selector::{Namespace, Selector, SelectorOptions}; -pub struct LoadBasedSelector { - weighted_choose: RwLock, +pub struct LoadBasedSelector { weight_compute: C, } -impl LoadBasedSelector { - pub fn new(weighted_choose: W, weight_compute: C) -> Self { - Self { - weighted_choose: RwLock::new(weighted_choose), - weight_compute, - } +impl LoadBasedSelector { + pub fn new(weight_compute: C) -> Self { + Self { weight_compute } } } -impl Default for LoadBasedSelector, RegionNumsBasedWeightCompute> { +impl Default for LoadBasedSelector { fn default() -> Self { Self { - weighted_choose: RwLock::new(RandomWeightedChoose::default()), weight_compute: RegionNumsBasedWeightCompute, } } } #[async_trait::async_trait] -impl Selector for LoadBasedSelector +impl Selector for LoadBasedSelector where - W: WeightedChoose, C: WeightCompute>, { type Context = SelectorContext; @@ -100,8 +93,8 @@ where let weight_array = self.weight_compute.compute(&stat_kvs); // 5. choose peers by weight_array. - let mut weighted_choose = self.weighted_choose.write(); - let selected = choose_peers(weight_array, &opts, &mut *weighted_choose)?; + let mut weighted_choose = RandomWeightedChoose::new(weight_array); + let selected = choose_peers(&opts, &mut weighted_choose)?; debug!( "LoadBasedSelector select peers: {:?}, namespace: {}, opts: {:?}.", diff --git a/src/meta-srv/src/selector/weight_compute.rs b/src/meta-srv/src/selector/weight_compute.rs index 7f3b28a364ea..16289bc3bd33 100644 --- a/src/meta-srv/src/selector/weight_compute.rs +++ b/src/meta-srv/src/selector/weight_compute.rs @@ -85,7 +85,6 @@ impl WeightCompute for RegionNumsBasedWeightCompute { .map(|(peer, region_num)| WeightedItem { item: peer, weight: (max_weight - region_num + base_weight) as usize, - reverse_weight: (region_num - min_weight + base_weight) as usize, }) .collect() } @@ -181,10 +180,6 @@ mod tests { }, 4, ); - - for weight in weight_array.iter() { - assert_eq!(weight.reverse_weight, *expected.get(&weight.item).unwrap()); - } } fn mock_stat_1() -> Stat { diff --git a/src/meta-srv/src/selector/weighted_choose.rs b/src/meta-srv/src/selector/weighted_choose.rs index d3a555043285..9fe8d7b28d14 100644 --- a/src/meta-srv/src/selector/weighted_choose.rs +++ b/src/meta-srv/src/selector/weighted_choose.rs @@ -12,41 +12,37 @@ // See the License for the specific language governing permissions and // limitations under the License. -use rand::distributions::WeightedIndex; -use rand::prelude::Distribution; +use rand::seq::SliceRandom; use rand::thread_rng; -use snafu::{ensure, ResultExt}; +use snafu::ResultExt; use crate::error; use crate::error::Result; /// A common trait for weighted balance algorithm. pub trait WeightedChoose: Send + Sync { - /// The method will re-set weight array. - /// - /// Note: - /// 1. make sure weight_array is not empty. - /// 2. the total weight is greater than 0. - /// - /// Otherwise an error will be returned. - fn set_weight_array(&mut self, weight_array: Vec>) -> Result<()>; - /// The method will choose one item. - /// - /// If not set weight_array before, an error will be returned. fn choose_one(&mut self) -> Result; - /// The method will reverse choose one item. + /// The method will choose multiple items. /// - /// If not set weight_array before, an error will be returned. - fn reverse_choose_one(&mut self) -> Result; + /// Returns less than `amount` items if the weight_array is not enough. + fn choose_multiple(&mut self, amount: usize) -> Result>; + + /// Returns the length of the weight_array. + fn len(&self) -> usize; + + /// Returns whether the weight_array is empty. + fn is_empty(&self) -> bool { + self.len() == 0 + } } +/// The struct represents a weighted item. #[derive(Debug, Clone, PartialEq, Eq)] pub struct WeightedItem { pub item: Item, pub weight: usize, - pub reverse_weight: usize, } /// A implementation of weighted balance: random weighted choose. @@ -64,16 +60,18 @@ pub struct WeightedItem { /// ``` pub struct RandomWeightedChoose { items: Vec>, - weighted_index: Option>, - reverse_weighted_index: Option>, +} + +impl RandomWeightedChoose { + pub fn new(items: Vec>) -> Self { + Self { items } + } } impl Default for RandomWeightedChoose { fn default() -> Self { Self { items: Vec::default(), - weighted_index: None, - reverse_weighted_index: None, } } } @@ -82,48 +80,29 @@ impl WeightedChoose for RandomWeightedChoose where Item: Clone + Send + Sync, { - fn set_weight_array(&mut self, weight_array: Vec>) -> Result<()> { - self.weighted_index = Some( - WeightedIndex::new(weight_array.iter().map(|item| item.weight)) - .context(error::WeightArraySnafu)?, - ); - - self.reverse_weighted_index = Some( - WeightedIndex::new(weight_array.iter().map(|item| item.reverse_weight)) - .context(error::WeightArraySnafu)?, - ); - - self.items = weight_array; - - Ok(()) - } - fn choose_one(&mut self) -> Result { - ensure!( - !self.items.is_empty() && self.weighted_index.is_some(), - error::NotSetWeightArraySnafu - ); - // unwrap safety: whether weighted_index is none has been checked before. - let weighted_index = self.weighted_index.as_ref().unwrap(); - - Ok(self.items[weighted_index.sample(&mut thread_rng())] + let item = self + .items + .choose_weighted(&mut thread_rng(), |item| item.weight as f64) + .context(error::ChooseItemsSnafu)? .item - .clone()) + .clone(); + Ok(item) } - fn reverse_choose_one(&mut self) -> Result { - ensure!( - !self.items.is_empty() && self.reverse_weighted_index.is_some(), - error::NotSetWeightArraySnafu - ); - - // unwrap safety: whether reverse_weighted_index is none has been checked before. - let reverse_weighted_index = self.reverse_weighted_index.as_ref().unwrap(); + fn choose_multiple(&mut self, amount: usize) -> Result> { + Ok(self + .items + .choose_multiple_weighted(&mut thread_rng(), amount, |item| item.weight as f64) + .context(error::ChooseItemsSnafu)? + .cloned() + .map(|item| item.item) + .collect::>()) + } - Ok(self.items[reverse_weighted_index.sample(&mut thread_rng())] - .item - .clone()) + fn len(&self) -> usize { + self.items.len() } } @@ -133,45 +112,22 @@ mod tests { #[test] fn test_random_weighted_choose() { - let mut choose = RandomWeightedChoose::default(); - choose - .set_weight_array(vec![ - WeightedItem { - item: 1, - weight: 100, - reverse_weight: 0, - }, - WeightedItem { - item: 2, - weight: 0, - reverse_weight: 100, - }, - ]) - .unwrap(); + let mut choose = RandomWeightedChoose::new(vec![ + WeightedItem { + item: 1, + weight: 100, + }, + WeightedItem { item: 2, weight: 0 }, + ]); + for _ in 0..100 { let ret = choose.choose_one().unwrap(); assert_eq!(1, ret); } for _ in 0..100 { - let ret = choose.reverse_choose_one().unwrap(); - assert_eq!(2, ret); + let ret = choose.choose_multiple(3).unwrap(); + assert_eq!(vec![1, 2], ret); } } - - #[test] - #[should_panic] - fn test_random_weighted_choose_should_panic() { - let mut choose: RandomWeightedChoose = RandomWeightedChoose::default(); - choose.set_weight_array(vec![]).unwrap(); - let _ = choose.choose_one().unwrap(); - } - - #[test] - #[should_panic] - fn test_random_reverse_weighted_choose_should_panic() { - let mut choose: RandomWeightedChoose = RandomWeightedChoose::default(); - choose.set_weight_array(vec![]).unwrap(); - let _ = choose.reverse_choose_one().unwrap(); - } }