Skip to content

Commit

Permalink
fix: register regions during procedure recovery (#3859)
Browse files Browse the repository at this point in the history
* fix: register regions during procedure recovery

* feat: add `recover` to `Procedure` trait

* refactor: move recovery to `recover` method
  • Loading branch information
WenyXu authored May 7, 2024
1 parent c0be0c3 commit 1b58622
Show file tree
Hide file tree
Showing 9 changed files with 361 additions and 19 deletions.
34 changes: 20 additions & 14 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,13 @@ impl CreateTableProcedure {
/// Rebuilds a [`CreateTableProcedure`] from its JSON-serialized data.
///
/// This constructor only deserializes: `opening_regions` starts out empty and no
/// regions are registered here. Region registration for a restored procedure is
/// performed by `recover`, so the same work is not done twice.
///
/// # Errors
///
/// Returns an error (with `FromJsonSnafu` context) if `json` cannot be deserialized.
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
    let data = serde_json::from_str(json).context(FromJsonSnafu)?;

    Ok(CreateTableProcedure {
        context,
        creator: TableCreator {
            data,
            // Filled in by `recover` once the procedure is actually resumed.
            opening_regions: vec![],
        },
    })
}

fn table_info(&self) -> &RawTableInfo {
Expand Down Expand Up @@ -295,6 +288,19 @@ impl Procedure for CreateTableProcedure {
Self::TYPE_NAME
}

/// Re-registers the opening regions when the procedure is recovered.
///
/// Registration only happens when a table route has already been allocated;
/// otherwise this is a no-op.
fn recover(&mut self) -> ProcedureResult<()> {
    let registered = match &self.creator.data.table_route {
        Some(table_route) => self
            .creator
            .register_opening_regions(&self.context, &table_route.region_routes)
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?,
        // No table route allocated yet: nothing to register.
        None => return Ok(()),
    };
    self.creator.opening_regions = registered;

    Ok(())
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.creator.data.state;

Expand Down
20 changes: 19 additions & 1 deletion src/common/meta/src/ddl/drop_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ pub mod start;
use std::any::Any;
use std::fmt::Debug;

use common_procedure::error::{Error as ProcedureError, FromJsonSnafu, ToJsonSnafu};
use common_error::ext::BoxedError;
use common_procedure::error::{Error as ProcedureError, ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
};
Expand Down Expand Up @@ -68,6 +69,11 @@ pub(crate) trait State: Send + Debug {
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)>;

/// Hook called while the owning procedure is being recovered.
///
/// The default implementation does nothing and returns `Ok(())`; a state overrides
/// it when it has work to redo on recovery. The [DdlContext] argument is unused by
/// the default implementation.
fn recover(&mut self, _ddl_ctx: &DdlContext) -> Result<()> {
Ok(())
}

/// Returns as [Any](std::any::Any).
fn as_any(&self) -> &dyn Any;
}
Expand Down Expand Up @@ -107,6 +113,11 @@ impl DropDatabaseProcedure {
state,
})
}

/// Returns a shared reference to the procedure's current [State].
///
/// Only compiled in test builds.
#[cfg(test)]
pub(crate) fn state(&self) -> &dyn State {
self.state.as_ref()
}
}

#[async_trait]
Expand All @@ -115,6 +126,13 @@ impl Procedure for DropDatabaseProcedure {
Self::TYPE_NAME
}

