Skip to content

Commit

Permalink
feat: round-robin selector (GreptimeTeam#4024)
Browse files Browse the repository at this point in the history
* feat: implement round robin peer selector

Signed-off-by: Ruihang Xia <[email protected]>

* add document and test

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored May 24, 2024
1 parent 466f7c6 commit a58256d
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 18 deletions.
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl/table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ impl TableMetadataAllocator {

pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;

/// [PeerAllocator] allocates [Peer]s for creating regions.
/// [`PeerAllocator`] allocates [`Peer`]s for creating regions.
#[async_trait]
pub trait PeerAllocator: Send + Sync {
/// Allocates `regions` size [Peer]s.
/// Allocates `regions` size [`Peer`]s.
async fn alloc(&self, ctx: &TableMetadataAllocatorContext, regions: usize)
-> Result<Vec<Peer>>;
}
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::metasrv::builder::MetasrvBuilder;
use crate::metasrv::{Metasrv, MetasrvOptions, SelectorRef};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::load_based::LoadBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
use crate::selector::SelectorType;
use crate::service::admin;
use crate::{error, Result};
Expand Down Expand Up @@ -228,6 +229,7 @@ pub async fn metasrv_builder(
let selector = match opts.selector {
SelectorType::LoadBased => Arc::new(LoadBasedSelector::default()) as SelectorRef,
SelectorType::LeaseBased => Arc::new(LeaseBasedSelector) as SelectorRef,
SelectorType::RoundRobin => Arc::new(RoundRobinSelector::default()) as SelectorRef,
};

Ok(MetasrvBuilder::new()
Expand Down
5 changes: 5 additions & 0 deletions src/meta-srv/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ impl MetaPeerClient {
.map(|election| election.is_leader())
.unwrap_or(true)
}

/// Returns a handle to the in-memory KV backend backing this client so
/// tests can seed or inspect state (e.g. datanode lease entries) directly.
#[cfg(test)]
pub(crate) fn memory_backend(&self) -> ResettableKvBackendRef {
    self.in_memory.clone()
}
}

fn to_stat_kv_map(kvs: Vec<KeyValue>) -> Result<HashMap<StatKey, StatValue>> {
Expand Down
3 changes: 3 additions & 0 deletions src/meta-srv/src/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
mod common;
pub mod lease_based;
pub mod load_based;
pub mod round_robin;
mod weight_compute;
mod weighted_choose;

Expand Down Expand Up @@ -61,6 +62,7 @@ pub enum SelectorType {
#[default]
LoadBased,
LeaseBased,
RoundRobin,
}

impl TryFrom<&str> for SelectorType {
Expand All @@ -70,6 +72,7 @@ impl TryFrom<&str> for SelectorType {
match value {
"load_based" | "LoadBased" => Ok(SelectorType::LoadBased),
"lease_based" | "LeaseBased" => Ok(SelectorType::LeaseBased),
"round_robin" | "RoundRobin" => Ok(SelectorType::RoundRobin),
other => error::UnsupportedSelectorTypeSnafu {
selector_type: other,
}
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/selector/lease_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::selector::common::choose_peers;
use crate::selector::weighted_choose::{RandomWeightedChoose, WeightedItem};
use crate::selector::{Namespace, Selector, SelectorOptions};

/// Selects among all alive datanodes using a random weighted choose.
pub struct LeaseBasedSelector;

#[async_trait::async_trait]
Expand Down
138 changes: 138 additions & 0 deletions src/meta-srv/src/selector/round_robin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::atomic::AtomicUsize;

use common_meta::peer::Peer;
use snafu::ensure;

use crate::error::{NoEnoughAvailableDatanodeSnafu, Result};
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::{Namespace, Selector, SelectorOptions};

/// A selector that hands out peers one after another, cycling through the
/// alive datanodes in ascending `node_id` order.
///
/// Handy for spreading placements evenly across the cluster, but **not
/// recommended** for serious production deployments: it is oblivious to the
/// actual load carried by each datanode.
pub struct RoundRobinSelector {
    // Next position in the rotation; reduced modulo the peer count when used.
    counter: AtomicUsize,
}

impl Default for RoundRobinSelector {
    fn default() -> Self {
        Self {
            counter: AtomicUsize::new(0),
        }
    }
}

#[async_trait::async_trait]
impl Selector for RoundRobinSelector {
    type Context = SelectorContext;
    type Output = Vec<Peer>;

    /// Selects `opts.min_required_items` peers by walking the alive datanode
    /// list (sorted by node id) in round-robin order.
    ///
    /// Returns a `NoEnoughAvailableDatanode` error when no datanode is alive,
    /// or when `opts.allow_duplication` is false and fewer datanodes are alive
    /// than requested (a single rotation can hand out each peer only once).
    async fn select(
        &self,
        ns: Namespace,
        ctx: &Self::Context,
        opts: SelectorOptions,
    ) -> Result<Vec<Peer>> {
        // 1. get alive datanodes.
        let lease_kvs =
            lease::alive_datanodes(ns, &ctx.meta_peer_client, ctx.datanode_lease_secs).await?;

        // 2. map into peers and sort on node id so the rotation order is
        // stable across calls regardless of the backend's iteration order.
        let mut peers: Vec<Peer> = lease_kvs
            .into_iter()
            .map(|(k, v)| Peer::new(k.node_id, v.node_addr))
            .collect();
        peers.sort_by_key(|p| p.id);
        ensure!(
            !peers.is_empty(),
            NoEnoughAvailableDatanodeSnafu {
                required: opts.min_required_items,
                available: 0usize,
            }
        );
        // Without duplication, one pass over the rotation cannot yield more
        // distinct peers than exist, so fail early instead of silently
        // returning duplicates.
        ensure!(
            opts.allow_duplication || peers.len() >= opts.min_required_items,
            NoEnoughAvailableDatanodeSnafu {
                required: opts.min_required_items,
                available: peers.len(),
            }
        );

        // 3. choose peers. Reserve the whole index range with a single
        // fetch_add so concurrent `select` calls cannot interleave and hand
        // the same peer out twice within one selection.
        let start = self
            .counter
            .fetch_add(opts.min_required_items, std::sync::atomic::Ordering::Relaxed);
        let mut selected = Vec::with_capacity(opts.min_required_items);
        for i in 0..opts.min_required_items {
            selected.push(peers[start.wrapping_add(i) % peers.len()].clone());
        }

        Ok(selected)
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::test_util::{create_selector_context, put_datanodes};

    // Verifies that the selector cycles through datanodes in node-id order
    // and that the rotation position persists across `select` calls.
    #[tokio::test]
    async fn test_round_robin_selector() {
        let selector = RoundRobinSelector::default();
        let ctx = create_selector_context();
        let ns = 0;

        // add three nodes
        // Ids are deliberately non-contiguous (2, 5, 8): the rotation order is
        // the sort order of node ids, not dependent on consecutive ids.
        let peer1 = Peer {
            id: 2,
            addr: "node1".to_string(),
        };
        let peer2 = Peer {
            id: 5,
            addr: "node2".to_string(),
        };
        let peer3 = Peer {
            id: 8,
            addr: "node3".to_string(),
        };
        let peers = vec![peer1.clone(), peer2.clone(), peer3.clone()];
        put_datanodes(ns, &ctx.meta_peer_client, peers).await;

        // First call asks for more peers (4) than datanodes (3): the rotation
        // wraps around and peer1 appears twice.
        let peers = selector
            .select(
                ns,
                &ctx,
                SelectorOptions {
                    min_required_items: 4,
                    allow_duplication: true,
                },
            )
            .await
            .unwrap();
        assert_eq!(peers.len(), 4);
        assert_eq!(
            peers,
            vec![peer1.clone(), peer2.clone(), peer3.clone(), peer1.clone()]
        );

        // Second call continues from where the first left off (after peer1),
        // proving the counter is shared state rather than reset per call.
        let peers = selector
            .select(
                ns,
                &ctx,
                SelectorOptions {
                    min_required_items: 2,
                    allow_duplication: true,
                },
            )
            .await
            .unwrap();
        assert_eq!(peers.len(), 2);
        assert_eq!(peers, vec![peer2.clone(), peer3.clone()]);
    }
}
8 changes: 8 additions & 0 deletions src/meta-srv/src/table_meta_alloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,18 @@ pub struct MetasrvPeerAllocator {
}

impl MetasrvPeerAllocator {
/// Creates a new [`MetasrvPeerAllocator`] with the given [`SelectorContext`] and [`SelectorRef`].
pub fn new(ctx: SelectorContext, selector: SelectorRef) -> Self {
Self { ctx, selector }
}

/// Allocates a specified number (by `regions`) of [`Peer`] instances based on the given
/// [`TableMetadataAllocatorContext`] and number of regions. The returned peers will have
/// the same length as the number of regions.
///
/// This method is mainly a wrapper around the [`SelectorRef`]::`select` method. There is
/// no guarantee that how the returned peers are used, like whether they are from the same
/// table or not. So this method isn't idempotent.
async fn alloc(
&self,
ctx: &TableMetadataAllocatorContext,
Expand Down
68 changes: 52 additions & 16 deletions src/meta-srv/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ use common_meta::peer::Peer;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::sequence::SequenceBuilder;
use common_meta::state_store::KvStateStore;
use common_meta::ClusterId;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_time::util as time_util;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;

use crate::cluster::MetaPeerClientBuilder;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::handler::{HeartbeatMailbox, Pushers};
use crate::keys::{LeaseKey, LeaseValue};
use crate::lock::memory::MemLock;
use crate::metasrv::SelectorContext;
use crate::procedure::region_failover::RegionFailoverManager;
Expand All @@ -54,17 +57,9 @@ pub(crate) fn new_region_route(region_id: u64, peers: &[Peer], leader_node: u64)
}
}

pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
let kv_backend = Arc::new(MemoryKvBackend::new());

let pushers = Pushers::default();
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence);

let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));

/// Builds and returns a [`SelectorContext`]. To access its inner state,
/// use `memory_backend` on [`MetaPeerClientRef`].
pub(crate) fn create_selector_context() -> SelectorContext {
let in_memory = Arc::new(MemoryKvBackend::new());
let meta_peer_client = MetaPeerClientBuilder::default()
.election(None)
Expand All @@ -74,15 +69,30 @@ pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
// Safety: all required fields set at initialization
.unwrap();

let selector = Arc::new(LeaseBasedSelector);
let selector_ctx = SelectorContext {
SelectorContext {
datanode_lease_secs: 10,
server_addr: "127.0.0.1:3002".to_string(),
kv_backend: kv_backend.clone(),
kv_backend: in_memory,
meta_peer_client,
table_id: None,
};
}
}

pub(crate) fn create_region_failover_manager() -> Arc<RegionFailoverManager> {
let kv_backend = Arc::new(MemoryKvBackend::new());

let pushers = Pushers::default();
let mailbox_sequence =
SequenceBuilder::new("test_heartbeat_mailbox", kv_backend.clone()).build();
let mailbox = HeartbeatMailbox::create(pushers, mailbox_sequence);

let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(ManagerConfig::default(), state_store));

let selector = Arc::new(LeaseBasedSelector);
let selector_ctx = create_selector_context();

let in_memory = Arc::new(MemoryKvBackend::new());
Arc::new(RegionFailoverManager::new(
10,
in_memory,
Expand Down Expand Up @@ -157,3 +167,29 @@ pub(crate) async fn prepare_table_region_and_info_value(
.await
.unwrap();
}

/// Registers the given peers as alive datanodes by writing a fresh lease
/// entry (keyed by cluster and node id, stamped with the current time) for
/// each of them into the meta peer client's in-memory backend.
pub(crate) async fn put_datanodes(
    cluster_id: ClusterId,
    meta_peer_client: &MetaPeerClientRef,
    datanodes: Vec<Peer>,
) {
    let kv_backend = meta_peer_client.memory_backend();
    for peer in datanodes {
        let key = LeaseKey {
            cluster_id,
            node_id: peer.id,
        };
        let value = LeaseValue {
            // A current timestamp keeps the lease within `datanode_lease_secs`
            // so the node counts as alive.
            timestamp_millis: time_util::current_time_millis(),
            node_addr: peer.addr,
        };
        let key_bytes: Vec<u8> = key.try_into().unwrap();
        let value_bytes: Vec<u8> = value.try_into().unwrap();
        let req = common_meta::rpc::store::PutRequest {
            key: key_bytes,
            value: value_bytes,
            ..Default::default()
        };
        kv_backend.put(req).await.unwrap();
    }
}

0 comments on commit a58256d

Please sign in to comment.