Skip to content

Commit

Permalink
clickhouse: Fix race condition in DeleteDanglingObjectStorageFilesStep
Browse files Browse the repository at this point in the history
Files can be uploaded _before_ a part appears in a backup manifest.
[DDB-1372]
  • Loading branch information
joelynch committed Nov 13, 2024
1 parent 86b5089 commit 074c8fa
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
15 changes: 11 additions & 4 deletions astacus/coordinator/plugins/clickhouse/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from astacus.coordinator.plugins.zookeeper import ChangeWatch, NoNodeError, TransactionError, ZooKeeperClient
from base64 import b64decode
from collections.abc import Awaitable, Callable, Iterable, Iterator, Mapping, Sequence
from datetime import timedelta
from kazoo.exceptions import ZookeeperError
from typing import Any, cast, TypeVar

Expand Down Expand Up @@ -1070,6 +1071,8 @@ class DeleteDanglingObjectStorageFilesStep(SyncStep[None]):

disks: Disks
json_storage: JsonStorage
# Upper bound on how long uploading a single part is expected to take.
file_upload_grace_period: timedelta = timedelta(hours=6)

def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None:
backup_manifests = context.get_result(ComputeKeptBackupsStep)
Expand All @@ -1078,7 +1081,14 @@ def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None:
# If we don't have at least one backup, we don't know which files are more recent
# than the latest backup, so we don't do anything.
return

# When a part is moved to the remote disk, its files are copied first and
# the part is committed only afterwards. For a very large part with
# multiple files, the last_modified time of some files on remote storage
# may therefore be significantly earlier than the time the part actually
# appears. We do not want to delete these files!
newest_backup_start_time = max(backup_manifest.start for backup_manifest in backup_manifests)
latest_safe_delete_time = newest_backup_start_time - self.file_upload_grace_period

kept_paths: dict[str, set[str]] = {}
for manifest_min in backup_manifests:
Expand All @@ -1097,10 +1107,7 @@ def run_sync_step(self, cluster: Cluster, context: StepsContext) -> None:
logger.info("found %d object storage files to keep in disk %r", len(disk_kept_paths), disk_name)
disk_object_storage_items = disk_object_storage.list_items()
for item in disk_object_storage_items:
# We don't know if objects newer than the latest backup should be kept or not,
# so we leave them for now. We'll delete them if necessary once there is a newer
# backup to tell us if they are still used or not.
if item.last_modified < newest_backup_start_time and item.key not in disk_kept_paths:
if item.last_modified < latest_safe_delete_time and item.key not in disk_kept_paths:
logger.debug("dangling object storage file in disk %r : %r", disk_name, item.key)
keys_to_remove.append(item.key)
disk_available_paths = [item.key for item in disk_object_storage_items]
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/coordinator/plugins/clickhouse/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,9 @@ async def test_delete_object_storage_files_step(tmp_path: Path) -> None:
ObjectStorageItem(key="jkl/mnopqr", last_modified=datetime.datetime(2020, 1, 2, tzinfo=datetime.UTC)),
ObjectStorageItem(key="stu/vwxyza", last_modified=datetime.datetime(2020, 1, 3, tzinfo=datetime.UTC)),
ObjectStorageItem(key="not_used/and_new", last_modified=datetime.datetime(2020, 1, 4, tzinfo=datetime.UTC)),
ObjectStorageItem(
key="not_used/and_within_grace_period", last_modified=datetime.datetime(2020, 1, 3, 7, tzinfo=datetime.UTC)
),
]
)
manifests = [
Expand Down Expand Up @@ -1514,6 +1517,9 @@ async def test_delete_object_storage_files_step(tmp_path: Path) -> None:
ObjectStorageItem(key="jkl/mnopqr", last_modified=datetime.datetime(2020, 1, 2, tzinfo=datetime.UTC)),
ObjectStorageItem(key="stu/vwxyza", last_modified=datetime.datetime(2020, 1, 3, tzinfo=datetime.UTC)),
ObjectStorageItem(key="not_used/and_new", last_modified=datetime.datetime(2020, 1, 4, tzinfo=datetime.UTC)),
ObjectStorageItem(
key="not_used/and_within_grace_period", last_modified=datetime.datetime(2020, 1, 3, 7, tzinfo=datetime.UTC)
),
]


Expand Down

0 comments on commit 074c8fa

Please sign in to comment.