Skip to content

Commit

Permalink
Release extra files in dst after upload
Browse files Browse the repository at this point in the history
Allow the coordinator to release the files that were uploaded during
backup creation. Helps to free disk space after the backup was uploaded.
Especially useful for Cassandra, since, because of compactions, we end
up keeping around copies of old sstables.
  • Loading branch information
dmitry-potepalov committed Oct 16, 2023
1 parent 017eb53 commit cc22e38
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 16 deletions.
5 changes: 5 additions & 0 deletions astacus/common/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ class SnapshotClearRequest(NodeRequest):
root_globs: Sequence[str]


class SnapshotReleaseRequest(NodeRequest):
    # Hexdigests of snapshot files; every file matching one of these digests
    # will be unlinked in the snapshotter's dst hierarchy to free disk space.
    hexdigests: Sequence[str]


# node.cassandra


Expand Down
4 changes: 2 additions & 2 deletions astacus/coordinator/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ async def request_from_nodes(
*,
caller: str,
req: Optional[ipc.NodeRequest] = None,
reqs: Optional[List[ipc.NodeRequest]] = None,
nodes: Optional[List[CoordinatorNode]] = None,
reqs: Optional[Sequence[ipc.NodeRequest]] = None,
nodes: Optional[Sequence[CoordinatorNode]] = None,
**kw,
) -> Sequence[Optional[Result]]:
"""Perform asynchronously parallel request to the node components.
Expand Down
30 changes: 30 additions & 0 deletions astacus/coordinator/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,36 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.No
return await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)


@dataclasses.dataclass
class SnapshotReleaseStep(Step[List[ipc.NodeResult]]):
    """
    Ask each node to release (unlink in dst) the snapshot files of this backup,
    freeing disk space before the next backup happens.

    The step is skipped entirely unless every node advertises the
    release_snapshot_files feature in its metadata.
    """

    async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.NodeResult]:
        snapshot_results = context.get_result(SnapshotStep)
        nodes_metadata = await get_nodes_metadata(cluster)
        release_feature = Features.release_snapshot_files.value
        all_nodes_have_release_feature = nodes_metadata and all(
            release_feature in metadata.features for metadata in nodes_metadata
        )
        if not all_nodes_have_release_feature:
            logger.info("Skipped SnapshotReleaseStep because some nodes don't support it, node features: %s", nodes_metadata)
            return []
        node_requests = [
            ipc.SnapshotReleaseRequest(hexdigests=self._hexdigests_from_hashes(result.hashes))
            for result in snapshot_results
        ]
        start_results = await cluster.request_from_nodes(
            "release", method="post", caller="SnapshotReleaseStep", reqs=node_requests
        )
        return await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)

    def _hexdigests_from_hashes(self, hashes: Optional[List[ipc.SnapshotHash]]) -> Sequence[str]:
        # Snapshot results are expected to carry hashes by the time we release.
        assert hashes is not None
        return [snapshot_hash.hexdigest for snapshot_hash in hashes]


@dataclasses.dataclass
class UploadManifestStep(Step[None]):
"""
Expand Down
1 change: 1 addition & 0 deletions astacus/coordinator/plugins/cassandra/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def get_backup_steps(self, *, context: OperationContext) -> List[Step]:
base.ListHexdigestsStep(hexdigest_storage=context.hexdigest_storage),
base.UploadBlocksStep(storage_name=context.storage_name),
CassandraSubOpStep(op=ipc.CassandraSubOp.remove_snapshot),
base.SnapshotReleaseStep(),
base.UploadManifestStep(
json_storage=context.json_storage,
plugin=ipc.Plugin.cassandra,
Expand Down
20 changes: 19 additions & 1 deletion astacus/node/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .clear import ClearOp
from .download import DownloadOp
from .node import Node
from .snapshot import SnapshotOp, UploadOp
from .snapshot import ReleaseOp, SnapshotOp, UploadOp
from .state import node_state, NodeState
from astacus.common import ipc
from astacus.common.magic import StrEnum
Expand Down Expand Up @@ -35,13 +35,16 @@ class OpName(StrEnum):
download = "download"
snapshot = "snapshot"
upload = "upload"
release = "release"


class Features(Enum):
    """Optional node capabilities, advertised to the coordinator via the
    metadata endpoint so it can avoid sending requests older nodes would
    not understand."""

    # Added on 2022-11-29, this can be assumed to be supported everywhere after 1 or 2 years
    validate_file_hashes = "validate_file_hashes"
    # Added on 2023-06-07
    snapshot_groups = "snapshot_groups"
    # Added on 2023-10-16
    release_snapshot_files = "release_snapshot_files"


def is_allowed(subop: ipc.CassandraSubOp, access_level: CassandraAccessLevel):
Expand Down Expand Up @@ -149,6 +152,21 @@ def delta_upload_result(*, op_id: int, n: Node = Depends()):
return op.result


@router.post("/release")
def release(req: ipc.SnapshotReleaseRequest, n: Node = Depends()):
    # Releasing unlinks files in the snapshotter's dst, so require the node
    # lock to be held, like the other mutating snapshot operations.
    if not n.state.is_locked:
        raise HTTPException(status_code=409, detail="Not locked")
    snapshotter = n.get_snapshotter()
    assert snapshotter
    return ReleaseOp(n=n, op_id=n.allocate_op_id(), stats=n.stats, req=req).start(snapshotter)


@router.get("/release/{op_id}")
def release_result(*, op_id: int, n: Node = Depends()):
    # Poll endpoint: return the current result of a previously started release op.
    op, _ = n.get_op_and_op_info(op_id=op_id, op_name=OpName.release)
    return op.result


@router.post("/download")
def download(req: ipc.SnapshotDownloadRequest, n: Node = Depends()):
if not n.state.is_locked:
Expand Down
22 changes: 22 additions & 0 deletions astacus/node/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,25 @@ def upload(self) -> None:
validate_file_hashes=self.req.validate_file_hashes,
)
self.result.progress.done()


