Skip to content

Commit

Permalink
Restore delta backups in Cassandra
Browse files Browse the repository at this point in the history
When doing a partial restore, apply all the incremental changes we've
uploaded when creating delta backups. After restoring the snapshot,
download and put sstables from incremental backups into Cassandra's data
directory.
  • Loading branch information
dmitry-potepalov committed Oct 17, 2023
1 parent fef6c5a commit 5adcf17
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 12 deletions.
1 change: 1 addition & 0 deletions astacus/common/cassandra/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

SNAPSHOT_NAME = "astacus-backup"
SNAPSHOT_GLOB = f"data/*/*/snapshots/{SNAPSHOT_NAME}"
BACKUP_GLOB = "data/*/*/backups/"


class CassandraClientConfiguration(AstacusModel):
Expand Down
1 change: 1 addition & 0 deletions astacus/common/ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ class CassandraRestoreSSTablesRequest(NodeRequest):
table_glob: str
keyspaces_to_skip: Sequence[str]
match_tables_by: CassandraTableMatching
expect_empty_target: bool


# coordinator.api
Expand Down
138 changes: 138 additions & 0 deletions astacus/coordinator/plugins/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,144 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> set[str]:
return set(b for b in await self.json_storage.list_jsons() if b.startswith(magic.JSON_BACKUP_PREFIX))


@dataclasses.dataclass
class DeltaManifestsStep(Step[List[ipc.BackupManifest]]):
"""
Download and parse all delta manifests necessary for restore.
Includes only the deltas created after the base backup selected for restore.
Returns manifests sorted by start time.
"""

json_storage: AsyncJsonStorage

async def run_step(self, cluster: Cluster, context: StepsContext) -> List[ipc.BackupManifest]:
backup_manifest = context.get_result(BackupManifestStep)
# Right now does not really matter whether it's end or start, since backup and
# delta operations are mutually exclusive.
# Theoretically we might allow uploading deltas in parallel with base backup,
# in that scenario it makes sense to rely on backup start (because a delta might
# finish uploading while the base is still being uploaded).
delta_names = sorted(d for d in await self.json_storage.list_jsons() if d.startswith(magic.JSON_DELTA_PREFIX))
matching_delta_manifests = []
for delta_name in delta_names:
delta_manifest = await download_backup_manifest(self.json_storage, delta_name)
if delta_manifest.start >= backup_manifest.start:
matching_delta_manifests.append(delta_manifest)
return sorted(matching_delta_manifests, key=lambda m: m.start)


@dataclasses.dataclass
class RestoreDeltasStep(Step[None]):
"""
Restore the delta backups: download and apply to the node.
"""

json_storage: AsyncJsonStorage
storage_name: str
# Delta restore is plugin-dependent, allow to customize it.
restore_delta_url: str
restore_delta_request: ipc.NodeRequest

async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
deltas_to_restore = sorted(context.get_result(DeltaManifestsStep), key=lambda m: m.start)
node_to_backup_index = context.get_result(MapNodesStep)
nodes = [
cluster.nodes[node_index]
for node_index, backup_index in enumerate(node_to_backup_index)
if backup_index is not None
]

for delta_manifest in deltas_to_restore:
delta_name = delta_manifest.filename
await self.download_delta(
delta_name,
nodes=nodes,
cluster=cluster,
node_to_backup_index=node_to_backup_index,
delta_manifest=delta_manifest,
)
await self.restore_delta(delta_name, nodes=nodes, cluster=cluster)
await self.clear_delta(
delta_name,
nodes=nodes,
cluster=cluster,
node_to_backup_index=node_to_backup_index,
delta_manifest=delta_manifest,
)

async def download_delta(
self,
delta_name: str,
*,
nodes: List[CoordinatorNode],
cluster: Cluster,
node_to_backup_index: List[Optional[int]],
delta_manifest: ipc.BackupManifest,
) -> None:
reqs: List[ipc.NodeRequest] = []
for backup_index in node_to_backup_index:
if backup_index is not None:
snapshot_result = delta_manifest.snapshot_results[backup_index]
assert snapshot_result.state is not None
reqs.append(
ipc.SnapshotDownloadRequest(
storage=self.storage_name,
backup_name=delta_name,
snapshot_index=backup_index,
root_globs=snapshot_result.state.root_globs,
)
)
start_results = await cluster.request_from_nodes(
"delta/download",
method="post",
caller="restore_deltas",
reqs=reqs,
nodes=nodes,
)
if not start_results:
raise StepFailedError(f"Initiating delta {delta_name} download failed")
await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)

