diff --git a/astacus/coordinator/plugins/base.py b/astacus/coordinator/plugins/base.py
index 34953dd2..d7d6523d 100644
--- a/astacus/coordinator/plugins/base.py
+++ b/astacus/coordinator/plugins/base.py
@@ -49,7 +49,10 @@ def get_cleanup_steps(
             explicit_delete=explicit_delete,
         ),
         DeleteBackupManifestsStep(json_storage=context.json_storage),
-        DeleteDanglingHexdigestsStep(hexdigest_storage=context.hexdigest_storage),
+        DeleteDanglingHexdigestsStep(
+            json_storage=context.json_storage,
+            hexdigest_storage=context.hexdigest_storage,
+        ),
     ]


@@ -533,7 +536,18 @@ async def clear_delta(
     await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)


-def _prune_manifests(manifests: List[ipc.BackupManifest], retention: Retention) -> List[ipc.BackupManifest]:
+@dataclasses.dataclass
+class ManifestMin:
+    start: datetime.datetime
+    end: datetime.datetime
+    filename: str
+
+    @classmethod
+    def from_manifest(cls, manifest: ipc.BackupManifest) -> ManifestMin:
+        return cls(start=manifest.start, end=manifest.end, filename=manifest.filename)
+
+
+def _prune_manifests(manifests: List[ManifestMin], retention: Retention) -> List[ManifestMin]:
     manifests = sorted(manifests, key=lambda m: (m.start, m.end, m.filename), reverse=True)
     if retention.minimum_backups is not None and retention.minimum_backups >= len(manifests):
         return manifests
@@ -564,7 +578,7 @@ def _prune_manifests(manifests: List[ipc.BackupManifest], retention: Retention)


 @dataclasses.dataclass
-class ComputeKeptBackupsStep(Step[Sequence[ipc.BackupManifest]]):
+class ComputeKeptBackupsStep(Step[Sequence[ManifestMin]]):
     """
     Return a list of backup manifests we want to keep, after excluding the explicitly
     deleted backups and applying the retention rules.
@@ -575,32 +589,29 @@ class ComputeKeptBackupsStep(Step[Sequence[ipc.BackupManifest]]):
     explicit_delete: Sequence[str]
     retain_deltas: bool = False

-    async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[ipc.BackupManifest]:
+    async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[ManifestMin]:
         kept_manifests = await self.compute_kept_basebackups(context)
         if self.retain_deltas:
             kept_manifests += await self.compute_kept_deltas(kept_manifests, context)
         return kept_manifests

-    async def compute_kept_basebackups(self, context: StepsContext) -> List[ipc.BackupManifest]:
+    async def compute_kept_basebackups(self, context: StepsContext) -> List[ManifestMin]:
         all_backup_names = context.get_result(ListBackupsStep)
         kept_backup_names = all_backup_names.difference(set(self.explicit_delete))
         manifests = []
         for backup_name in kept_backup_names:
-            manifests.append(await download_backup_manifest(self.json_storage, backup_name))
-        manifests = _prune_manifests(manifests, self.retention)
+            manifests.append(ManifestMin.from_manifest(await download_backup_manifest(self.json_storage, backup_name)))

-        return manifests
+        return _prune_manifests(manifests, self.retention)

-    async def compute_kept_deltas(
-        self, kept_backups: Sequence[ipc.BackupManifest], context: StepsContext
-    ) -> List[ipc.BackupManifest]:
+    async def compute_kept_deltas(self, kept_backups: Sequence[ManifestMin], context: StepsContext) -> List[ManifestMin]:
         if not kept_backups:
             return []
         all_delta_names = context.get_result(ListDeltaBackupsStep)
         oldest_kept_backup = min(kept_backups, key=lambda b: b.start)
-        kept_deltas: List[ipc.BackupManifest] = []
+        kept_deltas: List[ManifestMin] = []
         for delta_name in all_delta_names:
-            delta_manifest = await download_backup_manifest(self.json_storage, delta_name)
+            delta_manifest = ManifestMin.from_manifest(await download_backup_manifest(self.json_storage, delta_name))
             if delta_manifest.end < oldest_kept_backup.end:
                 continue
             kept_deltas.append(delta_manifest)
@@ -641,17 +652,18 @@ class DeleteDanglingHexdigestsStep(Step[None]):
     """

     hexdigest_storage: AsyncHexDigestStorage
+    json_storage: AsyncJsonStorage

     async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
         kept_manifests = context.get_result(ComputeKeptBackupsStep)
         logger.info("listing extra hexdigests")
