From 074c8fae4811b00186aa4704933387932ea52acb Mon Sep 17 00:00:00 2001
From: Joe Lynch
Date: Wed, 13 Nov 2024 14:37:51 +0000
Subject: [PATCH 1/2] clickhouse: Fix race condition in
 DeleteDanglingObjectStorageFilesStep

Files can be uploaded _before_ a part appears in a backup manifest.

[DDB-1372]
---
 astacus/coordinator/plugins/clickhouse/steps.py    | 15 +++++++++++----
 .../coordinator/plugins/clickhouse/test_steps.py   |  6 ++++++
 2 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/astacus/coordinator/plugins/clickhouse/steps.py b/astacus/coordinator/plugins/clickhouse/steps.py
index d6a7366e..d3fe7722 100644
--- a/astacus/coordinator/plugins/clickhouse/steps.py
+++ b/astacus/coordinator/plugins/clickhouse/steps.py
@@ -43,6 +43,7 @@
 from astacus.coordinator.plugins.zookeeper import ChangeWatch, NoNodeError, TransactionError, ZooKeeperClient
 from base64 import b64decode
 from collections.abc import Awaitable, Callable, Iterable, Iterator, Mapping, Sequence
+from datetime import timedelta
 from kazoo.exceptions import ZookeeperError
 from typing import Any, cast, TypeVar
 
@@ -1070,6 +1071,8 @@ class DeleteDanglingObjectStorageFilesStep(SyncStep[None]):
 
     disks: Disks
     json_storage: JsonStorage
+    # The longest a part upload can reasonably be expected to take.
+    file_upload_grace_period: timedelta = timedelta(hours=6)
 
     def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None:
         backup_manifests = context.get_result(ComputeKeptBackupsStep)
@@ -1078,7 +1081,14 @@ def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None:
             # If we don't have at least one backup, we don't know which files are more recent
             # than the latest backup, so we don't do anything.
             return
+
+        # When a part is moved to a remote disk, its files are copied first and
+        # only then is the part committed. For a very large part with many files,
+        # the last_modified time of some files in object storage can therefore be
+        # significantly earlier than the time the part actually appears in a
+        # backup manifest. We must not delete those files!
         newest_backup_start_time = max(backup_manifest.start for backup_manifest in backup_manifests)
+        latest_safe_delete_time = newest_backup_start_time - self.file_upload_grace_period
         kept_paths: dict[str, set[str]] = {}
         for manifest_min in backup_manifests:
@@ -1097,10 +1107,7 @@ def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None:
             logger.info("found %d object storage files to keep in disk %r", len(disk_kept_paths), disk_name)
             disk_object_storage_items = disk_object_storage.list_items()
             for item in disk_object_storage_items:
-                # We don't know if objects newer than the latest backup should be kept or not,
-                # so we leave them for now. We'll delete them if necessary once there is a newer
-                # backup to tell us if they are still used or not.
-                if item.last_modified < newest_backup_start_time and item.key not in disk_kept_paths:
+                if item.last_modified < latest_safe_delete_time and item.key not in disk_kept_paths:
                     logger.debug("dangling object storage file in disk %r : %r", disk_name, item.key)
                     keys_to_remove.append(item.key)
             disk_available_paths = [item.key for item in disk_object_storage_items]
diff --git a/tests/unit/coordinator/plugins/clickhouse/test_steps.py b/tests/unit/coordinator/plugins/clickhouse/test_steps.py
index dd7b621a..ba2511d5 100644
--- a/tests/unit/coordinator/plugins/clickhouse/test_steps.py
+++ b/tests/unit/coordinator/plugins/clickhouse/test_steps.py
@@ -1453,6 +1453,9 @@ async def test_delete_object_storage_files_step(tmp_path: Path) -> None:
             ObjectStorageItem(key="jkl/mnopqr", last_modified=datetime.datetime(2020, 1, 2, tzinfo=datetime.UTC)),
             ObjectStorageItem(key="stu/vwxyza", last_modified=datetime.datetime(2020, 1, 3, tzinfo=datetime.UTC)),
             ObjectStorageItem(key="not_used/and_new", last_modified=datetime.datetime(2020, 1, 4, tzinfo=datetime.UTC)),
+            ObjectStorageItem(
+                key="not_used/and_within_grace_period", last_modified=datetime.datetime(2020, 1, 3, 7, tzinfo=datetime.UTC)
+            ),
         ]
     )
     manifests = [
@@ -1514,6 +1517,9 @@ async def test_delete_object_storage_files_step(tmp_path: Path) -> None:
         ObjectStorageItem(key="jkl/mnopqr", last_modified=datetime.datetime(2020, 1, 2, tzinfo=datetime.UTC)),
         ObjectStorageItem(key="stu/vwxyza", last_modified=datetime.datetime(2020, 1, 3, tzinfo=datetime.UTC)),
         ObjectStorageItem(key="not_used/and_new", last_modified=datetime.datetime(2020, 1, 4, tzinfo=datetime.UTC)),
+        ObjectStorageItem(
+            key="not_used/and_within_grace_period", last_modified=datetime.datetime(2020, 1, 3, 7, tzinfo=datetime.UTC)
+        ),
     ]

From b753a7ea240d1ade85b2ae637c155fa4bec62875 Mon Sep 17 00:00:00 2001
From: Joe Lynch
Date: Wed, 13 Nov 2024 14:53:03 +0000
Subject: [PATCH 2/2] Add tool for downloading files from a backup

---
 astacus/manifest.py | 37 +++++++++++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/astacus/manifest.py b/astacus/manifest.py
index 53bce957..02ca7346 100644
--- a/astacus/manifest.py
+++ b/astacus/manifest.py
@@ -8,12 +8,18 @@
 from astacus.common import ipc
 from astacus.common.rohmustorage import RohmuConfig, RohmuStorage
+from pathlib import Path
+import base64
 import json
+import logging
 import msgspec
 import shutil
 import sys
 
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
 
 def create_manifest_parsers(parser, subparsers):
     p_manifest = subparsers.add_parser("manifest", help="Examine Astacus backup manifests")
@@ -23,6 +29,7 @@ def create_manifest_parsers(parser, subparsers):
     create_list_parser(manifest_subparsers)
     create_describe_parser(manifest_subparsers)
     create_dump_parser(manifest_subparsers)
+    create_download_files_parser(manifest_subparsers)
 
 
 def create_list_parser(subparsers):
@@ -48,6 +55,36 @@ def create_dump_parser(subparsers):
     p_dump.set_defaults(func=_run_dump)
 
 
+def create_download_files_parser(subparsers):
+    p_download_files = subparsers.add_parser("download-files", help="Download files from a backup manifest")
+    p_download_files.add_argument(
+        "manifest", type=str, help="Manifest object name (can be obtained by running manifest list)"
+    )
+    p_download_files.add_argument("destination", type=str, help="Destination directory to download files to")
+    p_download_files.add_argument("--prefix", type=str, help="Only download files starting with this prefix", required=True)
+    p_download_files.set_defaults(func=_run_download_files)
+
+
+def _run_download_files(args):
+    rohmu_storage = _create_rohmu_storage(args.config, args.storage)
+    manifest = rohmu_storage.download_json(args.manifest, ipc.BackupManifest)
+    destination = Path(args.destination)
+    for snapshot_result in manifest.snapshot_results:
+        assert snapshot_result.state
+        for snapshot_file in snapshot_result.state.files:
+            if not snapshot_file.relative_path.startswith(args.prefix):
+                continue
+
+            path = destination / snapshot_file.relative_path
+            path.parent.mkdir(parents=True, exist_ok=True)
+            logger.info("Downloading %s to %s", snapshot_file.relative_path, path)
+            if snapshot_file.hexdigest:  # content stored separately, keyed by hexdigest
+                rohmu_storage.download_hexdigest_to_path(snapshot_file.hexdigest, path)
+            else:  # small files have their content embedded in the manifest as base64
+                assert snapshot_file.content_b64 is not None
+                path.write_bytes(base64.b64decode(snapshot_file.content_b64))
+
+
 def _run_list(args):
     rohmu_storage = _create_rohmu_storage(args.config, args.storage)
     json_names = rohmu_storage.list_jsons()
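
The deletion rule introduced in PATCH 1/2 is easy to check in isolation. Below is a minimal standalone sketch of it; the function name is_safe_to_delete and the 12:00 backup start time are invented for illustration (the actual manifests in the test are elided above), but the comparison mirrors the patched condition:

    from datetime import UTC, datetime, timedelta

    def is_safe_to_delete(
        last_modified: datetime,
        newest_backup_start_time: datetime,
        kept_paths: set[str],
        key: str,
        file_upload_grace_period: timedelta = timedelta(hours=6),
    ) -> bool:
        # A file may be deleted only if no kept backup references it AND it is
        # older than the newest backup start minus the upload grace period, so
        # files copied for a part that was not yet committed when the newest
        # backup started are never removed.
        latest_safe_delete_time = newest_backup_start_time - file_upload_grace_period
        return last_modified < latest_safe_delete_time and key not in kept_paths

    # Mirrors the new test item: a file modified at 2020-01-03 07:00 survives an
    # assumed backup start of 2020-01-03 12:00, because 07:00 is after the
    # latest safe delete time of 06:00.
    start = datetime(2020, 1, 3, 12, tzinfo=UTC)
    assert not is_safe_to_delete(datetime(2020, 1, 3, 7, tzinfo=UTC), start, set(), "not_used/and_within_grace_period")
    assert is_safe_to_delete(datetime(2020, 1, 2, tzinfo=UTC), start, set(), "not_used/and_old")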
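
The download-files subcommand added in PATCH 2/2 can also be driven programmatically, since _run_download_files only needs an argparse-style namespace. A hypothetical invocation follows; the config path, storage name, manifest object name, and prefix are all made up, and the config/storage attribute names are assumed from the surrounding _create_rohmu_storage(args.config, args.storage) call:

    import argparse

    from astacus import manifest

    args = argparse.Namespace(
        config="/etc/astacus/astacus.json",  # assumed config file location
        storage="default",                   # assumed rohmu storage name
        manifest="backup-manifest-name",     # an object name as printed by "manifest list"
        destination="/tmp/restore",
        prefix="store/",                     # only files under this prefix are written
    )
    manifest._run_download_files(args)

Restored files land under the destination directory at their manifest-relative paths, with parent directories created as needed.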