diff --git a/src/meta-srv/src/selector/load_based.rs b/src/meta-srv/src/selector/load_based.rs index 72306bd9ed48..9e34ca0563ae 100644 --- a/src/meta-srv/src/selector/load_based.rs +++ b/src/meta-srv/src/selector/load_based.rs @@ -13,18 +13,53 @@ // limitations under the License. use api::v1::meta::Peer; +use common_meta::key::table_name::TableNameKey; +use common_meta::key::TableMetadataManager; +use common_meta::rpc::router::find_leaders; use common_telemetry::warn; +use snafu::ResultExt; -use crate::error::Result; +use crate::error::{self, Result}; use crate::keys::{LeaseKey, LeaseValue, StatKey}; use crate::lease; use crate::metasrv::SelectorContext; use crate::selector::{Namespace, Selector}; +use crate::service::store::kv::KvBackendAdapter; const MAX_REGION_NUMBER: u64 = u64::MAX; pub struct LoadBasedSelector; +async fn get_leader_peer_ids( + table_metadata_manager: &TableMetadataManager, + catalog: &str, + schema: &str, + table: &str, +) -> Result> { + let table_name = table_metadata_manager + .table_name_manager() + .get(TableNameKey::new(catalog, schema, table)) + .await + .context(error::TableMetadataManagerSnafu)?; + + Ok(if let Some(table_name) = table_name { + table_metadata_manager + .table_route_manager() + .get(table_name.table_id()) + .await + .context(error::TableMetadataManagerSnafu)? + .map(|route| { + find_leaders(&route.region_routes) + .into_iter() + .map(|peer| peer.id) + .collect() + }) + .unwrap_or_default() + } else { + Vec::new() + }) +} + #[async_trait::async_trait] impl Selector for LoadBasedSelector { type Context = SelectorContext; @@ -41,11 +76,22 @@ impl Selector for LoadBasedSelector { let stat_keys: Vec = lease_kvs.keys().map(|k| k.into()).collect(); let stat_kvs = ctx.meta_peer_client.get_dn_stat_kvs(stat_keys).await?; + let leader_peer_ids = if let (Some(catalog), Some(schema), Some(table)) = + (&ctx.catalog, &ctx.schema, &ctx.table) + { + let table_metadata_manager = + TableMetadataManager::new(KvBackendAdapter::wrap(ctx.kv_store.clone())); + + get_leader_peer_ids(&table_metadata_manager, catalog, schema, table).await? + } else { + Vec::new() + }; + let mut tuples: Vec<(LeaseKey, LeaseValue, u64)> = lease_kvs .into_iter() + .filter(|(lease_k, _)| !leader_peer_ids.contains(&lease_k.node_id)) .map(|(lease_k, lease_v)| { let stat_key: StatKey = (&lease_k).into(); - let region_num = match stat_kvs .get(&stat_key) .and_then(|stat_val| stat_val.region_num())