Skip to content

Commit

Permalink
Handle step failure by running optional exception handling function
Browse files Browse the repository at this point in the history
  • Loading branch information
aris-aiven committed Oct 31, 2024
1 parent d51a48b commit 1f613c2
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 2 deletions.
5 changes: 3 additions & 2 deletions astacus/coordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,9 @@ async def try_run(self, cluster: Cluster, context: StepsContext) -> bool:
with self._progress_handler(cluster, step):
try:
r = await step.run_step(cluster, context)
except (StepFailedError, WaitResultError) as e:
logger.info("Step %s failed: %s", step, str(e))
except (StepFailedError, WaitResultError) as exc:
logger.info("Step %s failed: %s", step, str(exc))
await step.handle_step_failure(cluster, context, exc)
return False
context.set_result(step.__class__, r)
return True
Expand Down
10 changes: 10 additions & 0 deletions astacus/coordinator/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,24 @@ class Step(Generic[StepResult_co]):
async def run_step(self, cluster: Cluster, context: StepsContext) -> StepResult_co:
    """Execute this step and return its result.

    The base implementation only raises ``NotImplementedError``; concrete
    steps are expected to override it.
    """
    raise NotImplementedError

async def handle_step_failure(self, cluster: Cluster, context: StepsContext, exc: Exception) -> None:
    """Optional hook for reacting to a failure of this step.

    The default implementation does nothing; subclasses may override it to
    undo side effects.

    Args:
        cluster: The cluster the failed step was running against.
        context: The steps context of the failed run.
        exc: The exception that made the step fail.
    """
    # This method should not raise exceptions.
    # NOTE(review): the coordinator appears to await this hook from inside its
    # ``except`` handler without a guard of its own, so anything raised here
    # would escape instead of the step being reported as failed - confirm.
    return None


class SyncStep(Step[StepResult_co]):
    """Step whose work is written as synchronous code.

    The async entry points defined here only forward to the synchronous
    methods through ``run_in_threadpool``. Subclasses implement
    ``run_sync_step`` and may override ``handle_step_failure_sync``.
    """

    async def run_step(self, cluster: Cluster, context: StepsContext) -> StepResult_co:
        """Run ``run_sync_step`` through ``run_in_threadpool`` and return its result."""
        step_result = await run_in_threadpool(self.run_sync_step, cluster, context)
        return step_result

    async def handle_step_failure(self, cluster: Cluster, context: StepsContext, exc: Exception) -> None:
        """Run ``handle_step_failure_sync`` through ``run_in_threadpool``."""
        await run_in_threadpool(self.handle_step_failure_sync, cluster, context, exc)

    def run_sync_step(self, cluster: Cluster, context: StepsContext) -> StepResult_co:
        """Synchronous body of the step; must be provided by subclasses."""
        raise NotImplementedError()

    def handle_step_failure_sync(self, cluster: Cluster, context: StepsContext, exc: Exception) -> None:
        """Synchronous failure hook; does nothing unless overridden."""
        return


class StepFailedError(exceptions.PermanentException):
    """Error type used to signal that a step has failed.

    It adds nothing to ``exceptions.PermanentException``; it exists so that
    callers can catch step failures specifically.
    """

    pass
Expand Down
1 change: 1 addition & 0 deletions astacus/coordinator/plugins/clickhouse/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]:
RetrieveKeeperMapTableDataStep(
zookeeper_client=zookeeper_client,
keeper_map_path_prefix=self.keeper_map_path_prefix,
clients=clickhouse_clients,
),
# Then freeze all tables
FreezeTablesStep(
Expand Down
13 changes: 13 additions & 0 deletions astacus/coordinator/plugins/clickhouse/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ async def run_step(self, cluster: Cluster, context: StepsContext):
class RetrieveKeeperMapTableDataStep(Step[Sequence[KeeperMapTable]]):
zookeeper_client: ZooKeeperClient
keeper_map_path_prefix: str | None
clients: Sequence[ClickHouseClient]

async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[KeeperMapTable]:
if self.keeper_map_path_prefix is None:
Expand Down Expand Up @@ -275,6 +276,12 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Ke
raise TransientException("Concurrent table addition / deletion during KeeperMap backup")
return tables

async def handle_step_failure(self, cluster: Cluster, context: StepsContext, exc: Exception) -> None:
    """Try to make the KeeperMap tables writable again after this step failed.

    Runs ``KeeperMapTablesReadOnlyStep`` with ``allow_writes=True`` on this
    step's ClickHouse clients. A ``StepFailedError`` from that attempt is
    logged as a warning and swallowed; any other exception propagates.
    """
    try:
        restore_writes_step = KeeperMapTablesReadOnlyStep(
            clients=self.clients,
            allow_writes=True,
        )
        await restore_writes_step.run_step(cluster, context)
    except StepFailedError:
        # Best effort only: the original failure is what the caller reports.
        logger.warning("Unable to restore write ACLs for KeeperMap tables")


@dataclasses.dataclass
class RetrieveDatabasesAndTablesStep(Step[DatabasesAndTables]):
Expand Down Expand Up @@ -500,6 +507,12 @@ class FreezeTablesStep(FreezeUnfreezeTablesStepBase):
def operation(self) -> str:
return "FREEZE"

async def handle_step_failure(self, cluster: Cluster, context: StepsContext, exc: Exception) -> None:
    """Try to make the KeeperMap tables writable again after freezing failed.

    Runs ``KeeperMapTablesReadOnlyStep`` with ``allow_writes=True`` on this
    step's ClickHouse clients. A ``StepFailedError`` from that attempt is
    logged as a warning and swallowed; any other exception propagates.
    """
    try:
        unlock_step = KeeperMapTablesReadOnlyStep(
            clients=self.clients,
            allow_writes=True,
        )
        await unlock_step.run_step(cluster, context)
    except StepFailedError:
        # Best effort only: a failed restore must not mask the original error.
        logger.warning("Unable to restore write ACLs for KeeperMap tables")


@dataclasses.dataclass
class UnfreezeTablesStep(FreezeUnfreezeTablesStepBase):
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/coordinator/plugins/clickhouse/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,12 @@ async def create_zookeeper_keeper_map_table_data(zookeeper_client: ZooKeeperClie

async def test_retrieve_keeper_map_table_data() -> None:
zookeeper_client = FakeZooKeeperClient()
clickhouse_client = mock_clickhouse_client()
await create_zookeeper_keeper_map_table_data(zookeeper_client)
step = RetrieveKeeperMapTableDataStep(
zookeeper_client=zookeeper_client,
keeper_map_path_prefix="/clickhouse/keeper_map/",
clients=[clickhouse_client],
)
keeper_map_data = await step.run_step(Cluster(nodes=[]), StepsContext())
assert keeper_map_data == SAMPLE_KEEPER_MAP_TABLES
Expand Down

0 comments on commit 1f613c2

Please sign in to comment.