Skip to content

Commit

Permalink
feat: abort region migration if leader region peer is unexpected (#3086)
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu authored Jan 3, 2024
1 parent 5c66ce6 commit 21694c2
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 3 deletions.
62 changes: 60 additions & 2 deletions src/meta-srv/src/procedure/region_migration/migration_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;

use super::migration_abort::RegionMigrationAbort;
use super::migration_end::RegionMigrationEnd;
use super::open_candidate_region::OpenCandidateRegion;
use super::update_metadata::UpdateMetadata;
Expand Down Expand Up @@ -51,9 +52,18 @@ impl State for RegionMigrationStart {
let region_id = ctx.persistent_ctx.region_id;
let region_route = self.retrieve_region_route(ctx, region_id).await?;
let to_peer = &ctx.persistent_ctx.to_peer;
let from_peer = &ctx.persistent_ctx.from_peer;

if self.has_migrated(&region_route, to_peer)? {
Ok((Box::new(RegionMigrationEnd), Status::Done))
} else if self.invalid_leader_peer(&region_route, from_peer)? {
Ok((
Box::new(RegionMigrationAbort::new(&format!(
"Invalid region leader peer: {from_peer:?}, expected: {:?}",
region_route.leader_peer.as_ref().unwrap(),
))),
Status::Done,
))
} else if self.check_candidate_region_on_peer(&region_route, to_peer) {
Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true)))
} else {
Expand Down Expand Up @@ -112,6 +122,24 @@ impl RegionMigrationStart {
region_opened
}

/// Returns true if the region leader is not the `from_peer`.
///
/// Abort(non-retry):
/// - Leader peer of RegionRoute is not found.
fn invalid_leader_peer(&self, region_route: &RegionRoute, from_peer: &Peer) -> Result<bool> {
let region_id = region_route.region.id;

let is_invalid_leader_peer = region_route
.leader_peer
.as_ref()
.context(error::UnexpectedSnafu {
violated: format!("Leader peer is not found in TableRoute({})", region_id),
})?
.id
!= from_peer.id;
Ok(is_invalid_leader_peer)
}

/// Checks whether the region has been migrated.
/// Returns true if it's.
///
Expand Down Expand Up @@ -203,6 +231,7 @@ mod tests {
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let to_peer = persistent_context.to_peer.clone();
let region_id = persistent_context.region_id;

Expand All @@ -212,7 +241,7 @@ mod tests {
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(3)),
leader_peer: Some(Peer::empty(from_peer_id)),
follower_peers: vec![to_peer],
..Default::default()
}];
Expand Down Expand Up @@ -262,14 +291,15 @@ mod tests {
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let region_id = persistent_context.region_id;
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);

let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(3)),
leader_peer: Some(Peer::empty(from_peer_id)),
..Default::default()
}];

Expand All @@ -280,4 +310,32 @@ mod tests {

let _ = next.as_any().downcast_ref::<OpenCandidateRegion>().unwrap();
}

#[tokio::test]
async fn test_next_migration_abort() {
let mut state = Box::new(RegionMigrationStart);
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let region_id = persistent_context.region_id;
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);

let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(region_id),
leader_peer: Some(Peer::empty(1024)),
..Default::default()
}];

env.create_physical_table_metadata(table_info, region_routes)
.await;

let (next, _) = state.next(&mut ctx).await.unwrap();

let _ = next
.as_any()
.downcast_ref::<RegionMigrationAbort>()
.unwrap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ mod tests {
// from_peer: 1
// to_peer: 2
let persistent_context = new_persistent_context();
let from_peer_id = persistent_context.from_peer.id;
let region_id = persistent_context.region_id;
let to_peer_id = persistent_context.to_peer.id;
let mut env = TestingEnv::new();
Expand All @@ -400,7 +401,7 @@ mod tests {
let table_info = new_test_table_info(1024, vec![1]).into();
let region_routes = vec![RegionRoute {
region: Region::new_test(persistent_context.region_id),
leader_peer: Some(Peer::empty(1)),
leader_peer: Some(Peer::empty(from_peer_id)),
..Default::default()
}];

Expand Down

0 comments on commit 21694c2

Please sign in to comment.