diff --git a/astacus/coordinator/coordinator.py b/astacus/coordinator/coordinator.py
index a5a48dcc..71ddda9a 100644
--- a/astacus/coordinator/coordinator.py
+++ b/astacus/coordinator/coordinator.py
@@ -271,8 +271,14 @@ async def try_run(self, cluster: Cluster, context: StepsContext) -> bool:
             with self._progress_handler(cluster, step):
                 try:
                     r = await step.run_step(cluster, context)
-                except (StepFailedError, WaitResultError) as e:
-                    logger.info("Step %s failed: %s", step, str(e))
+                except (StepFailedError, WaitResultError) as exc:
+                    logger.info("Step %s failed: %s", step, str(exc))
+                    try:
+                        await step.handle_step_failure(cluster, context, exc)
+                    except Exception:
+                        # Failure hooks are best-effort cleanup: an unexpected error in one must
+                        # not mask the original step failure or skip the `return False` below.
+                        logger.exception("Failure handler of step %s raised unexpectedly", step)
                     return False
                 context.set_result(step.__class__, r)
         return True
diff --git a/astacus/coordinator/plugins/base.py b/astacus/coordinator/plugins/base.py
index a2d5995f..8d9207e2 100644
--- a/astacus/coordinator/plugins/base.py
+++ b/astacus/coordinator/plugins/base.py
@@ -71,14 +71,29 @@ class Step(Generic[StepResult_co]):
     async def run_step(self, cluster: Cluster, context: StepsContext) -> StepResult_co:
         raise NotImplementedError
 
+    async def handle_step_failure(self, cluster: Cluster, context: StepsContext, exc: Exception) -> None:
+        """Hook awaited by the coordinator after `run_step` failed with `exc`; does nothing by default.
+
+        This method should not raise exceptions.
+        """
+        return None
+
 
 class SyncStep(Step[StepResult_co]):
     async def run_step(self, cluster: Cluster, context: StepsContext) -> StepResult_co:
         return await run_in_threadpool(self.run_sync_step, cluster, context)
 
+    async def handle_step_failure(self, cluster: Cluster, context: StepsContext, exc: Exception) -> None:
+        """Delegate to `handle_step_failure_sync` through `run_in_threadpool`."""
+        await run_in_threadpool(self.handle_step_failure_sync, cluster, context, exc)
+
     def run_sync_step(self, cluster: Cluster, context: StepsContext) -> StepResult_co:
         raise NotImplementedError
 
+    def handle_step_failure_sync(self, cluster: Cluster, context: StepsContext, exc: Exception) -> None:
+        """Synchronous failure hook, does nothing by default; it should not raise exceptions."""
+        return None
+
 
 class StepFailedError(exceptions.PermanentException):
     pass
diff --git a/astacus/coordinator/plugins/clickhouse/plugin.py b/astacus/coordinator/plugins/clickhouse/plugin.py
index feccc228..c8f38d64 100644
--- a/astacus/coordinator/plugins/clickhouse/plugin.py
+++ b/astacus/coordinator/plugins/clickhouse/plugin.py
@@ -134,6 +134,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]:
             RetrieveKeeperMapTableDataStep(
                 zookeeper_client=zookeeper_client,
                 keeper_map_path_prefix=self.keeper_map_path_prefix,
+                clients=clickhouse_clients,
             ),
             # Then freeze all tables
             FreezeTablesStep(
diff --git a/astacus/coordinator/plugins/clickhouse/steps.py b/astacus/coordinator/plugins/clickhouse/steps.py
index ee29dcf2..1bf6119a 100644
--- a/astacus/coordinator/plugins/clickhouse/steps.py
+++ b/astacus/coordinator/plugins/clickhouse/steps.py
@@ -242,6 +242,7 @@ async def run_step(self, cluster: Cluster, context: StepsContext):
 class RetrieveKeeperMapTableDataStep(Step[Sequence[KeeperMapTable]]):
     zookeeper_client: ZooKeeperClient
     keeper_map_path_prefix: str | None
+    clients: Sequence[ClickHouseClient]
 
     async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[KeeperMapTable]:
         if self.keeper_map_path_prefix is None:
@@ -275,6 +276,13 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Ke
             raise TransientException("Concurrent table addition / deletion during KeeperMap backup")
         return tables
 
+    async def handle_step_failure(self, cluster: Cluster, context: StepsContext, exc: Exception) -> None:
+        """Try to restore write access to KeeperMap tables after this step failed."""
+        try:
+            await KeeperMapTablesReadOnlyStep(clients=self.clients, allow_writes=True).run_step(cluster, context)
+        except StepFailedError:
+            logger.warning("Unable to restore write ACLs for KeeperMap tables")
+
 
 @dataclasses.dataclass
 class RetrieveDatabasesAndTablesStep(Step[DatabasesAndTables]):
@@ -500,6 +508,13 @@ class FreezeTablesStep(FreezeUnfreezeTablesStepBase):
     def operation(self) -> str:
         return "FREEZE"
 
+    async def handle_step_failure(self, cluster: Cluster, context: StepsContext, exc: Exception) -> None:
+        """Try to restore write access to KeeperMap tables after this step failed."""
+        try:
+            await KeeperMapTablesReadOnlyStep(clients=self.clients, allow_writes=True).run_step(cluster, context)
+        except StepFailedError:
+            logger.warning("Unable to restore write ACLs for KeeperMap tables")
+
 
 @dataclasses.dataclass
 class UnfreezeTablesStep(FreezeUnfreezeTablesStepBase):
diff --git a/tests/unit/coordinator/plugins/clickhouse/test_steps.py b/tests/unit/coordinator/plugins/clickhouse/test_steps.py
index e13b4833..19bb19cb 100644
--- a/tests/unit/coordinator/plugins/clickhouse/test_steps.py
+++ b/tests/unit/coordinator/plugins/clickhouse/test_steps.py
@@ -283,10 +283,12 @@ async def create_zookeeper_keeper_map_table_data(zookeeper_client: ZooKeeperClie
 
 async def test_retrieve_keeper_map_table_data() -> None:
     zookeeper_client = FakeZooKeeperClient()
+    clickhouse_client = mock_clickhouse_client()
     await create_zookeeper_keeper_map_table_data(zookeeper_client)
     step = RetrieveKeeperMapTableDataStep(
         zookeeper_client=zookeeper_client,
         keeper_map_path_prefix="/clickhouse/keeper_map/",
+        clients=[clickhouse_client],
     )
     keeper_map_data = await step.run_step(Cluster(nodes=[]), StepsContext())
     assert keeper_map_data == SAMPLE_KEEPER_MAP_TABLES