/// Forwards recovery to the current state's `recover` hook.
///
/// Any error returned by the state is boxed and wrapped with `ExternalSnafu`.
fn recover(&mut self) -> ProcedureResult<()> {
    let recovered = self.state.recover(&self.runtime_context);

    recovered.map_err(BoxedError::new).context(ExternalSnafu)
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &mut self.state;

Expand Down
44 changes: 43 additions & 1 deletion src/common/meta/src/ddl/drop_database/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ impl DropDatabaseExecutor {
}

impl DropDatabaseExecutor {
fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
/// Registers the operating regions.
pub(crate) fn register_dropping_regions(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
if !self.dropping_regions.is_empty() {
return Ok(());
}
let dropping_regions = operating_leader_regions(&self.physical_region_routes);
let mut dropping_region_guards = Vec::with_capacity(dropping_regions.len());
for (region_id, datanode_id) in dropping_regions {
Expand All @@ -85,6 +89,10 @@ impl DropDatabaseExecutor {
#[async_trait::async_trait]
#[typetag::serde]
impl State for DropDatabaseExecutor {
/// Recovery hook: delegates to `register_dropping_regions` with the given context.
fn recover(&mut self, ddl_ctx: &DdlContext) -> Result<()> {
self.register_dropping_regions(ddl_ctx)
}

async fn next(
&mut self,
ddl_ctx: &DdlContext,
Expand Down Expand Up @@ -338,4 +346,38 @@ mod tests {
let err = state.next(&ddl_context, &mut ctx).await.unwrap_err();
assert!(err.is_retry_later());
}

/// Checks that `recover` registers the dropping region of a freshly built executor
/// and that the executor can then advance to the cursor state.
#[tokio::test]
async fn test_on_recovery() {
    let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
    let ddl_context = new_ddl_context(node_manager);
    let physical_table_id = create_physical_table(&ddl_context, 0, "phy").await;
    let (_, table_route) = ddl_context
        .table_metadata_manager
        .table_route_manager()
        .get_physical_table_route(physical_table_id)
        .await
        .unwrap();

    let mut executor = DropDatabaseExecutor::new(
        physical_table_id,
        physical_table_id,
        TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "phy"),
        table_route.region_routes.clone(),
        DropTableTarget::Physical,
    );
    let mut drop_ctx = DropDatabaseContext {
        catalog: DEFAULT_CATALOG_NAME.to_string(),
        schema: DEFAULT_SCHEMA_NAME.to_string(),
        drop_if_exists: false,
        tables: None,
    };

    // Recovery alone must register the region guard.
    executor.recover(&ddl_context).unwrap();
    assert_eq!(executor.dropping_regions.len(), 1);

    // The recovered executor can still take its next step.
    let (next_state, status) = executor.next(&ddl_context, &mut drop_ctx).await.unwrap();
    assert!(!status.need_persist());
    let cursor = next_state
        .as_any()
        .downcast_ref::<DropDatabaseCursor>()
        .unwrap();
    assert_eq!(cursor.target, DropTableTarget::Physical);
}
}
21 changes: 20 additions & 1 deletion src/common/meta/src/ddl/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ pub(crate) mod executor;
mod metadata;

use async_trait::async_trait;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_error::ext::BoxedError;
use common_procedure::error::{ExternalSnafu, FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
Result as ProcedureResult, Status,
Expand Down Expand Up @@ -68,6 +69,7 @@ impl DropTableProcedure {
pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult<Self> {
let data: DropTableData = serde_json::from_str(json).context(FromJsonSnafu)?;
let executor = data.build_executor();

Ok(Self {
context,
data,
Expand Down Expand Up @@ -175,6 +177,23 @@ impl Procedure for DropTableProcedure {
Self::TYPE_NAME
}

/// Re-registers the dropping regions when the procedure is recovered.
///
/// Registration is performed only when the persisted state is `DeleteMetadata`,
/// `InvalidateTableCache` or `DatanodeDropRegions`; for any other state this is a
/// no-op.
fn recover(&mut self) -> ProcedureResult<()> {
    if !matches!(
        self.data.state,
        DropTableState::DeleteMetadata
            | DropTableState::InvalidateTableCache
            | DropTableState::DatanodeDropRegions
    ) {
        // Not one of the states that re-register regions.
        return Ok(());
    }

    self.register_dropping_regions()
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;

    Ok(())
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let state = &self.data.state;
let _timer = metrics::METRIC_META_PROCEDURE_DROP_TABLE
Expand Down
79 changes: 78 additions & 1 deletion src/common/meta/src/ddl/tests/drop_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ use std::sync::Arc;

use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId};
use common_procedure_test::MockContextProvider;
use common_procedure_test::{
execute_procedure_until, execute_procedure_until_done, MockContextProvider,
};
use futures::TryStreamExt;

use crate::ddl::drop_database::executor::DropDatabaseExecutor;
use crate::ddl::drop_database::DropDatabaseProcedure;
use crate::ddl::test_util::datanode_handler::{NaiveDatanodeHandler, RetryErrorDatanodeHandler};
use crate::ddl::test_util::{create_logical_table, create_physical_table};
Expand Down Expand Up @@ -121,3 +124,77 @@ async fn test_drop_database_retryable_error() {
}
}
}

// Exercises dump -> `from_json` -> `recover` of a `DropDatabaseProcedure` twice, each
// time after the procedure has reached the `DropDatabaseExecutor` state, and finally
// runs the procedure to completion and checks that no tables remain in the schema.
#[tokio::test]
async fn test_drop_database_recover() {
common_telemetry::init_default_ut_logging();
let cluster_id = 1;
let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
let ddl_context = new_ddl_context(node_manager);
// Creates the default schema the tables below live in.
ddl_context
.table_metadata_manager
.schema_manager()
.create(
SchemaNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME),
None,
false,
)
.await
.unwrap();
// Creates a physical table
let phy_id = create_physical_table(&ddl_context, cluster_id, "phy").await;
// Creates a logical table
create_logical_table(ddl_context.clone(), cluster_id, phy_id, "table1").await;
let mut procedure = DropDatabaseProcedure::new(
DEFAULT_CATALOG_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string(),
false,
ddl_context.clone(),
);
// Number of entries expected in the region keeper after each recovery.
let num_operating_regions = 1;
// Before dropping the logical table
execute_procedure_until(&mut procedure, |p| {
p.state()
.as_any()
.downcast_ref::<DropDatabaseExecutor>()
.is_some()
})
.await;
// Dump data
let data = procedure.dump().unwrap();
// Nothing is registered in the keeper before the procedure is restored.
assert_eq!(ddl_context.memory_region_keeper.len(), 0);
let mut procedure = DropDatabaseProcedure::from_json(&data, ddl_context.clone()).unwrap();
// `recover` is what registers the operating regions for the restored procedure.
procedure.recover().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions
);
// Resets the keeper so the second round starts from an empty state.
ddl_context.memory_region_keeper.clear();
// Before dropping the physical table
execute_procedure_until(&mut procedure, |p| {
p.state()
.as_any()
.downcast_ref::<DropDatabaseExecutor>()
.is_some()
})
.await;
// Dump data
let data = procedure.dump().unwrap();
assert_eq!(ddl_context.memory_region_keeper.len(), 0);
let mut procedure = DropDatabaseProcedure::from_json(&data, ddl_context.clone()).unwrap();
procedure.recover().unwrap();
assert_eq!(
ddl_context.memory_region_keeper.len(),
num_operating_regions
);
ddl_context.memory_region_keeper.clear();
// Runs the recovered procedure to the end; the schema must contain no tables afterwards.
execute_procedure_until_done(&mut procedure).await;
let tables = ddl_context
.table_metadata_manager
.table_name_manager()
.tables(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.try_collect::<Vec<_>>()
.await
.unwrap();
assert!(tables.is_empty());
}
66 changes: 66 additions & 0 deletions src/common/meta/src/ddl/tests/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,3 +296,69 @@ async fn test_memory_region_keeper_guard_dropped_on_procedure_done() {
inner_test(new_drop_table_task("s", logical_table_id, false)).await;
inner_test(new_drop_table_task("t", physical_table_id, false)).await;
}

/// Checks how many regions the keeper holds before dumping a `DropTableProcedure` and
/// after restoring it with `from_json` + `recover`, for each intermediate state and
/// for a procedure that ran to completion.
#[tokio::test]
async fn test_from_json() {
    // (state to stop at, regions in the keeper before the dump, regions after recovery)
    let cases = [
        (DropTableState::DeleteMetadata, 0, 1),
        (DropTableState::InvalidateTableCache, 1, 1),
        (DropTableState::DatanodeDropRegions, 1, 1),
        (DropTableState::DeleteTombstone, 1, 0),
    ];
    for (target_state, expected_before, expected_after) in cases {
        let cluster_id = 1;
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend);

        let physical_table_id = create_physical_table(&ddl_context, cluster_id, "t").await;
        let task = new_drop_table_task("t", physical_table_id, false);
        let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
        execute_procedure_until(&mut procedure, |p| p.data.state == target_state).await;
        let dumped = procedure.dump().unwrap();
        assert_eq!(ddl_context.memory_region_keeper.len(), expected_before);

        // Cleans up the keeper so only what `recover` registers is counted.
        ddl_context.memory_region_keeper.clear();
        let mut restored = DropTableProcedure::from_json(&dumped, ddl_context.clone()).unwrap();
        restored.recover().unwrap();
        assert_eq!(ddl_context.memory_region_keeper.len(), expected_after);
        assert_eq!(restored.dropping_regions.len(), expected_after);
    }

    // Final scenario: the procedure is executed until done before being dumped.
    let cluster_id = 1;
    let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
    let kv_backend = Arc::new(MemoryKvBackend::new());
    let ddl_context = new_ddl_context_with_kv_backend(node_manager, kv_backend);

    let physical_table_id = create_physical_table(&ddl_context, cluster_id, "t").await;
    let task = new_drop_table_task("t", physical_table_id, false);
    let mut procedure = DropTableProcedure::new(cluster_id, task, ddl_context.clone());
    execute_procedure_until_done(&mut procedure).await;
    let dumped = procedure.dump().unwrap();
    assert_eq!(ddl_context.memory_region_keeper.len(), 0);

    // Cleans up the keeper.
    ddl_context.memory_region_keeper.clear();
    let mut restored = DropTableProcedure::from_json(&dumped, ddl_context.clone()).unwrap();
    restored.recover().unwrap();
    assert_eq!(ddl_context.memory_region_keeper.len(), 0);
    assert_eq!(restored.dropping_regions.len(), 0);
}
7 changes: 7 additions & 0 deletions src/common/meta/src/region_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,16 @@ impl MemoryRegionKeeper {
inner.len()
}

/// Returns `true` when [`Self::len`] reports zero.
pub fn is_empty(&self) -> bool {
self.len() == 0
}

/// Clears the inner collection (test builds only).
///
/// # Panics
///
/// Panics if acquiring the write lock on `inner` fails.
#[cfg(test)]
pub fn clear(&self) {
    self.inner.write().unwrap().clear();
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 1b58622

Please sign in to comment.