class ReleaseOp(NodeOp[ipc.SnapshotReleaseRequest, ipc.NodeResult]):
    """Node op that releases the files matching the requested hexdigests
    from the snapshotter's dst hierarchy."""

    snapshotter: Optional[Snapshotter] = None

    def create_result(self) -> ipc.NodeResult:
        return ipc.NodeResult()

    def start(self, snapshotter: Snapshotter) -> NodeOp.StartResult:
        logger.info("start_release %r", self.req)
        self.snapshotter = snapshotter
        return self.start_op(op_name="release", op=self, fun=self.release)

    def release(self) -> None:
        snapshotter = self.snapshotter
        assert snapshotter
        progress = self.result.progress
        # Hold the snapshotter lock for the whole release so no concurrent
        # snapshot operation sees a half-released state.
        with snapshotter.lock:
            self.check_op_id()
            digests = self.req.hexdigests
            progress.add_total(len(digests))
            for digest in digests:
                snapshotter.release(digest)
                progress.add_success()
            progress.done()
10 changes: 10 additions & 0 deletions astacus/node/snapshotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ def _remove_snapshotfile(self, snapshotfile: SnapshotFile) -> None:
if snapshotfile.hexdigest:
self.hexdigest_to_snapshotfiles[snapshotfile.hexdigest].remove(snapshotfile)

def _release_snapshotfile(self, snapshotfile: SnapshotFile) -> None:
    # Unlink the file from dst; missing_ok because it may already be gone
    # (e.g. released earlier), and that is fine for freeing disk space.
    (self.dst / snapshotfile.relative_path).unlink(missing_ok=True)

def _snapshotfile_from_path(self, relative_path) -> SnapshotFile:
src_path = self.src / relative_path
st = src_path.stat()
Expand Down Expand Up @@ -273,3 +277,9 @@ def _result_cb(*, map_in: SnapshotFile, map_out: SnapshotFile) -> bool:
progress.add_success()

return changes

def release(self, hexdigest: str) -> None:
    """Unlink every dst file recorded for *hexdigest*, keeping the snapshot
    metadata intact so a later snapshot can restore the files.

    Caller must hold the snapshotter lock; src and dst must be distinct
    hierarchies for releasing to make sense.
    """
    assert self.lock.locked()
    assert self.src != self.dst
    matching_files = self.hexdigest_to_snapshotfiles.get(hexdigest, [])
    for snapshotfile in matching_files:
        self._release_snapshotfile(snapshotfile)