-        kept_hexdigests: set[str] = set()
-        for manifest in kept_manifests:
+        extra_hexdigests = set(await self.hexdigest_storage.list_hexdigests())
+        for manifest_min in kept_manifests:
+            manifest = await download_backup_manifest(self.json_storage, manifest_min.filename)
             for result in manifest.snapshot_results:
                 assert result.hashes is not None
-                kept_hexdigests = kept_hexdigests | set(h.hexdigest for h in result.hashes if h.hexdigest)
-        all_hexdigests = await self.hexdigest_storage.list_hexdigests()
-        extra_hexdigests = set(all_hexdigests).difference(kept_hexdigests)
+                for hash_ in result.hashes:
+                    extra_hexdigests.discard(hash_.hexdigest)
         logger.info("deleting %d hexdigests from object storage", len(extra_hexdigests))
         for i, hexdigest in enumerate(extra_hexdigests, 1):
             # Due to rate limiting, it might be better to not do this in parallel
diff --git a/astacus/coordinator/plugins/cassandra/plugin.py b/astacus/coordinator/plugins/cassandra/plugin.py
index 90105bfe..815af7df 100644
--- a/astacus/coordinator/plugins/cassandra/plugin.py
+++ b/astacus/coordinator/plugins/cassandra/plugin.py
@@ -220,5 +220,8 @@ def get_cleanup_steps(
             retain_deltas=True,
         ),
         base.DeleteBackupAndDeltaManifestsStep(json_storage=context.json_storage),
-        base.DeleteDanglingHexdigestsStep(hexdigest_storage=context.hexdigest_storage),
+        base.DeleteDanglingHexdigestsStep(
+            json_storage=context.json_storage,
+            hexdigest_storage=context.hexdigest_storage,
+        ),
     ]
diff --git a/astacus/coordinator/plugins/clickhouse/plugin.py b/astacus/coordinator/plugins/clickhouse/plugin.py
index 7b01ce1e..1307922e 100644
--- a/astacus/coordinator/plugins/clickhouse/plugin.py
+++ b/astacus/coordinator/plugins/clickhouse/plugin.py
@@ -218,6 +218,9 @@ def get_cleanup_steps(
             explicit_delete=explicit_delete,
         ),
         DeleteBackupManifestsStep(json_storage=context.json_storage),
-        DeleteDanglingHexdigestsStep(hexdigest_storage=context.hexdigest_storage),
-        DeleteDanglingObjectStorageFilesStep(disks=disks),
+        DeleteDanglingHexdigestsStep(
+            json_storage=context.json_storage,
+            hexdigest_storage=context.hexdigest_storage,
+        ),
+        DeleteDanglingObjectStorageFilesStep(disks=disks, json_storage=context.json_storage),
     ]
diff --git a/astacus/coordinator/plugins/clickhouse/steps.py b/astacus/coordinator/plugins/clickhouse/steps.py
index 0d2d414e..62b87ab0 100644
--- a/astacus/coordinator/plugins/clickhouse/steps.py
+++ b/astacus/coordinator/plugins/clickhouse/steps.py
@@ -23,9 +23,11 @@
 from .parts import list_parts_to_attach
 from .replication import DatabaseReplica, get_databases_replicas, get_shard_and_replica, sync_replicated_database
 from astacus.common import ipc
+from astacus.common.asyncstorage import AsyncJsonStorage
 from astacus.common.exceptions import TransientException
 from astacus.common.limiter import gather_limited
 from astacus.coordinator.cluster import Cluster
