Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clickhouse: Fix race condition in DeleteDanglingObjectStorageFilesStep #259

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions astacus/coordinator/plugins/clickhouse/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from astacus.coordinator.plugins.zookeeper import ChangeWatch, NoNodeError, TransactionError, ZooKeeperClient
from base64 import b64decode
from collections.abc import Awaitable, Callable, Iterable, Iterator, Mapping, Sequence
from datetime import timedelta
from kazoo.exceptions import ZookeeperError
from typing import Any, cast, TypeVar

Expand Down Expand Up @@ -1070,6 +1071,8 @@ class DeleteDanglingObjectStorageFilesStep(SyncStep[None]):

disks: Disks
json_storage: JsonStorage
# the longest it could be expected to take to upload a part
file_upload_grace_period: timedelta = timedelta(hours=6)

def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None:
backup_manifests = context.get_result(ComputeKeptBackupsStep)
Expand All @@ -1078,7 +1081,14 @@ def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None:
# If we don't have at least one backup, we don't know which files are more recent
# than the latest backup, so we don't do anything.
return

# When a part is moved to the remote disk, firstly files are copied,
# then the part is committed. This means for a very large part with
# multiple files, the last_modified time of some files on remote storage
# may be significantly earlier than the time the part actually appears.
# We do not want to delete these files!
newest_backup_start_time = max(backup_manifest.start for backup_manifest in backup_manifests)
latest_safe_delete_time = newest_backup_start_time - self.file_upload_grace_period

kept_paths: dict[str, set[str]] = {}
for manifest_min in backup_manifests:
Expand All @@ -1097,10 +1107,7 @@ def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None:
logger.info("found %d object storage files to keep in disk %r", len(disk_kept_paths), disk_name)
disk_object_storage_items = disk_object_storage.list_items()
for item in disk_object_storage_items:
# We don't know if objects newer than the latest backup should be kept or not,
# so we leave them for now. We'll delete them if necessary once there is a newer
# backup to tell us if they are still used or not.
if item.last_modified < newest_backup_start_time and item.key not in disk_kept_paths:
if item.last_modified < latest_safe_delete_time and item.key not in disk_kept_paths:
logger.debug("dangling object storage file in disk %r : %r", disk_name, item.key)
keys_to_remove.append(item.key)
disk_available_paths = [item.key for item in disk_object_storage_items]
Expand Down
37 changes: 37 additions & 0 deletions astacus/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@

from astacus.common import ipc
from astacus.common.rohmustorage import RohmuConfig, RohmuStorage
from pathlib import Path

import base64
import json
import logging
import msgspec
import shutil
import sys

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def create_manifest_parsers(parser, subparsers):
p_manifest = subparsers.add_parser("manifest", help="Examine Astacus backup manifests")
Expand All @@ -23,6 +29,7 @@ def create_manifest_parsers(parser, subparsers):
create_list_parser(manifest_subparsers)
create_describe_parser(manifest_subparsers)
create_dump_parser(manifest_subparsers)
create_download_files_parser(manifest_subparsers)


def create_list_parser(subparsers):
Expand All @@ -48,6 +55,36 @@ def create_dump_parser(subparsers):
p_dump.set_defaults(func=_run_dump)


def create_download_files_parser(subparsers):
    """Register the ``download-files`` subcommand on the manifest subparsers.

    The command takes a manifest object name, a destination directory, and a
    mandatory ``--prefix`` used to select which files to download.
    """
    parser = subparsers.add_parser("download-files", help="Download files from a backup manifest")
    parser.add_argument("manifest", type=str, help="Manifest object name (can be obtained by running manifest list)")
    parser.add_argument("destination", type=str, help="Destination directory to download files to")
    parser.add_argument("--prefix", type=str, help="Prefix to filter files", required=True)
    parser.set_defaults(func=_run_download_files)


def _run_download_files(args):
    """Download files listed in a backup manifest into a local directory.

    Fetches the manifest named by ``args.manifest`` from rohmu storage, then
    downloads every snapshot file whose ``relative_path`` starts with
    ``args.prefix`` under ``args.destination``, recreating the directory
    structure.  Files stored by hexdigest are fetched from hexdigest storage;
    small files embedded in the manifest (``content_b64``) are base64-decoded
    and written directly.

    Raises:
        RuntimeError: if a snapshot result has no state, or a selected file
            has neither a hexdigest nor embedded content.
    """
    rohmu_storage = _create_rohmu_storage(args.config, args.storage)
    manifest = rohmu_storage.download_json(args.manifest, ipc.BackupManifest)
    destination = Path(args.destination)
    for snapshot_result in manifest.snapshot_results:
        # Raise instead of assert: asserts are stripped under ``python -O``,
        # which would let a state-less snapshot fall through to an AttributeError.
        if not snapshot_result.state:
            raise RuntimeError("snapshot result has no state; cannot enumerate its files")
        for snapshot_file in snapshot_result.state.files:
            if not snapshot_file.relative_path.startswith(args.prefix):
                continue

            path = destination / snapshot_file.relative_path
            path.parent.mkdir(parents=True, exist_ok=True)
            logger.info("Downloading %s to %s", snapshot_file.relative_path, path)
            if snapshot_file.hexdigest:
                rohmu_storage.download_hexdigest_to_path(snapshot_file.hexdigest, path)
            elif snapshot_file.content_b64 is not None:
                path.write_bytes(base64.b64decode(snapshot_file.content_b64))
            else:
                raise RuntimeError(
                    f"file {snapshot_file.relative_path!r} has neither a hexdigest nor embedded content"
                )


def _run_list(args):
rohmu_storage = _create_rohmu_storage(args.config, args.storage)
json_names = rohmu_storage.list_jsons()
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/coordinator/plugins/clickhouse/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,9 @@ async def test_delete_object_storage_files_step(tmp_path: Path) -> None:
ObjectStorageItem(key="jkl/mnopqr", last_modified=datetime.datetime(2020, 1, 2, tzinfo=datetime.UTC)),
ObjectStorageItem(key="stu/vwxyza", last_modified=datetime.datetime(2020, 1, 3, tzinfo=datetime.UTC)),
ObjectStorageItem(key="not_used/and_new", last_modified=datetime.datetime(2020, 1, 4, tzinfo=datetime.UTC)),
ObjectStorageItem(
key="not_used/and_within_grace_period", last_modified=datetime.datetime(2020, 1, 3, 7, tzinfo=datetime.UTC)
),
]
)
manifests = [
Expand Down Expand Up @@ -1514,6 +1517,9 @@ async def test_delete_object_storage_files_step(tmp_path: Path) -> None:
ObjectStorageItem(key="jkl/mnopqr", last_modified=datetime.datetime(2020, 1, 2, tzinfo=datetime.UTC)),
ObjectStorageItem(key="stu/vwxyza", last_modified=datetime.datetime(2020, 1, 3, tzinfo=datetime.UTC)),
ObjectStorageItem(key="not_used/and_new", last_modified=datetime.datetime(2020, 1, 4, tzinfo=datetime.UTC)),
ObjectStorageItem(
key="not_used/and_within_grace_period", last_modified=datetime.datetime(2020, 1, 3, 7, tzinfo=datetime.UTC)
),
]


Expand Down
Loading