77 changes: 65 additions & 12 deletions tests/unit/coordinator/plugins/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ComputeKeptBackupsStep,
ListBackupsStep,
ListHexdigestsStep,
SnapshotReleaseStep,
SnapshotStep,
Step,
StepsContext,
Expand All @@ -26,7 +27,7 @@
from io import BytesIO
from pydantic import Field
from tests.unit.json_storage import MemoryJsonStorage
from typing import AbstractSet, List, Optional, Sequence
from typing import AbstractSet, Callable, List, Optional, Sequence
from unittest import mock

import datetime
Expand Down Expand Up @@ -81,6 +82,17 @@ def fixture_context() -> StepsContext:
return StepsContext()


def make_request_check(expected_payload: dict, op_name: str) -> Callable[[httpx.Request], httpx.Response]:
    """Build an httpx side-effect that asserts the request payload matches
    *expected_payload* and answers with a fake op-start result."""

    def verify_and_respond(incoming: httpx.Request) -> httpx.Response:
        assert json.loads(incoming.content) == expected_payload
        start_result = Op.StartResult(op_id=1, status_url=f"http://node_1/{op_name}/1")
        return httpx.Response(status_code=HTTPStatus.OK, json=start_result.jsondict())

    return verify_and_respond


@pytest.mark.asyncio
@pytest.mark.parametrize(
"node_features,expected_request",
Expand Down Expand Up @@ -111,19 +123,13 @@ async def test_upload_step_uses_new_request_if_supported(
)
upload_step = UploadBlocksStep(storage_name="fake")
with respx.mock:

def check_request(request: httpx.Request) -> httpx.Response:
payload = json.loads(request.content)
assert payload == expected_request.jsondict()
return httpx.Response(
status_code=HTTPStatus.OK, json=Op.StartResult(op_id=1, status_url="http://node_1/upload/1").jsondict()
)

respx.get("http://node_1/metadata").respond(
metadata_request = respx.get("http://node_1/metadata").respond(
json=ipc.MetadataResult(version="0.1", features=[feature.value for feature in node_features]).jsondict()
)
respx.post("http://node_1/upload").mock(side_effect=check_request)
respx.get("http://node_1/upload/1").respond(
upload_request = respx.post("http://node_1/upload").mock(
side_effect=make_request_check(expected_request.jsondict(), "upload")
)
status_request = respx.get("http://node_1/upload/1").respond(
json=ipc.SnapshotUploadResult(
hostname="localhost",
az="az1",
Expand All @@ -133,6 +139,9 @@ def check_request(request: httpx.Request) -> httpx.Response:
).jsondict()
)
await upload_step.run_step(cluster=single_node_cluster, context=context)
assert metadata_request.call_count == 1
assert upload_request.call_count == 1
assert status_request.called


BACKUPS_FOR_RETENTION_TEST = {
Expand Down Expand Up @@ -232,3 +241,47 @@ async def test_upload_manifest_step_generates_correct_backup_name(
step = UploadManifestStep(json_storage=async_json_storage, plugin=ipc.Plugin.files)
await step.run_step(cluster=single_node_cluster, context=context)
assert "backup-2020-01-07T05:00:00+00:00" in async_json_storage.storage.items


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "node_features,expected_request",
    [
        ([], None),
        (
            [Features.release_snapshot_files],
            ipc.SnapshotReleaseRequest(hexdigests=["aaa", "bbb"]),
        ),
    ],
)
async def test_snapshot_release_step(
    node_features: Sequence[Features],
    expected_request: Optional[ipc.SnapshotReleaseRequest],
    single_node_cluster: Cluster,
    context: StepsContext,
) -> None:
    """The release step posts a release request only when the node supports it."""
    snapshot_hashes = [ipc.SnapshotHash(hexdigest="aaa", size=1), ipc.SnapshotHash(hexdigest="bbb", size=2)]
    context.set_result(SnapshotStep, [DefaultedSnapshotResult(hashes=snapshot_hashes)])
    step = SnapshotReleaseStep()
    release_supported = Features.release_snapshot_files in node_features

    with respx.mock:
        metadata_request = respx.get("http://node_1/metadata").respond(
            json=ipc.MetadataResult(version="0.1", features=[feature.value for feature in node_features]).jsondict()
        )
        if release_supported:
            assert expected_request is not None
            release_request = respx.post("http://node_1/release").mock(
                side_effect=make_request_check(expected_request.jsondict(), "release")
            )
            status_request = respx.get("http://node_1/release/1").respond(
                json=ipc.NodeResult(
                    hostname="localhost",
                    az="az1",
                    progress=Progress(handled=2, total=2, final=True),
                ).jsondict()
            )
        await step.run_step(cluster=single_node_cluster, context=context)
        assert metadata_request.call_count == 1
        if release_supported:
            assert release_request.call_count == 1
            assert status_request.called
43 changes: 42 additions & 1 deletion tests/unit/node/test_snapshotter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from astacus.common.progress import Progress
from astacus.common.snapshot import SnapshotGroup
from astacus.node.snapshotter import Snapshotter
from astacus.node.snapshotter import hash_hexdigest_readable, Snapshotter
from pathlib import Path


Expand All @@ -21,3 +21,44 @@ def test_snapshotter_with_src_equal_dst_forgets_file_from_previous_snapshot(tmp_
file_after.write_bytes(b"y" * 1024)
snapshotter.snapshot(progress=Progress())
assert snapshotter.relative_path_to_snapshotfile.keys() == {Path("file_after")}


def assert_kept(hexdigest: str, dst_path: Path, snapshotter: Snapshotter) -> None:
    # The file still exists in dst and both metadata indexes still track it.
    assert dst_path.exists()
    assert hexdigest in snapshotter.hexdigest_to_snapshotfiles
    assert Path(dst_path.name) in snapshotter.relative_path_to_snapshotfile


def assert_released(hexdigest: str, dst_path: Path, snapshotter: Snapshotter) -> None:
    # The file has been unlinked from dst, but both metadata indexes still
    # remember it (release only frees disk space, it does not forget state).
    assert not dst_path.exists()
    assert hexdigest in snapshotter.hexdigest_to_snapshotfiles
    assert Path(dst_path.name) in snapshotter.relative_path_to_snapshotfile


def test_snapshotter_release_hash_unlinks_files_but_keeps_metadata(tmp_path: Path) -> None:
    """Releasing a hexdigest unlinks its dst files while keeping the snapshot
    metadata, and a subsequent snapshot restores the released files.

    Fix over the original: the files opened for hashing were never closed
    (`.open(mode="rb")` without a context manager), leaking file handles and
    triggering ResourceWarning; hashing now happens inside `with` blocks.
    """
    src = tmp_path / "src"
    dst = tmp_path / "dst"
    src.mkdir()
    dst.mkdir()

    def write_and_digest(name: str, content: str) -> str:
        # Create a source file and return its content digest, closing the handle.
        path = src / name
        path.write_text(content)
        with path.open(mode="rb") as readable:
            return hash_hexdigest_readable(readable)

    kept_digest = write_and_digest("keep_this", "this will be kept")
    released_digest = write_and_digest("release_this", "this will be released")
    snapshotter = Snapshotter(src=src, dst=dst, groups=[SnapshotGroup(root_glob="*", embedded_file_size_max=0)], parallel=1)

    with snapshotter.lock:
        snapshotter.snapshot(progress=Progress())
        assert_kept(kept_digest, dst / "keep_this", snapshotter)
        assert_kept(released_digest, dst / "release_this", snapshotter)

        snapshotter.release(released_digest)
        assert_kept(kept_digest, dst / "keep_this", snapshotter)
        assert_released(released_digest, dst / "release_this", snapshotter)

        # re-snapshotting should restore the link
        added_digest = write_and_digest("add_this", "this is added for the next snapshot")
        snapshotter.snapshot(progress=Progress())
        assert_kept(kept_digest, dst / "keep_this", snapshotter)
        assert_kept(released_digest, dst / "release_this", snapshotter)
        assert_kept(added_digest, dst / "add_this", snapshotter)

0 comments on commit cc22e38

Please sign in to comment.