+from astacus.coordinator.manifest import download_backup_manifest
 from astacus.coordinator.plugins.base import (
     BackupManifestStep,
     ComputeKeptBackupsStep,
@@ -784,6 +786,7 @@ class DeleteDanglingObjectStorageFilesStep(Step[None]):
     """

     disks: Disks
+    json_storage: AsyncJsonStorage

     async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
         backup_manifests = context.get_result(ComputeKeptBackupsStep)
@@ -793,14 +796,15 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
             # than the latest backup, so we don't do anything.
             return
         newest_backup_start_time = max((backup_manifest.start for backup_manifest in backup_manifests))
-        clickhouse_manifests = [
-            ClickHouseManifest.from_plugin_data(backup_manifest.plugin_data) for backup_manifest in backup_manifests
-        ]
+
         kept_paths: dict[str, set[Path]] = {}
-        for clickhouse_manifest in clickhouse_manifests:
+        for manifest_min in backup_manifests:
+            manifest_data = await download_backup_manifest(self.json_storage, manifest_min.filename)
+            clickhouse_manifest = ClickHouseManifest.from_plugin_data(manifest_data.plugin_data)
             for object_storage_files in clickhouse_manifest.object_storage_files:
                 disk_kept_paths = kept_paths.setdefault(object_storage_files.disk_name, set())
                 disk_kept_paths.update((file.path for file in object_storage_files.files))
+
         for disk_name, disk_kept_paths in sorted(kept_paths.items()):
             disk_object_storage = self.disks.get_object_storage(disk_name=disk_name)
             if disk_object_storage is None:
diff --git a/tests/unit/coordinator/plugins/clickhouse/test_steps.py b/tests/unit/coordinator/plugins/clickhouse/test_steps.py
index 3de26df7..87e41caf 100644
--- a/tests/unit/coordinator/plugins/clickhouse/test_steps.py
+++ b/tests/unit/coordinator/plugins/clickhouse/test_steps.py
@@ -2,6 +2,7 @@
 Copyright (c) 2021 Aiven Ltd
 See LICENSE for details
 """
+from astacus.common.asyncstorage import AsyncJsonStorage
 from astacus.common.exceptions import TransientException
 from astacus.common.ipc import BackupManifest, Plugin, SnapshotFile, SnapshotResult, SnapshotState
 from astacus.coordinator.cluster import Cluster
@@ -9,6 +10,7 @@
 from astacus.coordinator.plugins.base import (
     BackupManifestStep,
     ComputeKeptBackupsStep,
+    ManifestMin,
     SnapshotStep,
     StepFailedError,
     StepsContext,
@@ -69,6 +71,7 @@
 from astacus.coordinator.plugins.zookeeper import FakeZooKeeperClient, ZooKeeperClient
 from base64 import b64encode
 from pathlib import Path
+from tests.unit.storage import MemoryJsonStorage
 from typing import Awaitable, Iterable, Optional, Sequence
 from unittest import mock
 from unittest.mock import _Call as MockCall  # pylint: disable=protected-access
@@ -1171,57 +1174,56 @@ async def test_delete_object_storage_files_step(tmp_path: Path) -> None:
             ),
         ]
     )
+    manifests = [
+        BackupManifest(
+            start=datetime.datetime(2020, 1, 2, 10, tzinfo=datetime.timezone.utc),
+            end=datetime.datetime(2020, 1, 2, 11, tzinfo=datetime.timezone.utc),
+            attempt=1,
+            snapshot_results=[],
+            upload_results=[],
+            plugin=Plugin.clickhouse,
+            plugin_data=ClickHouseManifest(
+                version=ClickHouseBackupVersion.V2,
+                object_storage_files=[
+                    ClickHouseObjectStorageFiles(
+                        disk_name="remote",
+                        files=[
+                            ClickHouseObjectStorageFile(path=Path("abc/defghi")),
+                            ClickHouseObjectStorageFile(path=Path("jkl/mnopqr")),
+                        ],
+                    )
+                ],
+            ).to_plugin_data(),
+            filename="backup-2",
+        ),
+        BackupManifest(
+            start=datetime.datetime(2020, 1, 3, 10, tzinfo=datetime.timezone.utc),
+            end=datetime.datetime(2020, 1, 3, 11, tzinfo=datetime.timezone.utc),
+            attempt=1,
+            snapshot_results=[],
+            upload_results=[],
+            plugin=Plugin.clickhouse,
+            plugin_data=ClickHouseManifest(
+                version=ClickHouseBackupVersion.V2,
+                object_storage_files=[
+                    ClickHouseObjectStorageFiles(
+                        disk_name="remote",
+                        files=[
+                            ClickHouseObjectStorageFile(path=Path("jkl/mnopqr")),
+                            ClickHouseObjectStorageFile(path=Path("stu/vwxyza")),
+                        ],
+                    )
+                ],
+            ).to_plugin_data(),
+            filename="backup-3",
+        ),
+    ]
+    async_json_storage = AsyncJsonStorage(storage=MemoryJsonStorage(items={b.filename: b.json() for b in manifests}))
     disks = Disks(disks=[create_object_storage_disk("remote", object_storage)])
-    step = DeleteDanglingObjectStorageFilesStep(disks=disks)
+    step = DeleteDanglingObjectStorageFilesStep(disks=disks, json_storage=async_json_storage)
     cluster = Cluster(nodes=[CoordinatorNode(url="node1"), CoordinatorNode(url="node2")])
     context = StepsContext()
