From 5adcf179d85ccb636c5fed707b6aa77dfcc7d575 Mon Sep 17 00:00:00 2001 From: Dmitry Potepalov Date: Tue, 5 Sep 2023 13:20:26 +0200 Subject: [PATCH] Restore delta backups in Cassandra When doing a partial restore, apply all the incremental changes we've uploaded when creating delta backups. After restoring the snapshot, download and put sstables from incremental backups into Cassandra's data directory. --- astacus/common/cassandra/config.py | 1 + astacus/common/ipc.py | 1 + astacus/coordinator/plugins/base.py | 138 ++++++++++++++++++ .../coordinator/plugins/cassandra/plugin.py | 22 ++- astacus/node/api.py | 14 ++ astacus/node/cassandra.py | 32 ++-- tests/unit/conftest.py | 5 + tests/unit/coordinator/plugins/test_base.py | 80 ++++++++++ tests/unit/node/test_node_cassandra.py | 9 +- 9 files changed, 290 insertions(+), 12 deletions(-) diff --git a/astacus/common/cassandra/config.py b/astacus/common/cassandra/config.py index 3f0c12fc..99d9d0cf 100644 --- a/astacus/common/cassandra/config.py +++ b/astacus/common/cassandra/config.py @@ -25,6 +25,7 @@ SNAPSHOT_NAME = "astacus-backup" SNAPSHOT_GLOB = f"data/*/*/snapshots/{SNAPSHOT_NAME}" +BACKUP_GLOB = "data/*/*/backups/" class CassandraClientConfiguration(AstacusModel): diff --git a/astacus/common/ipc.py b/astacus/common/ipc.py index cf7ea5f1..ae9b2f23 100644 --- a/astacus/common/ipc.py +++ b/astacus/common/ipc.py @@ -221,6 +221,7 @@ class CassandraRestoreSSTablesRequest(NodeRequest): table_glob: str keyspaces_to_skip: Sequence[str] match_tables_by: CassandraTableMatching + expect_empty_target: bool # coordinator.api diff --git a/astacus/coordinator/plugins/base.py b/astacus/coordinator/plugins/base.py index b2f286d7..2514219f 100644 --- a/astacus/coordinator/plugins/base.py +++ b/astacus/coordinator/plugins/base.py @@ -352,6 +352,144 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> set[str]: return set(b for b in await self.json_storage.list_jsons() if b.startswith(magic.JSON_BACKUP_PREFIX)) +@dataclasses.dataclass +class DeltaManifestsStep(Step[List[ipc.BackupManifest]]): + """ + Download and parse all delta manifests necessary for restore. + + Includes only the deltas created after the base backup selected for restore. + Returns manifests sorted by start time. + """ + + json_storage: AsyncJsonStorage + + async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.BackupManifest]: + backup_manifest = context.get_result(BackupManifestStep) + # Right now does not really matter whether it's end or start, since backup and + # delta operations are mutually exclusive. + # Theoretically we might allow uploading deltas in parallel with base backup, + # in that scenario it makes sense to rely on backup start (because a delta might + # finish uploading while the base is still being uploaded). + delta_names = sorted(d for d in await self.json_storage.list_jsons() if d.startswith(magic.JSON_DELTA_PREFIX)) + matching_delta_manifests = [] + for delta_name in delta_names: + delta_manifest = await download_backup_manifest(self.json_storage, delta_name) + if delta_manifest.start >= backup_manifest.start: + matching_delta_manifests.append(delta_manifest) + return sorted(matching_delta_manifests, key=lambda m: m.start) + + +@dataclasses.dataclass +class RestoreDeltasStep(Step[None]): + """ + Restore the delta backups: download and apply to the node. + """ + + json_storage: AsyncJsonStorage + storage_name: str + # Delta restore is plugin-dependent, allow to customize it. 
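+    # (For example, the Cassandra plugin further down in this patch points these at its
+    # restore_sstables sub-operation.)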
+ restore_delta_url: str + restore_delta_request: ipc.NodeRequest + + async def run_step(self, cluster: Cluster, context: StepsContext) -> None: + deltas_to_restore = sorted(context.get_result(DeltaManifestsStep), key=lambda m: m.start) + node_to_backup_index = context.get_result(MapNodesStep) + nodes = [ + cluster.nodes[node_index] + for node_index, backup_index in enumerate(node_to_backup_index) + if backup_index is not None + ] + + for delta_manifest in deltas_to_restore: + delta_name = delta_manifest.filename + await self.download_delta( + delta_name, + nodes=nodes, + cluster=cluster, + node_to_backup_index=node_to_backup_index, + delta_manifest=delta_manifest, + ) + await self.restore_delta(delta_name, nodes=nodes, cluster=cluster) + await self.clear_delta( + delta_name, + nodes=nodes, + cluster=cluster, + node_to_backup_index=node_to_backup_index, + delta_manifest=delta_manifest, + ) + + async def download_delta( + self, + delta_name: str, + *, + nodes: List[CoordinatorNode], + cluster: Cluster, + node_to_backup_index: List[Optional[int]], + delta_manifest: ipc.BackupManifest, + ) -> None: + reqs: List[ipc.NodeRequest] = [] + for backup_index in node_to_backup_index: + if backup_index is not None: + snapshot_result = delta_manifest.snapshot_results[backup_index] + assert snapshot_result.state is not None + reqs.append( + ipc.SnapshotDownloadRequest( + storage=self.storage_name, + backup_name=delta_name, + snapshot_index=backup_index, + root_globs=snapshot_result.state.root_globs, + ) + ) + start_results = await cluster.request_from_nodes( + "delta/download", + method="post", + caller="restore_deltas", + reqs=reqs, + nodes=nodes, + ) + if not start_results: + raise StepFailedError(f"Initiating delta {delta_name} download failed") + await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult) + + async def restore_delta(self, delta_name: str, *, nodes: List[CoordinatorNode], cluster: Cluster) -> None: + start_results = await cluster.request_from_nodes( + self.restore_delta_url, + method="post", + caller="restore_deltas", + req=self.restore_delta_request, + nodes=nodes, + ) + if not start_results: + raise StepFailedError(f"Initiating delta {delta_name} restore failed") + await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult) + + async def clear_delta( + self, + delta_name: str, + *, + nodes: List[CoordinatorNode], + cluster: Cluster, + node_to_backup_index: List[Optional[int]], + delta_manifest: ipc.BackupManifest, + ) -> None: + reqs: List[ipc.NodeRequest] = [] + for backup_index in node_to_backup_index: + if backup_index is not None: + snapshot_result = delta_manifest.snapshot_results[backup_index] + assert snapshot_result.state is not None + reqs.append(ipc.SnapshotClearRequest(root_globs=snapshot_result.state.root_globs)) + start_results = await cluster.request_from_nodes( + "delta/clear", + method="post", + caller="restore_deltas", + reqs=reqs, + nodes=nodes, + ) + if not start_results: + raise StepFailedError(f"Initiating delta {delta_name} clear failed") + await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult) + + @dataclasses.dataclass class ComputeKeptBackupsStep(Step[set[str]]): """ diff --git a/astacus/coordinator/plugins/cassandra/plugin.py b/astacus/coordinator/plugins/cassandra/plugin.py index 1ddc01b1..87d9bdf5 100644 --- a/astacus/coordinator/plugins/cassandra/plugin.py +++ b/astacus/coordinator/plugins/cassandra/plugin.py @@ -8,7 +8,7 @@ from .utils 
import run_subop from astacus.common import ipc from astacus.common.cassandra.client import CassandraClient -from astacus.common.cassandra.config import CassandraClientConfiguration, SNAPSHOT_GLOB, SNAPSHOT_NAME +from astacus.common.cassandra.config import BACKUP_GLOB, CassandraClientConfiguration, SNAPSHOT_GLOB, SNAPSHOT_NAME from astacus.common.cassandra.utils import SYSTEM_KEYSPACES from astacus.common.magic import JSON_DELTA_PREFIX from astacus.common.snapshot import SnapshotGroup @@ -163,11 +163,25 @@ def get_restore_schema_from_snapshot_steps(self, *, context: OperationContext, r table_glob=SNAPSHOT_GLOB, keyspaces_to_skip=[ks for ks in SYSTEM_KEYSPACES if ks != "system_schema"], match_tables_by=ipc.CassandraTableMatching.cfid, + expect_empty_target=True, + ) + restore_deltas_req = ipc.CassandraRestoreSSTablesRequest( + table_glob=BACKUP_GLOB, + keyspaces_to_skip=[ks for ks in SYSTEM_KEYSPACES if ks != "system_schema"], + match_tables_by=ipc.CassandraTableMatching.cfid, + expect_empty_target=False, ) return [ base.RestoreStep(storage_name=context.storage_name, partial_restore_nodes=req.partial_restore_nodes), CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.restore_sstables, req=restore_sstables_req), + base.DeltaManifestsStep(json_storage=context.json_storage), + base.RestoreDeltasStep( + json_storage=context.json_storage, + storage_name=context.storage_name, + restore_delta_url=f"cassandra/{ipc.CassandraSubOp.restore_sstables}", + restore_delta_request=restore_deltas_req, + ), restore_steps.StopReplacedNodesStep(partial_restore_nodes=req.partial_restore_nodes, cassandra_nodes=self.nodes), restore_steps.StartCassandraStep(replace_backup_nodes=True, override_tokens=True, cassandra_nodes=self.nodes), restore_steps.WaitCassandraUpStep(duration=self.restore_start_timeout), @@ -180,6 +194,7 @@ def get_restore_schema_from_manifest_steps(self, *, context: OperationContext, r table_glob=SNAPSHOT_GLOB, keyspaces_to_skip=SYSTEM_KEYSPACES, match_tables_by=ipc.CassandraTableMatching.cfname, + expect_empty_target=True, ) return [ # Start cassandra with backed up token distribution + set schema + stop it @@ -187,7 +202,10 @@ def get_restore_schema_from_manifest_steps(self, *, context: OperationContext, r restore_steps.WaitCassandraUpStep(duration=self.restore_start_timeout), restore_steps.RestorePreDataStep(client=client), CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.stop_cassandra), - # Restore snapshot + # Restore snapshot. Restoring deltas is not possible in this scenario, + # because once we've created our own system_schema keyspace and written data to it, + # we've started a new sequence of sstables that might clash with the sequence from the node + # we took the backup from (e.g. the old node had nb-1, the new node has nb-1, unclear how to proceed). 
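+            # ("nb" is the sstable format version and "1" the generation, so both nodes end up
+            # with files named like nb-1-big-Data.db.)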
base.RestoreStep(storage_name=context.storage_name, partial_restore_nodes=req.partial_restore_nodes), CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.restore_sstables, req=restore_sstables_req), # restart cassandra and do the final actions with data available diff --git a/astacus/node/api.py b/astacus/node/api.py index 8fa5d323..4b519314 100644 --- a/astacus/node/api.py +++ b/astacus/node/api.py @@ -163,6 +163,20 @@ def download_result(*, op_id: int, n: Node = Depends()): return op.result +@router.post("/delta/download") +def delta_download(req: ipc.SnapshotDownloadRequest, n: Node = Depends()): + if not n.state.is_locked: + raise HTTPException(status_code=409, detail="Not locked") + snapshotter = delta_snapshotter_from_snapshot_req(req, n) + return DownloadOp(n=n, op_id=n.allocate_op_id(), stats=n.stats, req=req).start(snapshotter) + + +@router.get("/delta/download/{op_id}") +def delta_download_result(*, op_id: int, n: Node = Depends()): + op, _ = n.get_op_and_op_info(op_id=op_id, op_name=OpName.download) + return op.result + + @router.post("/clear") def clear(req: ipc.SnapshotClearRequest, n: Node = Depends()): if not n.state.is_locked: diff --git a/astacus/node/cassandra.py b/astacus/node/cassandra.py index 98f60f40..32e5643f 100644 --- a/astacus/node/cassandra.py +++ b/astacus/node/cassandra.py @@ -14,7 +14,7 @@ from astacus.common.exceptions import TransientException from pathlib import Path from pydantic import DirectoryPath -from typing import Callable +from typing import Callable, Tuple import contextlib import logging @@ -28,6 +28,16 @@ TABLES_GLOB = "data/*/*" +def ks_table_from_snapshot_path(p: Path) -> Tuple[str, str]: + # /.../keyspace/table/snapshots/astacus + return p.parts[-4], p.parts[-3] + + +def ks_table_from_backup_path(p: Path) -> Tuple[str, str]: + # /.../keyspace/table/backups + return p.parts[-3], p.parts[-2] + + class SimpleCassandraSubOp(NodeOp[ipc.NodeRequest, ipc.NodeResult]): """ Generic class to handle no arguments in + no output out case subops. @@ -145,14 +155,8 @@ def restore_sstables(self) -> None: else self._match_table_by_name(keyspace_name, table_name_and_id) ) - # Ensure destination path is empty except for potential directories (e.g. backups/) - # This should never have anything - except for system_auth, it gets populated when we restore schema. - existing_files = [file_path for file_path in table_path.glob("*") if file_path.is_file()] - if keyspace_name == "system_auth": - for existing_file in existing_files: - existing_file.unlink() - existing_files = [] - assert not existing_files, f"Files found in {table_name_and_id}: {existing_files}" + if self.req.expect_empty_target: + self._ensure_target_is_empty(keyspace_name=keyspace_name, table_path=table_path) for file_path in table_snapshot.glob("*"): file_path.rename(table_path / file_path.name) @@ -175,6 +179,16 @@ def _match_table_by_name(self, keyspace_name: str, table_name_and_id: str) -> Di return table_paths[0] + def _ensure_target_is_empty(self, *, keyspace_name: str, table_path: Path) -> None: + # Ensure destination path is empty except for potential directories (e.g. backups/) + # This should never have anything - except for system_auth, it gets populated when we restore schema. 
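+        # (Delta restores pass expect_empty_target=False and skip this check entirely,
+        # since the base restore has already populated the table directories.)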
+ existing_files = [file_path for file_path in table_path.glob("*") if file_path.is_file()] + if keyspace_name == "system_auth": + for existing_file in existing_files: + existing_file.unlink() + existing_files = [] + assert not existing_files, f"Files found in {table_path.name}: {existing_files}" + class CassandraStartOp(NodeOp[ipc.CassandraStartRequest, ipc.NodeResult]): def create_result(self) -> ipc.NodeResult: diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 0d8e24ea..b2d640f4 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -53,6 +53,9 @@ def __init__(self, *, mocker, tmpdir): other_table_path.mkdir(parents=True) (other_table_path / "data.file").write_text("data") + self.backup_path = keyspace_path / "dummytable-123" / "backups" + self.backup_path.mkdir(parents=True) + system_schema_path = self.root / "data" / "system_schema" / "tables-789" / "snapshots" / SNAPSHOT_NAME system_schema_path.mkdir(parents=True) (system_schema_path / "data.file").write_text("schema") @@ -71,6 +74,8 @@ def __init__(self, *, mocker, tmpdir): (keyspace_path / "dummytable-234").mkdir() (keyspace_path / "anothertable-789").mkdir() + (self.backup_path / "incremental.backup").write_text("delta") + named_temporary_file = mocker.patch.object(tempfile, "NamedTemporaryFile") self.fake_conffile = StringIO() named_temporary_file.return_value.__enter__.return_value = self.fake_conffile diff --git a/tests/unit/coordinator/plugins/test_base.py b/tests/unit/coordinator/plugins/test_base.py index 2c1034ee..b1a530e6 100644 --- a/tests/unit/coordinator/plugins/test_base.py +++ b/tests/unit/coordinator/plugins/test_base.py @@ -5,13 +5,16 @@ """ from astacus.common import ipc, utils from astacus.common.asyncstorage import AsyncJsonStorage +from astacus.common.ipc import Plugin from astacus.common.op import Op from astacus.common.progress import Progress from astacus.common.utils import now from astacus.coordinator.cluster import Cluster from astacus.coordinator.config import CoordinatorNode from astacus.coordinator.plugins.base import ( + BackupManifestStep, ComputeKeptBackupsStep, + DeltaManifestsStep, ListBackupsStep, ListHexdigestsStep, SnapshotStep, @@ -29,6 +32,7 @@ from typing import AbstractSet, List, Optional, Sequence from unittest import mock +import dataclasses import datetime import httpx import json @@ -232,3 +236,79 @@ async def test_upload_manifest_step_generates_correct_backup_name( step = UploadManifestStep(json_storage=async_json_storage, plugin=ipc.Plugin.files) await step.run_step(cluster=single_node_cluster, context=context) assert "backup-2020-01-07T05:00:00+00:00" in async_json_storage.storage.items + + +def make_manifest(start: str, end: str) -> str: + manifest = ipc.BackupManifest( + start=start, + end=end, + attempt=1, + snapshot_results=[], + upload_results=[], + plugin=Plugin.files, + ) + return manifest + + +@dataclasses.dataclass +class TestListDeltasParam: + test_id: str + basebackup_manifest: ipc.BackupManifest + stored_jsons: dict[str, str] + expected_deltas: set[str] + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "p", + [ + TestListDeltasParam( + test_id="empty_storage", + basebackup_manifest=make_manifest(start="1970-01-01T00:00", end="1970-01-01T00:30"), + stored_jsons={}, + expected_deltas=[], + ), + TestListDeltasParam( + test_id="single_delta", + basebackup_manifest=make_manifest(start="1970-01-01T00:00", end="1970-01-01T00:30"), + stored_jsons={ + "backup-base": make_manifest(start="1970-01-01T00:00", end="1970-01-01T00:30").json(), + 
"delta-one": make_manifest(start="1970-01-01T01:00", end="1970-01-01T01:05").json(), + }, + expected_deltas=["delta-one"], + ), + TestListDeltasParam( + test_id="deltas_older_than_backup_are_not_listed", + basebackup_manifest=make_manifest(start="2000-01-01T00:00", end="2000-01-01T00:30"), + stored_jsons={ + "backup-old": make_manifest(start="1970-01-01T00:00", end="1970-01-01T00:30").json(), + "delta-old": make_manifest(start="1970-01-01T01:00", end="1970-01-01T01:05").json(), + "backup-one": make_manifest(start="2000-01-01T00:00", end="2000-01-01T00:30").json(), + "delta-one": make_manifest(start="2000-01-01T01:00", end="2000-01-01T01:05").json(), + "delta-two": make_manifest(start="2000-01-01T12:00", end="1970-01-01T12:05").json(), + "backup-two": make_manifest(start="2000-01-02T00:00", end="2000-01-02T00:30").json(), + "delta-three": make_manifest(start="2000-01-02T12:00", end="2000-01-02T12:05").json(), + }, + expected_deltas=["delta-one", "delta-two", "delta-three"], + ), + TestListDeltasParam( + test_id="relies_on_start_time_in_case_of_intersections", + basebackup_manifest=make_manifest(start="2000-01-01T00:00", end="2000-01-01T00:30"), + stored_jsons={ + "delta-old": make_manifest(start="1999-12-31T23:59", end="2000-01-01T00:04").json(), + "backup-one": make_manifest(start="2000-01-01T00:00", end="2000-01-01T00:30").json(), + "delta-one": make_manifest(start="2000-01-01T00:05", end="2000-01-01T00:10").json(), + }, + expected_deltas=["delta-one"], + ), + ], + ids=lambda p: p.test_id, +) +async def test_list_delta_backups(p: TestListDeltasParam) -> None: + async_json_storage = AsyncJsonStorage(MemoryJsonStorage(p.stored_jsons)) + step = DeltaManifestsStep(async_json_storage) + cluster = Cluster(nodes=[CoordinatorNode(url="http://node_1")]) + context = StepsContext() + context.set_result(BackupManifestStep, p.basebackup_manifest) + backup_names = [b.filename for b in await step.run_step(cluster=cluster, context=context)] + assert backup_names == p.expected_deltas diff --git a/tests/unit/node/test_node_cassandra.py b/tests/unit/node/test_node_cassandra.py index f822e689..f32ed6af 100644 --- a/tests/unit/node/test_node_cassandra.py +++ b/tests/unit/node/test_node_cassandra.py @@ -100,7 +100,7 @@ def test_api_cassandra_subop(app, ctenv, mocker, subop): assert (ctenv.root / "data").exists() assert (ctenv.root / "data" / "dummyks").exists() assert (ctenv.root / "data" / "dummyks" / "dummytable-123").exists() - assert [p.name for p in (ctenv.root / "data" / "dummyks" / "dummytable-123").iterdir()] == ["snapshots"] + assert [p.name for p in (ctenv.root / "data" / "dummyks" / "dummytable-123").iterdir()] == ["backups", "snapshots"] assert not (ctenv.root / "data" / "dummyks" / "dummytable-234").exists() elif subop == ipc.CassandraSubOp.start_cassandra: subprocess_run.assert_any_call(ctenv.cassandra_node_config.start_command + ["tempfilename"], check=True) @@ -154,6 +154,7 @@ class DefaultedRestoreSSTablesRequest(ipc.CassandraRestoreSSTablesRequest): table_glob: str = astacus_node_cassandra.SNAPSHOT_GLOB keyspaces_to_skip: Sequence[str] = list(SYSTEM_KEYSPACES) match_tables_by: ipc.CassandraTableMatching = ipc.CassandraTableMatching.cfname + expect_empty_target: bool = True return DefaultedRestoreSSTablesRequest @@ -183,6 +184,12 @@ def test_matches_tables_by_id_when_told_to(self, ctenv, make_sstables_request) - assert (ctenv.root / "data" / "dummyks" / "dummytable-123" / "asdf").read_text() == "foobar" assert not (ctenv.root / "data" / "dummyks" / "dummytable-234" / "asdf").exists() + def 
test_allows_existing_files_when_told_to(self, ctenv, make_sstables_request) -> None: + req = make_sstables_request(expect_empty_target=False) + (ctenv.root / "data" / "dummyks" / "dummytable-123" / "existing_file").write_text("exists") + + self.assert_request_succeeded(ctenv, req) + def assert_request_succeeded(self, ctenv, req: ipc.CassandraRestoreSSTablesRequest) -> None: response = ctenv.post(subop=ipc.CassandraSubOp.restore_sstables, json=req.dict()) status = ctenv.get_status(response)
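The delta selection and ordering semantics exercised by the tests above boil down to: keep every delta whose start time is at or after the base backup's start, and apply the kept deltas oldest-first. A minimal standalone sketch of that rule (the Manifest class, the deltas_to_apply helper and the timestamps below are illustrative stand-ins for ipc.BackupManifest and DeltaManifestsStep, not code from this patch):

import dataclasses
import datetime


@dataclasses.dataclass
class Manifest:
    filename: str
    start: datetime.datetime


def deltas_to_apply(base_backup: Manifest, deltas: list[Manifest]) -> list[Manifest]:
    # Keep deltas starting at or after the base backup's start (start times are
    # comparable because backup and delta uploads are mutually exclusive today),
    # then apply them in ascending start order.
    return sorted((d for d in deltas if d.start >= base_backup.start), key=lambda d: d.start)


def ts(value: str) -> datetime.datetime:
    return datetime.datetime.fromisoformat(value)


base_backup = Manifest("backup-one", ts("2000-01-01T00:00"))
deltas = [
    Manifest("delta-old", ts("1999-12-31T23:59")),  # predates the base backup: skipped
    Manifest("delta-two", ts("2000-01-01T12:00")),
    Manifest("delta-one", ts("2000-01-01T00:05")),
]
assert [d.filename for d in deltas_to_apply(base_backup, deltas)] == ["delta-one", "delta-two"]

RestoreDeltasStep then processes each selected manifest in that order, running download, restore_sstables (with expect_empty_target=False in the Cassandra plugin) and clear before moving on to the next delta.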