async def restore_delta(self, delta_name: str, *, nodes: List[CoordinatorNode], cluster: Cluster) -> None:
start_results = await cluster.request_from_nodes(
self.restore_delta_url,
method="post",
caller="restore_deltas",
req=self.restore_delta_request,
nodes=nodes,
)
if not start_results:
raise StepFailedError(f"Initiating delta {delta_name} restore failed")
await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)

async def clear_delta(
self,
delta_name: str,
*,
nodes: List[CoordinatorNode],
cluster: Cluster,
node_to_backup_index: List[Optional[int]],
delta_manifest: ipc.BackupManifest,
) -> None:
reqs: List[ipc.NodeRequest] = []
for backup_index in node_to_backup_index:
if backup_index is not None:
snapshot_result = delta_manifest.snapshot_results[backup_index]
assert snapshot_result.state is not None
reqs.append(ipc.SnapshotClearRequest(root_globs=snapshot_result.state.root_globs))
start_results = await cluster.request_from_nodes(
"delta/clear",
method="post",
caller="restore_deltas",
reqs=reqs,
nodes=nodes,
)
if not start_results:
raise StepFailedError(f"Initiating delta {delta_name} clear failed")
await cluster.wait_successful_results(start_results=start_results, result_class=ipc.NodeResult)


@dataclasses.dataclass
class ComputeKeptBackupsStep(Step[set[str]]):
"""
Expand Down
22 changes: 20 additions & 2 deletions astacus/coordinator/plugins/cassandra/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .utils import run_subop
from astacus.common import ipc
from astacus.common.cassandra.client import CassandraClient
from astacus.common.cassandra.config import CassandraClientConfiguration, SNAPSHOT_GLOB, SNAPSHOT_NAME
from astacus.common.cassandra.config import BACKUP_GLOB, CassandraClientConfiguration, SNAPSHOT_GLOB, SNAPSHOT_NAME
from astacus.common.cassandra.utils import SYSTEM_KEYSPACES
from astacus.common.magic import JSON_DELTA_PREFIX
from astacus.common.snapshot import SnapshotGroup
Expand Down Expand Up @@ -163,11 +163,25 @@ def get_restore_schema_from_snapshot_steps(self, *, context: OperationContext, r
table_glob=SNAPSHOT_GLOB,
keyspaces_to_skip=[ks for ks in SYSTEM_KEYSPACES if ks != "system_schema"],
match_tables_by=ipc.CassandraTableMatching.cfid,
expect_empty_target=True,
)
restore_deltas_req = ipc.CassandraRestoreSSTablesRequest(
table_glob=BACKUP_GLOB,
keyspaces_to_skip=[ks for ks in SYSTEM_KEYSPACES if ks != "system_schema"],
match_tables_by=ipc.CassandraTableMatching.cfid,
expect_empty_target=False,
)

return [
base.RestoreStep(storage_name=context.storage_name, partial_restore_nodes=req.partial_restore_nodes),
CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.restore_sstables, req=restore_sstables_req),
base.DeltaManifestsStep(json_storage=context.json_storage),
base.RestoreDeltasStep(
json_storage=context.json_storage,
storage_name=context.storage_name,
restore_delta_url=f"cassandra/{ipc.CassandraSubOp.restore_sstables}",
restore_delta_request=restore_deltas_req,
),
restore_steps.StopReplacedNodesStep(partial_restore_nodes=req.partial_restore_nodes, cassandra_nodes=self.nodes),
restore_steps.StartCassandraStep(replace_backup_nodes=True, override_tokens=True, cassandra_nodes=self.nodes),
restore_steps.WaitCassandraUpStep(duration=self.restore_start_timeout),
Expand All @@ -180,14 +194,18 @@ def get_restore_schema_from_manifest_steps(self, *, context: OperationContext, r
table_glob=SNAPSHOT_GLOB,
keyspaces_to_skip=SYSTEM_KEYSPACES,
match_tables_by=ipc.CassandraTableMatching.cfname,
expect_empty_target=True,
)
return [
# Start cassandra with backed up token distribution + set schema + stop it
restore_steps.StartCassandraStep(override_tokens=True, cassandra_nodes=self.nodes),
restore_steps.WaitCassandraUpStep(duration=self.restore_start_timeout),
restore_steps.RestorePreDataStep(client=client),
CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.stop_cassandra),
# Restore snapshot
# Restore snapshot. Restoring deltas is not possible in this scenario,
# because once we've created our own system_schema keyspace and written data to it,
# we've started a new sequence of sstables that might clash with the sequence from the node
# we took the backup from (e.g. the old node had nb-1, the new node has nb-1, unclear how to proceed).
base.RestoreStep(storage_name=context.storage_name, partial_restore_nodes=req.partial_restore_nodes),
CassandraRestoreSubOpStep(op=ipc.CassandraSubOp.restore_sstables, req=restore_sstables_req),
# restart cassandra and do the final actions with data available
Expand Down
14 changes: 14 additions & 0 deletions astacus/node/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,20 @@ def download_result(*, op_id: int, n: Node = Depends()):
return op.result


@router.post("/delta/download")
def delta_download(req: ipc.SnapshotDownloadRequest, n: Node = Depends()):
if not n.state.is_locked:
raise HTTPException(status_code=409, detail="Not locked")
snapshotter = delta_snapshotter_from_snapshot_req(req, n)
return DownloadOp(n=n, op_id=n.allocate_op_id(), stats=n.stats, req=req).start(snapshotter)


@router.get("/delta/download/{op_id}")
def delta_download_result(*, op_id: int, n: Node = Depends()):
op, _ = n.get_op_and_op_info(op_id=op_id, op_name=OpName.download)
return op.result


@router.post("/clear")
def clear(req: ipc.SnapshotClearRequest, n: Node = Depends()):
if not n.state.is_locked:
Expand Down
32 changes: 23 additions & 9 deletions astacus/node/cassandra.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from astacus.common.exceptions import TransientException
from pathlib import Path
from pydantic import DirectoryPath
from typing import Callable
from typing import Callable, Tuple

import contextlib
import logging
Expand All @@ -28,6 +28,16 @@
TABLES_GLOB = "data/*/*"


def ks_table_from_snapshot_path(p: Path) -> Tuple[str, str]:
# /.../keyspace/table/snapshots/astacus
return p.parts[-4], p.parts[-3]


def ks_table_from_backup_path(p: Path) -> Tuple[str, str]:
# /.../keyspace/table/backups
return p.parts[-3], p.parts[-2]


class SimpleCassandraSubOp(NodeOp[ipc.NodeRequest, ipc.NodeResult]):
"""
Generic class to handle no arguments in + no output out case subops.
Expand Down Expand Up @@ -145,14 +155,8 @@ def restore_sstables(self) -> None:
else self._match_table_by_name(keyspace_name, table_name_and_id)
)