-    context.set_result(
-        ComputeKeptBackupsStep,
-        [
-            BackupManifest(
-                start=datetime.datetime(2020, 1, 2, 10, tzinfo=datetime.timezone.utc),
-                end=datetime.datetime(2020, 1, 2, 11, tzinfo=datetime.timezone.utc),
-                attempt=1,
-                snapshot_results=[],
-                upload_results=[],
-                plugin=Plugin.clickhouse,
-                plugin_data=ClickHouseManifest(
-                    version=ClickHouseBackupVersion.V2,
-                    object_storage_files=[
-                        ClickHouseObjectStorageFiles(
-                            disk_name="remote",
-                            files=[
-                                ClickHouseObjectStorageFile(path=Path("abc/defghi")),
-                                ClickHouseObjectStorageFile(path=Path("jkl/mnopqr")),
-                            ],
-                        )
-                    ],
-                ).to_plugin_data(),
-                filename="backup-2",
-            ),
-            BackupManifest(
-                start=datetime.datetime(2020, 1, 3, 10, tzinfo=datetime.timezone.utc),
-                end=datetime.datetime(2020, 1, 3, 11, tzinfo=datetime.timezone.utc),
-                attempt=1,
-                snapshot_results=[],
-                upload_results=[],
-                plugin=Plugin.clickhouse,
-                plugin_data=ClickHouseManifest(
-                    version=ClickHouseBackupVersion.V2,
-                    object_storage_files=[
-                        ClickHouseObjectStorageFiles(
-                            disk_name="remote",
-                            files=[
-                                ClickHouseObjectStorageFile(path=Path("jkl/mnopqr")),
-                                ClickHouseObjectStorageFile(path=Path("stu/vwxyza")),
-                            ],
-                        )
-                    ],
-                ).to_plugin_data(),
-                filename="backup-3",
-            ),
-        ],
-    )
+    context.set_result(ComputeKeptBackupsStep, [ManifestMin.from_manifest(b) for b in manifests])
     await step.run_step(cluster, context)
     assert await object_storage.list_items() == [
         # Only not_used/and_old was deleted
diff --git a/tests/unit/coordinator/plugins/test_base.py b/tests/unit/coordinator/plugins/test_base.py
index 0a34af84..8519965e 100644
--- a/tests/unit/coordinator/plugins/test_base.py
+++ b/tests/unit/coordinator/plugins/test_base.py
@@ -21,6 +21,7 @@
     ListBackupsStep,
     ListDeltaBackupsStep,
     ListHexdigestsStep,
+    ManifestMin,
     SnapshotReleaseStep,
     SnapshotStep,
     Step,
@@ -118,7 +119,7 @@ def named_manifest(filename: str) -> ipc.BackupManifest:
     return make_manifest(some_isoformat, some_isoformat, filename=filename)


-def manifest_with_hashes(hashes: dict[str, bytes]) -> ipc.BackupManifest:
+def manifest_with_hashes(hashes: dict[str, bytes], index: int) -> ipc.BackupManifest:
     return ipc.BackupManifest(
         start=datetime.datetime.fromisoformat("2020-01-05T15:30Z"),
         end=datetime.datetime.fromisoformat("2020-01-05T15:30Z"),
@@ -128,7 +129,7 @@ def manifest_with_hashes(hashes: dict[str, bytes]) -> ipc.BackupManifest:
         ],
         upload_results=[],
         plugin=Plugin.files,
-        filename="some-manifest",
+        filename=f"some-manifest-{index}",
     )


@@ -342,7 +343,7 @@ async def test_delete_backup_and_delta_manifests(
         ([], {"a": b"a"}, {}),
         ([], {"a": b"a"}, {}),
         (
-            [manifest_with_hashes({"a": b"a"}), manifest_with_hashes({"b": b"b"})],
+            [manifest_with_hashes({"a": b"a"}, 0), manifest_with_hashes({"b": b"b"}, 1)],
             {"a": b"a", "b": b"b", "c": b"c"},
             {"a": b"a", "b": b"b"},
         ),
@@ -356,8 +357,9 @@ async def test_delete_dangling_hexdigests_step(
     expected_hashes: dict[str, bytes],
 ) -> None:
     async_digest_storage = AsyncHexDigestStorage(storage=MemoryHexDigestStorage(items=stored_hashes))
-    context.set_result(ComputeKeptBackupsStep, kept_backups)
-    step = DeleteDanglingHexdigestsStep(hexdigest_storage=async_digest_storage)
+    async_json_storage = AsyncJsonStorage(storage=MemoryJsonStorage(items={b.filename: b.json() for b in kept_backups}))
+    context.set_result(ComputeKeptBackupsStep, [ManifestMin.from_manifest(b) for b in kept_backups])
+    step = DeleteDanglingHexdigestsStep(json_storage=async_json_storage, hexdigest_storage=async_digest_storage)
     await step.run_step(single_node_cluster, context)
     assert stored_hashes == expected_hashes