Skip to content

Commit

Permalink
fix: LoadBase Selector cannot follow the region distribution rules (#…
Browse files Browse the repository at this point in the history
…2259)

* fix: LoadBase Selector cannot follow the region distribution rules

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored and waynexia committed Sep 12, 2023
1 parent 6d843f8 commit 58091d3
Showing 1 changed file with 48 additions and 2 deletions.
50 changes: 48 additions & 2 deletions src/meta-srv/src/selector/load_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,53 @@
// limitations under the License.

use api::v1::meta::Peer;
use common_meta::key::table_name::TableNameKey;
use common_meta::key::TableMetadataManager;
use common_meta::rpc::router::find_leaders;
use common_telemetry::warn;
use snafu::ResultExt;

use crate::error::Result;
use crate::error::{self, Result};
use crate::keys::{LeaseKey, LeaseValue, StatKey};
use crate::lease;
use crate::metasrv::SelectorContext;
use crate::selector::{Namespace, Selector};
use crate::service::store::kv::KvBackendAdapter;

const MAX_REGION_NUMBER: u64 = u64::MAX;

pub struct LoadBasedSelector;

async fn get_leader_peer_ids(
table_metadata_manager: &TableMetadataManager,
catalog: &str,
schema: &str,
table: &str,
) -> Result<Vec<u64>> {
let table_name = table_metadata_manager
.table_name_manager()
.get(TableNameKey::new(catalog, schema, table))
.await
.context(error::TableMetadataManagerSnafu)?;

Ok(if let Some(table_name) = table_name {
table_metadata_manager
.table_route_manager()
.get(table_name.table_id())
.await
.context(error::TableMetadataManagerSnafu)?
.map(|route| {
find_leaders(&route.region_routes)
.into_iter()
.map(|peer| peer.id)
.collect()
})
.unwrap_or_default()
} else {
Vec::new()
})
}

#[async_trait::async_trait]
impl Selector for LoadBasedSelector {
type Context = SelectorContext;
Expand All @@ -41,11 +76,22 @@ impl Selector for LoadBasedSelector {
let stat_keys: Vec<StatKey> = lease_kvs.keys().map(|k| k.into()).collect();
let stat_kvs = ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?;

let leader_peer_ids = if let (Some(catalog), Some(schema), Some(table)) =
(&ctx.catalog, &ctx.schema, &ctx.table)
{
let table_metadata_manager =
TableMetadataManager::new(KvBackendAdapter::wrap(ctx.kv_store.clone()));

get_leader_peer_ids(&table_metadata_manager, catalog, schema, table).await?
} else {
Vec::new()
};

let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = lease_kvs
.into_iter()
.filter(|(lease_k, _)| !leader_peer_ids.contains(&lease_k.node_id))
.map(|(lease_k, lease_v)| {
let stat_key: StatKey = (&lease_k).into();

let region_num = match stat_kvs
.get(&stat_key)
.and_then(|stat_val| stat_val.region_num())
Expand Down

0 comments on commit 58091d3

Please sign in to comment.