# Ensure destination path is empty except for potential directories (e.g. backups/)
# This should never have anything - except for system_auth, it gets populated when we restore schema.
existing_files = [file_path for file_path in table_path.glob("*") if file_path.is_file()]
if keyspace_name == "system_auth":
for existing_file in existing_files:
existing_file.unlink()
existing_files = []
assert not existing_files, f"Files found in {table_name_and_id}: {existing_files}"
if self.req.expect_empty_target:
self._ensure_target_is_empty(keyspace_name=keyspace_name, table_path=table_path)

for file_path in table_snapshot.glob("*"):
file_path.rename(table_path / file_path.name)
Expand All @@ -175,6 +179,16 @@ def _match_table_by_name(self, keyspace_name: str, table_name_and_id: str) -> Di

return table_paths[0]

def _ensure_target_is_empty(self, *, keyspace_name: str, table_path: Path) -> None:
# Ensure destination path is empty except for potential directories (e.g. backups/)
# This should never have anything - except for system_auth, it gets populated when we restore schema.
existing_files = [file_path for file_path in table_path.glob("*") if file_path.is_file()]
if keyspace_name == "system_auth":
for existing_file in existing_files:
existing_file.unlink()
existing_files = []
assert not existing_files, f"Files found in {table_path.name}: {existing_files}"


class CassandraStartOp(NodeOp[ipc.CassandraStartRequest, ipc.NodeResult]):
def create_result(self) -> ipc.NodeResult:
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ def __init__(self, *, mocker, tmpdir):
other_table_path.mkdir(parents=True)
(other_table_path / "data.file").write_text("data")

self.backup_path = keyspace_path / "dummytable-123" / "backups"
self.backup_path.mkdir(parents=True)

system_schema_path = self.root / "data" / "system_schema" / "tables-789" / "snapshots" / SNAPSHOT_NAME
system_schema_path.mkdir(parents=True)
(system_schema_path / "data.file").write_text("schema")
Expand All @@ -71,6 +74,8 @@ def __init__(self, *, mocker, tmpdir):
(keyspace_path / "dummytable-234").mkdir()
(keyspace_path / "anothertable-789").mkdir()

(self.backup_path / "incremental.backup").write_text("delta")

named_temporary_file = mocker.patch.object(tempfile, "NamedTemporaryFile")
self.fake_conffile = StringIO()
named_temporary_file.return_value.__enter__.return_value = self.fake_conffile
Expand Down
Loading

0 comments on commit 5adcf17

Please sign in to comment.