Merge pull request #173 from Aiven-Open/joelynch/per-node-manifests
cleanup: reduce memory usage

kmichel-aiven authored Dec 21, 2023
2 parents 416d6b5 + 3cfee26 commit f869609
Showing 6 changed files with 104 additions and 78 deletions.
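The memory saving comes from one pattern applied across all six files: ComputeKeptBackupsStep stops returning full ipc.BackupManifest objects, which embed every per-node snapshot result, and returns slim ManifestMin records instead; each step that still needs a full manifest re-downloads it one backup at a time. A minimal sketch of the pattern, with hypothetical names standing in for the astacus storage API:

import dataclasses
import datetime
from typing import Callable

@dataclasses.dataclass(frozen=True)
class SlimRecord:
    # Only what downstream steps need: sort keys plus a lookup handle.
    start: datetime.datetime
    end: datetime.datetime
    filename: str

def for_each_manifest(records: list[SlimRecord], load: Callable[[str], dict]) -> None:
    # At most one heavy manifest is alive at a time, instead of a whole
    # list of them being held across all cleanup steps.
    for record in records:
        manifest = load(record.filename)  # full manifest, fetched on demand
        del manifest  # consumed and dropped before the next iteration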
48 changes: 30 additions & 18 deletions astacus/coordinator/plugins/base.py
@@ -49,7 +49,10 @@ def get_cleanup_steps(
             explicit_delete=explicit_delete,
         ),
         DeleteBackupManifestsStep(json_storage=context.json_storage),
-        DeleteDanglingHexdigestsStep(hexdigest_storage=context.hexdigest_storage),
+        DeleteDanglingHexdigestsStep(
+            json_storage=context.json_storage,
+            hexdigest_storage=context.hexdigest_storage,
+        ),
     ]


@@ -533,7 +536,18 @@ async def clear_delta(
     await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)


-def _prune_manifests(manifests: List[ipc.BackupManifest], retention: Retention) -> List[ipc.BackupManifest]:
+@dataclasses.dataclass
+class ManifestMin:
+    start: datetime.datetime
+    end: datetime.datetime
+    filename: str
+
+    @classmethod
+    def from_manifest(cls, manifest: ipc.BackupManifest) -> ManifestMin:
+        return cls(start=manifest.start, end=manifest.end, filename=manifest.filename)
+
+
+def _prune_manifests(manifests: List[ManifestMin], retention: Retention) -> List[ManifestMin]:
     manifests = sorted(manifests, key=lambda m: (m.start, m.end, m.filename), reverse=True)
     if retention.minimum_backups is not None and retention.minimum_backups >= len(manifests):
         return manifests
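ManifestMin keeps exactly the three fields _prune_manifests sorts on, so retention decisions are unchanged: manifests are ordered newest-first by the tuple (start, end, filename). A quick standalone check of that ordering, using only the ManifestMin fields:

import dataclasses
import datetime

@dataclasses.dataclass
class M:
    start: datetime.datetime
    end: datetime.datetime
    filename: str

UTC = datetime.timezone.utc
older = M(datetime.datetime(2020, 1, 1, tzinfo=UTC), datetime.datetime(2020, 1, 1, 1, tzinfo=UTC), "backup-1")
newer = M(datetime.datetime(2020, 1, 2, tzinfo=UTC), datetime.datetime(2020, 1, 2, 1, tzinfo=UTC), "backup-2")
ordered = sorted([older, newer], key=lambda m: (m.start, m.end, m.filename), reverse=True)
assert [m.filename for m in ordered] == ["backup-2", "backup-1"]  # newest first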
@@ -564,7 +578,7 @@ def _prune_manifests(manifests: List[ipc.BackupManifest], retention: Retention)


 @dataclasses.dataclass
-class ComputeKeptBackupsStep(Step[Sequence[ipc.BackupManifest]]):
+class ComputeKeptBackupsStep(Step[Sequence[ManifestMin]]):
     """
     Return a list of backup manifests we want to keep, after excluding the explicitly deleted
     backups and applying the retention rules.
@@ -575,32 +589,29 @@ class ComputeKeptBackupsStep(Step[Sequence[ipc.BackupManifest]]):
     explicit_delete: Sequence[str]
     retain_deltas: bool = False

-    async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[ipc.BackupManifest]:
+    async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[ManifestMin]:
         kept_manifests = await self.compute_kept_basebackups(context)
         if self.retain_deltas:
             kept_manifests += await self.compute_kept_deltas(kept_manifests, context)
         return kept_manifests

-    async def compute_kept_basebackups(self, context: StepsContext) -> List[ipc.BackupManifest]:
+    async def compute_kept_basebackups(self, context: StepsContext) -> List[ManifestMin]:
         all_backup_names = context.get_result(ListBackupsStep)
         kept_backup_names = all_backup_names.difference(set(self.explicit_delete))
         manifests = []
         for backup_name in kept_backup_names:
-            manifests.append(await download_backup_manifest(self.json_storage, backup_name))
-        manifests = _prune_manifests(manifests, self.retention)
+            manifests.append(ManifestMin.from_manifest(await download_backup_manifest(self.json_storage, backup_name)))

-        return manifests
+        return _prune_manifests(manifests, self.retention)

-    async def compute_kept_deltas(
-        self, kept_backups: Sequence[ipc.BackupManifest], context: StepsContext
-    ) -> List[ipc.BackupManifest]:
+    async def compute_kept_deltas(self, kept_backups: Sequence[ManifestMin], context: StepsContext) -> List[ManifestMin]:
         if not kept_backups:
             return []
         all_delta_names = context.get_result(ListDeltaBackupsStep)
         oldest_kept_backup = min(kept_backups, key=lambda b: b.start)
-        kept_deltas: List[ipc.BackupManifest] = []
+        kept_deltas: List[ManifestMin] = []
         for delta_name in all_delta_names:
-            delta_manifest = await download_backup_manifest(self.json_storage, delta_name)
+            delta_manifest = ManifestMin.from_manifest(await download_backup_manifest(self.json_storage, delta_name))
             if delta_manifest.end < oldest_kept_backup.end:
                 continue
             kept_deltas.append(delta_manifest)
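The delta rule itself is untouched by the type change: a delta is kept exactly when it did not end before the end of the oldest kept base backup (oldest by start time), since earlier deltas can no longer contribute to any kept backup. Restated as a standalone predicate under that reading:

import dataclasses
import datetime

@dataclasses.dataclass
class M:
    start: datetime.datetime
    end: datetime.datetime

def keep_delta(delta: M, kept_bases: list[M]) -> bool:
    # Mirrors the loop above: the base backup with the earliest start sets
    # the cutoff, and deltas that ended before its end are dropped.
    oldest = min(kept_bases, key=lambda b: b.start)
    return not (delta.end < oldest.end)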
@@ -641,17 +652,18 @@ class DeleteDanglingHexdigestsStep(Step[None]):
     """

     hexdigest_storage: AsyncHexDigestStorage
+    json_storage: AsyncJsonStorage

     async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
         kept_manifests = context.get_result(ComputeKeptBackupsStep)
         logger.info("listing extra hexdigests")
-        kept_hexdigests: set[str] = set()
-        for manifest in kept_manifests:
+        extra_hexdigests = set(await self.hexdigest_storage.list_hexdigests())
+        for manifest_min in kept_manifests:
+            manifest = await download_backup_manifest(self.json_storage, manifest_min.filename)
             for result in manifest.snapshot_results:
                 assert result.hashes is not None
-                kept_hexdigests = kept_hexdigests | set(h.hexdigest for h in result.hashes if h.hexdigest)
-        all_hexdigests = await self.hexdigest_storage.list_hexdigests()
-        extra_hexdigests = set(all_hexdigests).difference(kept_hexdigests)
+                for hash_ in result.hashes:
+                    extra_hexdigests.discard(hash_.hexdigest)
         logger.info("deleting %d hexdigests from object storage", len(extra_hexdigests))
         for i, hexdigest in enumerate(extra_hexdigests, 1):
             # Due to rate limiting, it might be better to not do this in parallel
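Two things change in this hunk. The step gains a json_storage field so it can re-fetch each kept manifest by filename, and the set algebra is inverted: instead of accumulating kept_hexdigests across all manifests and then diffing the full listing against it, the full listing seeds extra_hexdigests up front and kept digests are discarded as each manifest streams through. Peak memory drops from all manifests plus two digest sets to one manifest plus one set, and the old `if h.hexdigest` guard becomes unnecessary because discarding an empty digest is a harmless no-op. A standalone sketch of the inverted pattern:

from typing import Iterable, Iterator

def dangling_digests(listed: Iterable[str], kept_manifests: Iterator[list[str]]) -> set[str]:
    # Seed with everything in storage, then discard whatever any kept
    # manifest still references.
    extra = set(listed)
    for digests in kept_manifests:  # one manifest's digests at a time
        for digest in digests:
            extra.discard(digest)  # no-op if absent, unlike set.remove
    return extra

# "c" is referenced by no kept manifest, so it is dangling.
assert dangling_digests(["a", "b", "c"], iter([["a"], ["b"]])) == {"c"}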
5 changes: 4 additions & 1 deletion astacus/coordinator/plugins/cassandra/plugin.py
@@ -220,5 +220,8 @@ def get_cleanup_steps(
             retain_deltas=True,
         ),
         base.DeleteBackupAndDeltaManifestsStep(json_storage=context.json_storage),
-        base.DeleteDanglingHexdigestsStep(hexdigest_storage=context.hexdigest_storage),
+        base.DeleteDanglingHexdigestsStep(
+            json_storage=context.json_storage,
+            hexdigest_storage=context.hexdigest_storage,
+        ),
     ]
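The Cassandra plugin only needs the wiring change. Because the step is a dataclass and the new json_storage field has no default, any call site left un-updated fails fast at construction time rather than midway through a cleanup; a toy illustration with stand-in types, not the astacus ones:

import dataclasses

@dataclasses.dataclass
class StepLike:
    hexdigest_storage: object
    json_storage: object  # new required field, no default

try:
    StepLike(hexdigest_storage=object())  # old-style call site
except TypeError as error:
    print(error)  # e.g. "missing 1 required ... argument: 'json_storage'"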
7 changes: 5 additions & 2 deletions astacus/coordinator/plugins/clickhouse/plugin.py
@@ -218,6 +218,9 @@ def get_cleanup_steps(
             explicit_delete=explicit_delete,
         ),
         DeleteBackupManifestsStep(json_storage=context.json_storage),
-        DeleteDanglingHexdigestsStep(hexdigest_storage=context.hexdigest_storage),
-        DeleteDanglingObjectStorageFilesStep(disks=disks),
+        DeleteDanglingHexdigestsStep(
+            json_storage=context.json_storage,
+            hexdigest_storage=context.hexdigest_storage,
+        ),
+        DeleteDanglingObjectStorageFilesStep(disks=disks, json_storage=context.json_storage),
     ]
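For ClickHouse, both dangling-data steps now re-read manifests from JSON storage, so each kept manifest is downloaded once per consuming step. That looks like the intended trade: with N kept backups and S consuming steps there are N x S downloads, but O(1) full manifests resident instead of N for the whole cleanup. A toy model of the cost, with a counter standing in for download_backup_manifest:

downloads = 0

def load(name: str) -> dict:
    # Stand-in for download_backup_manifest: just count round-trips.
    global downloads
    downloads += 1
    return {"filename": name}

kept = ["backup-1", "backup-2", "backup-3"]
for _step in ("hexdigests", "object_storage_files"):  # two consuming steps
    for name in kept:
        load(name)
assert downloads == len(kept) * 2  # more round-trips, less resident memory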
12 changes: 8 additions & 4 deletions astacus/coordinator/plugins/clickhouse/steps.py
@@ -23,9 +23,11 @@
 from .parts import list_parts_to_attach
 from .replication import DatabaseReplica, get_databases_replicas, get_shard_and_replica, sync_replicated_database
 from astacus.common import ipc
+from astacus.common.asyncstorage import AsyncJsonStorage
 from astacus.common.exceptions import TransientException
 from astacus.common.limiter import gather_limited
 from astacus.coordinator.cluster import Cluster
+from astacus.coordinator.manifest import download_backup_manifest
 from astacus.coordinator.plugins.base import (
     BackupManifestStep,
     ComputeKeptBackupsStep,
@@ -784,6 +786,7 @@ class DeleteDanglingObjectStorageFilesStep(Step[None]):
     """

     disks: Disks
+    json_storage: AsyncJsonStorage

     async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
         backup_manifests = context.get_result(ComputeKeptBackupsStep)
@@ -793,14 +796,15 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
             # than the latest backup, so we don't do anything.
             return
         newest_backup_start_time = max((backup_manifest.start for backup_manifest in backup_manifests))
-        clickhouse_manifests = [
-            ClickHouseManifest.from_plugin_data(backup_manifest.plugin_data) for backup_manifest in backup_manifests
-        ]

         kept_paths: dict[str, set[Path]] = {}
-        for clickhouse_manifest in clickhouse_manifests:
+        for manifest_min in backup_manifests:
+            manifest_data = await download_backup_manifest(self.json_storage, manifest_min.filename)
+            clickhouse_manifest = ClickHouseManifest.from_plugin_data(manifest_data.plugin_data)
             for object_storage_files in clickhouse_manifest.object_storage_files:
                 disk_kept_paths = kept_paths.setdefault(object_storage_files.disk_name, set())
                 disk_kept_paths.update((file.path for file in object_storage_files.files))

         for disk_name, disk_kept_paths in sorted(kept_paths.items()):
             disk_object_storage = self.disks.get_object_storage(disk_name=disk_name)
             if disk_object_storage is None:
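Only the manifest loading moved inside the loop; the grouping of kept paths per disk via dict.setdefault is unchanged. The idiom in isolation, using the paths from the test below:

kept_paths: dict[str, set[str]] = {}
manifest_files = [
    ("remote", ["abc/defghi", "jkl/mnopqr"]),
    ("remote", ["jkl/mnopqr", "stu/vwxyza"]),  # overlap across manifests is fine
]
for disk_name, paths in manifest_files:
    disk_kept_paths = kept_paths.setdefault(disk_name, set())
    disk_kept_paths.update(paths)
assert kept_paths == {"remote": {"abc/defghi", "jkl/mnopqr", "stu/vwxyza"}}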
98 changes: 50 additions & 48 deletions tests/unit/coordinator/plugins/clickhouse/test_steps.py
@@ -2,13 +2,15 @@
 Copyright (c) 2021 Aiven Ltd
 See LICENSE for details
 """
+from astacus.common.asyncstorage import AsyncJsonStorage
 from astacus.common.exceptions import TransientException
 from astacus.common.ipc import BackupManifest, Plugin, SnapshotFile, SnapshotResult, SnapshotState
 from astacus.coordinator.cluster import Cluster
 from astacus.coordinator.config import CoordinatorNode
 from astacus.coordinator.plugins.base import (
     BackupManifestStep,
     ComputeKeptBackupsStep,
+    ManifestMin,
     SnapshotStep,
     StepFailedError,
     StepsContext,
@@ -69,6 +71,7 @@
 from astacus.coordinator.plugins.zookeeper import FakeZooKeeperClient, ZooKeeperClient
 from base64 import b64encode
 from pathlib import Path
+from tests.unit.storage import MemoryJsonStorage
 from typing import Awaitable, Iterable, Optional, Sequence
 from unittest import mock
 from unittest.mock import _Call as MockCall  # pylint: disable=protected-access
@@ -1171,57 +1174,56 @@ async def test_delete_object_storage_files_step(tmp_path: Path) -> None:
             ),
         ]
     )
+    manifests = [
+        BackupManifest(
+            start=datetime.datetime(2020, 1, 2, 10, tzinfo=datetime.timezone.utc),
+            end=datetime.datetime(2020, 1, 2, 11, tzinfo=datetime.timezone.utc),
+            attempt=1,
+            snapshot_results=[],
+            upload_results=[],
+            plugin=Plugin.clickhouse,
+            plugin_data=ClickHouseManifest(
+                version=ClickHouseBackupVersion.V2,
+                object_storage_files=[
+                    ClickHouseObjectStorageFiles(
+                        disk_name="remote",
+                        files=[
+                            ClickHouseObjectStorageFile(path=Path("abc/defghi")),
+                            ClickHouseObjectStorageFile(path=Path("jkl/mnopqr")),
+                        ],
+                    )
+                ],
+            ).to_plugin_data(),
+            filename="backup-2",
+        ),
+        BackupManifest(
+            start=datetime.datetime(2020, 1, 3, 10, tzinfo=datetime.timezone.utc),
+            end=datetime.datetime(2020, 1, 3, 11, tzinfo=datetime.timezone.utc),
+            attempt=1,
+            snapshot_results=[],
+            upload_results=[],
+            plugin=Plugin.clickhouse,
+            plugin_data=ClickHouseManifest(
+                version=ClickHouseBackupVersion.V2,
+                object_storage_files=[
+                    ClickHouseObjectStorageFiles(
+                        disk_name="remote",
+                        files=[
+                            ClickHouseObjectStorageFile(path=Path("jkl/mnopqr")),
+                            ClickHouseObjectStorageFile(path=Path("stu/vwxyza")),
+                        ],
+                    )
+                ],
+            ).to_plugin_data(),
+            filename="backup-3",
+        ),
+    ]
+    async_json_storage = AsyncJsonStorage(storage=MemoryJsonStorage(items={b.filename: b.json() for b in manifests}))
     disks = Disks(disks=[create_object_storage_disk("remote", object_storage)])
-    step = DeleteDanglingObjectStorageFilesStep(disks=disks)
+    step = DeleteDanglingObjectStorageFilesStep(disks=disks, json_storage=async_json_storage)
     cluster = Cluster(nodes=[CoordinatorNode(url="node1"), CoordinatorNode(url="node2")])
     context = StepsContext()
-    context.set_result(
-        ComputeKeptBackupsStep,
-        [
-            BackupManifest(
-                start=datetime.datetime(2020, 1, 2, 10, tzinfo=datetime.timezone.utc),
-                end=datetime.datetime(2020, 1, 2, 11, tzinfo=datetime.timezone.utc),
-                attempt=1,
-                snapshot_results=[],
-                upload_results=[],
-                plugin=Plugin.clickhouse,
-                plugin_data=ClickHouseManifest(
-                    version=ClickHouseBackupVersion.V2,
-                    object_storage_files=[
-                        ClickHouseObjectStorageFiles(
-                            disk_name="remote",
-                            files=[
-                                ClickHouseObjectStorageFile(path=Path("abc/defghi")),
-                                ClickHouseObjectStorageFile(path=Path("jkl/mnopqr")),
-                            ],
-                        )
-                    ],
-                ).to_plugin_data(),
-                filename="backup-2",
-            ),
-            BackupManifest(
-                start=datetime.datetime(2020, 1, 3, 10, tzinfo=datetime.timezone.utc),
-                end=datetime.datetime(2020, 1, 3, 11, tzinfo=datetime.timezone.utc),
-                attempt=1,
-                snapshot_results=[],
-                upload_results=[],
-                plugin=Plugin.clickhouse,
-                plugin_data=ClickHouseManifest(
-                    version=ClickHouseBackupVersion.V2,
-                    object_storage_files=[
-                        ClickHouseObjectStorageFiles(
-                            disk_name="remote",
-                            files=[
-                                ClickHouseObjectStorageFile(path=Path("jkl/mnopqr")),
-                                ClickHouseObjectStorageFile(path=Path("stu/vwxyza")),
-                            ],
-                        )
-                    ],
-                ).to_plugin_data(),
-                filename="backup-3",
-            ),
-        ],
-    )
+    context.set_result(ComputeKeptBackupsStep, [ManifestMin.from_manifest(b) for b in manifests])
     await step.run_step(cluster, context)
     assert await object_storage.list_items() == [
         # Only not_used/and_old was deleted
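The updated test mirrors the production flow: full manifests are serialized into an in-memory JSON store keyed by filename, and the step is handed only ManifestMin records, forcing it through the same download path as a real cleanup. The expected outcome reduces to a set difference (age handling elided; the real step also checks newest_backup_start_time):

kept = {"abc/defghi", "jkl/mnopqr", "stu/vwxyza"}  # union of both manifests
listed = kept | {"not_used/and_old"}  # hypothetical storage listing
assert listed - kept == {"not_used/and_old"}  # the only deletable file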
12 changes: 7 additions & 5 deletions tests/unit/coordinator/plugins/test_base.py
@@ -21,6 +21,7 @@
     ListBackupsStep,
     ListDeltaBackupsStep,
     ListHexdigestsStep,
+    ManifestMin,
     SnapshotReleaseStep,
     SnapshotStep,
     Step,
@@ -118,7 +119,7 @@ def named_manifest(filename: str) -> ipc.BackupManifest:
     return make_manifest(some_isoformat, some_isoformat, filename=filename)


-def manifest_with_hashes(hashes: dict[str, bytes]) -> ipc.BackupManifest:
+def manifest_with_hashes(hashes: dict[str, bytes], index: int) -> ipc.BackupManifest:
     return ipc.BackupManifest(
         start=datetime.datetime.fromisoformat("2020-01-05T15:30Z"),
         end=datetime.datetime.fromisoformat("2020-01-05T15:30Z"),
@@ -128,7 +129,7 @@ def manifest_with_hashes(hashes: dict[str, bytes]) -> ipc.BackupManifest:
         ],
         upload_results=[],
         plugin=Plugin.files,
-        filename="some-manifest",
+        filename=f"some-manifest-{index}",
     )

@@ -342,7 +343,7 @@ async def test_delete_backup_and_delta_manifests(
         ([], {"a": b"a"}, {}),
         ([], {"a": b"a"}, {}),
         (
-            [manifest_with_hashes({"a": b"a"}), manifest_with_hashes({"b": b"b"})],
+            [manifest_with_hashes({"a": b"a"}, 0), manifest_with_hashes({"b": b"b"}, 1)],
             {"a": b"a", "b": b"b", "c": b"c"},
             {"a": b"a", "b": b"b"},
         ),
@@ -356,8 +357,9 @@ async def test_delete_dangling_hexdigests_step(
     expected_hashes: dict[str, bytes],
 ) -> None:
     async_digest_storage = AsyncHexDigestStorage(storage=MemoryHexDigestStorage(items=stored_hashes))
-    context.set_result(ComputeKeptBackupsStep, kept_backups)
-    step = DeleteDanglingHexdigestsStep(hexdigest_storage=async_digest_storage)
+    async_json_storage = AsyncJsonStorage(storage=MemoryJsonStorage(items={b.filename: b.json() for b in kept_backups}))
+    context.set_result(ComputeKeptBackupsStep, [ManifestMin.from_manifest(b) for b in kept_backups])
+    step = DeleteDanglingHexdigestsStep(json_storage=async_json_storage, hexdigest_storage=async_digest_storage)
     await step.run_step(single_node_cluster, context)
     assert stored_hashes == expected_hashes

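The new index argument matters because the in-memory JSON store is a dict keyed by manifest filename: two manifests both named "some-manifest" would silently collapse into one entry, and the step would only ever download one of them. In miniature:

items: dict[str, str] = {}
for filename in ["some-manifest", "some-manifest"]:  # without the index suffix
    items[filename] = "{}"
assert len(items) == 1  # collision: one manifest silently shadows the other

items = {f"some-manifest-{i}": "{}" for i in range(2)}  # with the suffix
assert len(items) == 2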
