Skip to content

Commit

Permalink
clickhouse: backup SQL user defined functions
Browse files Browse the repository at this point in the history
[DDB-1034]
  • Loading branch information
joelynch committed Jun 27, 2024
1 parent 53ece16 commit 040313c
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 26 deletions.
1 change: 1 addition & 0 deletions astacus/coordinator/plugins/clickhouse/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
},
"replicated_access_zookeeper_path": "/clickhouse/access",
"replicated_databases_zookeeper_path": "/clickhouse/databases",
"replicated_user_defined_zookeeper_path": "/clickhouse/user_defined_functions",
"replicated_databases_settings": {
"max_broken_tables_ratio": 0.5,
"max_replication_lag_to_enqueue": 10,
Expand Down
42 changes: 29 additions & 13 deletions astacus/coordinator/plugins/clickhouse/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
Copyright (c) 2021 Aiven Ltd
See LICENSE for details
"""

from astacus.common.utils import AstacusModel
from astacus.coordinator.plugins.clickhouse.client import escape_sql_identifier
from base64 import b64decode, b64encode
from collections.abc import Mapping, Sequence
from typing import Any
from typing import Any, Self
from uuid import UUID

import enum
Expand All @@ -25,15 +26,26 @@ class AccessEntity(AstacusModel):
attach_query: bytes

@classmethod
def from_plugin_data(cls, data: Mapping[str, Any]) -> "AccessEntity":
return AccessEntity(
def from_plugin_data(cls, data: Mapping[str, Any]) -> Self:
return cls(
type=data["type"],
uuid=UUID(hex=data["uuid"]),
name=b64decode(data["name"]),
attach_query=b64decode(data["attach_query"]),
)


class UserDefinedFunction(AstacusModel):
"""SQL UserDefinedFunction, stored in Zookeeper."""

path: str
create_query: bytes

@classmethod
def from_plugin_data(cls, data: Mapping[str, Any]) -> Self:
return cls(path=data["path"], create_query=b64decode(data["create_query"]))


class ReplicatedDatabase(AstacusModel):
name: bytes
# This is optional because of older backups without uuids
Expand All @@ -43,8 +55,8 @@ class ReplicatedDatabase(AstacusModel):
replica: bytes

@classmethod
def from_plugin_data(cls, data: Mapping[str, Any]) -> "ReplicatedDatabase":
return ReplicatedDatabase(
def from_plugin_data(cls, data: Mapping[str, Any]) -> Self:
return cls(
name=b64decode(data["name"]),
uuid=uuid.UUID(data["uuid"]) if "uuid" in data else None,
shard=b64decode(data["shard"]) if "shard" in data else b"{shard}",
Expand Down Expand Up @@ -75,11 +87,11 @@ def escaped_sql_identifier(self) -> str:
return f"{escape_sql_identifier(self.database)}.{escape_sql_identifier(self.name)}"

@classmethod
def from_plugin_data(cls, data: Mapping[str, Any]) -> "Table":
def from_plugin_data(cls, data: Mapping[str, Any]) -> Self:
dependencies = [
(b64decode(database_name), b64decode(table_name)) for database_name, table_name in data["dependencies"]
]
return Table(
return cls(
database=b64decode(data["database"]),
name=b64decode(data["name"]),
engine=data["engine"],
Expand All @@ -99,17 +111,17 @@ class ClickHouseObjectStorageFile(AstacusModel):
path: str

@classmethod
def from_plugin_data(cls, data: dict[str, Any]) -> "ClickHouseObjectStorageFile":
return ClickHouseObjectStorageFile(path=data["path"])
def from_plugin_data(cls, data: dict[str, Any]) -> Self:
return cls(path=data["path"])


class ClickHouseObjectStorageFiles(AstacusModel):
disk_name: str
files: list[ClickHouseObjectStorageFile]

@classmethod
def from_plugin_data(cls, data: dict[str, Any]) -> "ClickHouseObjectStorageFiles":
return ClickHouseObjectStorageFiles(
def from_plugin_data(cls, data: dict[str, Any]) -> Self:
return cls(
disk_name=data["disk_name"],
files=[ClickHouseObjectStorageFile.from_plugin_data(item) for item in data["files"]],
)
Expand All @@ -121,6 +133,7 @@ class Config:

version: ClickHouseBackupVersion
access_entities: Sequence[AccessEntity] = []
user_defined_functions: Sequence[UserDefinedFunction] = []
replicated_databases: Sequence[ReplicatedDatabase] = []
tables: Sequence[Table] = []
object_storage_files: list[ClickHouseObjectStorageFiles] = []
Expand All @@ -129,10 +142,13 @@ def to_plugin_data(self) -> dict[str, Any]:
return encode_manifest_data(self.dict())

@classmethod
def from_plugin_data(cls, data: Mapping[str, Any]) -> "ClickHouseManifest":
return ClickHouseManifest(
def from_plugin_data(cls, data: Mapping[str, Any]) -> Self:
return cls(
version=ClickHouseBackupVersion(data.get("version", ClickHouseBackupVersion.V1.value)),
access_entities=[AccessEntity.from_plugin_data(item) for item in data["access_entities"]],
user_defined_functions=[
UserDefinedFunction.from_plugin_data(item) for item in data.get("user_defined_functions", [])
],
replicated_databases=[ReplicatedDatabase.from_plugin_data(item) for item in data["replicated_databases"]],
tables=[Table.from_plugin_data(item) for item in data["tables"]],
object_storage_files=[
Expand Down
14 changes: 14 additions & 0 deletions astacus/coordinator/plugins/clickhouse/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
RestoreObjectStorageFilesStep,
RestoreReplicaStep,
RestoreReplicatedDatabasesStep,
RestoreUserDefinedFunctionsStep,
RetrieveAccessEntitiesStep,
RetrieveDatabasesAndTablesStep,
RetrieveMacrosStep,
RetrieveUserDefinedFunctionsStep,
SyncDatabaseReplicasStep,
SyncTableReplicasStep,
UnfreezeTablesStep,
Expand Down Expand Up @@ -64,6 +66,7 @@ class ClickHousePlugin(CoordinatorPlugin):
clickhouse: ClickHouseConfiguration = ClickHouseConfiguration()
replicated_access_zookeeper_path: str = "/clickhouse/access"
replicated_databases_zookeeper_path: str = "/clickhouse/databases"
replicated_user_defined_zookeeper_path: str | None = None
replicated_databases_settings: ReplicatedDatabaseSettings = ReplicatedDatabaseSettings()
freeze_name: str = "astacus"
disks: Sequence[DiskConfiguration] = [DiskConfiguration(type=DiskType.local, path=Path(""), name="default")]
Expand All @@ -76,6 +79,7 @@ class ClickHousePlugin(CoordinatorPlugin):
max_concurrent_create_databases: int = 10
max_concurrent_create_databases_per_node: int = 10
sync_databases_timeout: float = 60.0
sync_user_defined_functions_timeout: float = 60.0
restart_replica_timeout: float = 300.0
# Deprecated parameter, ignored
max_concurrent_restart_replica: int = 10
Expand Down Expand Up @@ -115,6 +119,10 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]:
zookeeper_client=zookeeper_client,
access_entities_path=self.replicated_access_zookeeper_path,
),
RetrieveUserDefinedFunctionsStep(
zookeeper_client=zookeeper_client,
replicated_user_defined_zookeeper_path=self.replicated_user_defined_zookeeper_path,
),
RetrieveDatabasesAndTablesStep(clients=clickhouse_clients),
RetrieveMacrosStep(clients=clickhouse_clients),
# Then freeze all tables
Expand Down Expand Up @@ -202,6 +210,12 @@ def get_restore_steps(self, *, context: OperationContext, req: RestoreRequest) -
sync_timeout=self.sync_tables_timeout,
max_concurrent_sync_per_node=self.max_concurrent_sync_per_node,
),
RestoreUserDefinedFunctionsStep(
zookeeper_client=zookeeper_client,
replicated_user_defined_zookeeper_path=self.replicated_user_defined_zookeeper_path,
clients=clients,
sync_user_defined_functions_timeout=self.sync_user_defined_functions_timeout,
),
# Keeping this step last avoids access from non-admin users while we are still restoring
RestoreAccessEntitiesStep(
zookeeper_client=zookeeper_client, access_entities_path=self.replicated_access_zookeeper_path
Expand Down
86 changes: 85 additions & 1 deletion astacus/coordinator/plugins/clickhouse/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ClickHouseObjectStorageFiles,
ReplicatedDatabase,
Table,
UserDefinedFunction,
)
from .parts import list_parts_to_attach
from .replication import DatabaseReplica, get_databases_replicas, get_shard_and_replica, sync_replicated_database
Expand All @@ -38,7 +39,7 @@
StepsContext,
SyncStep,
)
from astacus.coordinator.plugins.zookeeper import ChangeWatch, TransactionError, ZooKeeperClient
from astacus.coordinator.plugins.zookeeper import ChangeWatch, NoNodeError, TransactionError, ZooKeeperClient
from base64 import b64decode
from collections.abc import Awaitable, Callable, Iterable, Iterator, Mapping, Sequence
from typing import Any, cast, TypeVar
Expand All @@ -48,8 +49,10 @@
import dataclasses
import logging
import msgspec
import os
import re
import secrets
import time
import uuid

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -151,6 +154,33 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Ac
return access_entities


@dataclasses.dataclass
class RetrieveUserDefinedFunctionsStep(Step[Sequence[UserDefinedFunction]]):
zookeeper_client: ZooKeeperClient
replicated_user_defined_zookeeper_path: str | None

async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[UserDefinedFunction]:
if self.replicated_user_defined_zookeeper_path is None:
return []

user_defined_functions: list[UserDefinedFunction] = []
async with self.zookeeper_client.connect() as connection:
change_watch = ChangeWatch()
try:
children = await connection.get_children(self.replicated_user_defined_zookeeper_path, watch=change_watch)
except NoNodeError:
# The path doesn't exist, no user defined functions to restore
return []

for child in children:
user_defined_function_path = os.path.join(self.replicated_user_defined_zookeeper_path, child)
user_defined_function_value = await connection.get(user_defined_function_path, watch=change_watch)
user_defined_functions.append(UserDefinedFunction(path=child, create_query=user_defined_function_value))
if change_watch.has_changed:
raise TransientException("Concurrent modification during user_defined_function entities retrieval")
return user_defined_functions


@dataclasses.dataclass
class RetrieveDatabasesAndTablesStep(Step[DatabasesAndTables]):
"""
Expand Down Expand Up @@ -273,9 +303,11 @@ class PrepareClickHouseManifestStep(Step[dict[str, Any]]):

async def run_step(self, cluster: Cluster, context: StepsContext) -> dict[str, Any]:
databases, tables = context.get_result(RetrieveDatabasesAndTablesStep)
user_defined_functions = context.get_result(RetrieveUserDefinedFunctionsStep)
manifest = ClickHouseManifest(
version=ClickHouseBackupVersion.V2,
access_entities=context.get_result(RetrieveAccessEntitiesStep),
user_defined_functions=user_defined_functions,
replicated_databases=databases,
tables=tables,
object_storage_files=context.get_result(CollectObjectStorageFilesStep),
Expand Down Expand Up @@ -435,6 +467,25 @@ async def run_on_every_node(
await asyncio.gather(*[gather_limited(per_node_concurrency_limit, fn(client)) for client in clients])


async def wait_for_condition_on_every_node(
clients: Iterable[ClickHouseClient],
condition: Callable[[ClickHouseClient], Awaitable[bool]],
description: str,
timeout_seconds: float,
recheck_every_seconds: float = 1.0,
) -> None:
async def wait_for_condition(client: ClickHouseClient) -> None:
start_time = time.monotonic()
while True:
if await condition(client):
return
if time.monotonic() - start_time > timeout_seconds:
raise StepFailedError(f"Timeout while waiting for {description}")
await asyncio.sleep(recheck_every_seconds)

await asyncio.gather(*(wait_for_condition(client) for client in clients))


def get_restore_table_query(table: Table) -> bytes:
# Use `ATTACH` instead of `CREATE` for materialized views for
# proper restore in case of `SELECT` table absence
Expand Down Expand Up @@ -627,6 +678,39 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
pass


@dataclasses.dataclass
class RestoreUserDefinedFunctionsStep(Step[None]):
zookeeper_client: ZooKeeperClient
replicated_user_defined_zookeeper_path: str | None
clients: Sequence[ClickHouseClient]
sync_user_defined_functions_timeout: float

async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
if self.replicated_user_defined_zookeeper_path is None:
return

clickhouse_manifest = context.get_result(ClickHouseManifestStep)
if not clickhouse_manifest.user_defined_functions:
return

async with self.zookeeper_client.connect() as connection:
for user_defined_function in clickhouse_manifest.user_defined_functions:
path = os.path.join(self.replicated_user_defined_zookeeper_path, user_defined_function.path)
await connection.try_create(path, user_defined_function.create_query)

async def check_function_count(client: ClickHouseClient) -> bool:
count = await client.execute(b"""SELECT count(*) FROM system.functions WHERE origin = 'SQLUserDefined'""")
assert isinstance(count[0][0], str)
return int(count[0][0]) >= len(clickhouse_manifest.user_defined_functions)

await wait_for_condition_on_every_node(
clients=self.clients,
condition=check_function_count,
description="user defined functions to be restored",
timeout_seconds=self.sync_user_defined_functions_timeout,
)


@dataclasses.dataclass
class RestoreReplicaStep(Step[None]):
"""
Expand Down
5 changes: 3 additions & 2 deletions astacus/coordinator/plugins/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Copyright (c) 2021 Aiven Ltd
See LICENSE for details
"""

from astacus.common.exceptions import TransientException
from asyncio import to_thread
from collections.abc import AsyncIterator, Callable, Mapping, Sequence
Expand Down Expand Up @@ -72,9 +73,9 @@ async def try_create(self, path: str, value: bytes) -> bool:
Auto-creates all parent nodes if they don't exist.
Does nothing if the node did not already exist.
Does nothing if the node already exists.
Returns `True` if the node was created
Returns `True` if the node was created.
"""
try:
await self.create(path, value)
Expand Down
19 changes: 12 additions & 7 deletions tests/integration/coordinator/plugins/clickhouse/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Copyright (c) 2021 Aiven Ltd
See LICENSE for details
"""

from _pytest.fixtures import FixtureRequest
from astacus.client import create_client_parsers
from astacus.common.ipc import Plugin
Expand Down Expand Up @@ -413,6 +414,7 @@ def setting(name: str, value: int | float | str):
<http_port>{http_port}</http_port>
<interserver_http_host>localhost</interserver_http_host>
<interserver_http_port>{interserver_http_port}</interserver_http_port>
<user_defined_zookeeper_path>/clickhouse/user_defined_functions/</user_defined_zookeeper_path>
<zookeeper>
<node>
<host>{zookeeper.host}</host>
Expand Down Expand Up @@ -573,14 +575,17 @@ def create_astacus_configs(
for service in clickhouse_cluster.services
],
),
replicated_databases_settings=ReplicatedDatabaseSettings(
collection_name="default_cluster",
)
if clickhouse_cluster.use_named_collections
else ReplicatedDatabaseSettings(
cluster_username=clickhouse_cluster.services[0].username,
cluster_password=clickhouse_cluster.services[0].password,
replicated_databases_settings=(
ReplicatedDatabaseSettings(
collection_name="default_cluster",
)
if clickhouse_cluster.use_named_collections
else ReplicatedDatabaseSettings(
cluster_username=clickhouse_cluster.services[0].username,
cluster_password=clickhouse_cluster.services[0].password,
)
),
replicated_user_defined_zookeeper_path="/clickhouse/user_defined_functions/",
disks=[
DiskConfiguration(
type=DiskType.local,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_nam
await clients[2].execute(b"INSERT INTO default.in_object_storage VALUES (789, 'baz')")
# This won't be backed up
await clients[0].execute(b"INSERT INTO default.memory VALUES (123, 'foo')")
await clients[0].execute(b"CREATE FUNCTION `linear_equation_\x80` AS (x, k, b) -> k*x + b")


async def setup_cluster_users(clients: Sequence[HttpClickHouseClient]) -> None:
Expand Down Expand Up @@ -556,3 +557,11 @@ async def test_restores_dictionaries(restored_cluster: Sequence[ClickHouseClient
for client, expected in zip(restored_cluster, cluster_data):
await client.execute(b"SYSTEM RELOAD DICTIONARY default.dictionary")
assert await client.execute(b"SELECT * FROM default.dictionary") == expected


async def test_restores_user_defined_functions(restored_cluster: Sequence[ClickHouseClient]) -> None:
for client in restored_cluster:
functions = await client.execute(b"SELECT base64Encode(name) FROM system.functions WHERE origin = 'SQLUserDefined'")
assert functions == [[_b64_str(b"linear_equation_\x80")]]
result = await client.execute(b"SELECT `linear_equation_\x80`(1, 2, 3)")
assert result == [[5]]
Loading

0 comments on commit 040313c

Please sign in to comment.