diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 1a4d8349fd4a..048a1a4cbb04 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -21,6 +21,7 @@ use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; +use super::migration_abort::RegionMigrationAbort; use super::migration_end::RegionMigrationEnd; use super::open_candidate_region::OpenCandidateRegion; use super::update_metadata::UpdateMetadata; @@ -51,9 +52,18 @@ impl State for RegionMigrationStart { let region_id = ctx.persistent_ctx.region_id; let region_route = self.retrieve_region_route(ctx, region_id).await?; let to_peer = &ctx.persistent_ctx.to_peer; + let from_peer = &ctx.persistent_ctx.from_peer; if self.has_migrated(®ion_route, to_peer)? { Ok((Box::new(RegionMigrationEnd), Status::Done)) + } else if self.invalid_leader_peer(®ion_route, from_peer)? { + Ok(( + Box::new(RegionMigrationAbort::new(&format!( + "Invalid region leader peer: {from_peer:?}, expected: {:?}", + region_route.leader_peer.as_ref().unwrap(), + ))), + Status::Done, + )) } else if self.check_candidate_region_on_peer(®ion_route, to_peer) { Ok((Box::new(UpdateMetadata::Downgrade), Status::executing(true))) } else { @@ -112,6 +122,24 @@ impl RegionMigrationStart { region_opened } + /// Returns true if the region leader is not the `from_peer`. + /// + /// Abort(non-retry): + /// - Leader peer of RegionRoute is not found. + fn invalid_leader_peer(&self, region_route: &RegionRoute, from_peer: &Peer) -> Result { + let region_id = region_route.region.id; + + let is_invalid_leader_peer = region_route + .leader_peer + .as_ref() + .context(error::UnexpectedSnafu { + violated: format!("Leader peer is not found in TableRoute({})", region_id), + })? + .id + != from_peer.id; + Ok(is_invalid_leader_peer) + } + /// Checks whether the region has been migrated. /// Returns true if it's. /// @@ -203,6 +231,7 @@ mod tests { // from_peer: 1 // to_peer: 2 let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; let to_peer = persistent_context.to_peer.clone(); let region_id = persistent_context.region_id; @@ -212,7 +241,7 @@ mod tests { let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), - leader_peer: Some(Peer::empty(3)), + leader_peer: Some(Peer::empty(from_peer_id)), follower_peers: vec![to_peer], ..Default::default() }]; @@ -262,6 +291,7 @@ mod tests { // from_peer: 1 // to_peer: 2 let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; let region_id = persistent_context.region_id; let env = TestingEnv::new(); let mut ctx = env.context_factory().new_context(persistent_context); @@ -269,7 +299,7 @@ mod tests { let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { region: Region::new_test(region_id), - leader_peer: Some(Peer::empty(3)), + leader_peer: Some(Peer::empty(from_peer_id)), ..Default::default() }]; @@ -280,4 +310,32 @@ mod tests { let _ = next.as_any().downcast_ref::().unwrap(); } + + #[tokio::test] + async fn test_next_migration_abort() { + let mut state = Box::new(RegionMigrationStart); + // from_peer: 1 + // to_peer: 2 + let persistent_context = new_persistent_context(); + let region_id = persistent_context.region_id; + let env = TestingEnv::new(); + let mut ctx = env.context_factory().new_context(persistent_context); + + let table_info = new_test_table_info(1024, vec![1]).into(); + let region_routes = vec![RegionRoute { + region: Region::new_test(region_id), + leader_peer: Some(Peer::empty(1024)), + ..Default::default() + }]; + + env.create_physical_table_metadata(table_info, region_routes) + .await; + + let (next, _) = state.next(&mut ctx).await.unwrap(); + + let _ = next + .as_any() + .downcast_ref::() + .unwrap(); + } } diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 176db54952b3..0335a447c7e7 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -392,6 +392,7 @@ mod tests { // from_peer: 1 // to_peer: 2 let persistent_context = new_persistent_context(); + let from_peer_id = persistent_context.from_peer.id; let region_id = persistent_context.region_id; let to_peer_id = persistent_context.to_peer.id; let mut env = TestingEnv::new(); @@ -400,7 +401,7 @@ mod tests { let table_info = new_test_table_info(1024, vec![1]).into(); let region_routes = vec![RegionRoute { region: Region::new_test(persistent_context.region_id), - leader_peer: Some(Peer::empty(1)), + leader_peer: Some(Peer::empty(from_peer_id)), ..Default::default() }];