From c91dac70cd83954d31ad34ebb9feb9268c3dcfe4 Mon Sep 17 00:00:00 2001 From: Aris Tritas Date: Thu, 24 Oct 2024 10:55:36 +0200 Subject: [PATCH] ClickHouse: pause KeeperMap insertion momentarily during backup The PR alters replicated users' privileges (through GRANT & REVOKE) during the backup. This ensures the KeeperMap tables' snapshot is consistent with the frozen table data. [DDB-1237] --- .../coordinator/plugins/clickhouse/plugin.py | 3 + .../coordinator/plugins/clickhouse/steps.py | 30 +++++++ astacus/coordinator/plugins/zookeeper.py | 11 +++ tests/integration/conftest.py | 36 ++++++-- tests/integration/constants.py | 4 + .../plugins/clickhouse/conftest.py | 2 +- .../plugins/clickhouse/test_steps.py | 82 ++++++++++++++++++- .../plugins/clickhouse/test_steps.py | 27 ++++++ 8 files changed, 185 insertions(+), 10 deletions(-) create mode 100644 tests/integration/constants.py diff --git a/astacus/coordinator/plugins/clickhouse/plugin.py b/astacus/coordinator/plugins/clickhouse/plugin.py index 49ce83d2..feccc228 100644 --- a/astacus/coordinator/plugins/clickhouse/plugin.py +++ b/astacus/coordinator/plugins/clickhouse/plugin.py @@ -19,6 +19,7 @@ DeleteDanglingObjectStorageFilesStep, FreezeTablesStep, GetVersionsStep, + KeeperMapTablesReadOnlyStep, ListDatabaseReplicasStep, MoveFrozenPartsStep, PrepareClickHouseManifestStep, @@ -129,6 +130,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]: ), RetrieveDatabasesAndTablesStep(clients=clickhouse_clients), RetrieveMacrosStep(clients=clickhouse_clients), + KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=False), RetrieveKeeperMapTableDataStep( zookeeper_client=zookeeper_client, keeper_map_path_prefix=self.keeper_map_path_prefix, @@ -137,6 +139,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]: FreezeTablesStep( clients=clickhouse_clients, freeze_name=self.freeze_name, freeze_unfreeze_timeout=self.freeze_timeout ), + KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=True), # Then snapshot and backup all frozen table parts SnapshotStep( snapshot_groups=disks.get_snapshot_groups(self.freeze_name), diff --git a/astacus/coordinator/plugins/clickhouse/steps.py b/astacus/coordinator/plugins/clickhouse/steps.py index b344aedc..a9bd4abb 100644 --- a/astacus/coordinator/plugins/clickhouse/steps.py +++ b/astacus/coordinator/plugins/clickhouse/steps.py @@ -179,6 +179,36 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Us return user_defined_functions +@dataclasses.dataclass +class KeeperMapTablesReadOnlyStep(Step[None]): + clients: Sequence[ClickHouseClient] + allow_writes: bool + + @staticmethod + def get_revoke_statement(table: Table, escaped_user_name: str) -> bytes: + return f"REVOKE INSERT, UPDATE, DELETE ON {table.escaped_sql_identifier} FROM {escaped_user_name}".encode() + + @staticmethod + def get_grant_statement(table: Table, escaped_user_name: str) -> bytes: + return f"GRANT INSERT, UPDATE, DELETE ON {table.escaped_sql_identifier} TO {escaped_user_name}".encode() + + async def run_step(self, cluster: Cluster, context: StepsContext): + _, tables = context.get_result(RetrieveDatabasesAndTablesStep) + replicated_users_response = await self.clients[0].execute( + b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name" + ) + replicated_users_names = [b64decode(cast(str, user[0])) for user in replicated_users_response] + keeper_map_table_names = [table for table in tables if table.engine == "KeeperMap"] + privilege_altering_fun = self.get_grant_statement if self.allow_writes else self.get_revoke_statement + statements = [ + privilege_altering_fun(table, escape_sql_identifier(user)) + for table in keeper_map_table_names + for user in replicated_users_names + ] + logger.info("Executing %d statements on KeeperMap tables\n%s", len(statements), "\n".join(map(str, statements))) + await asyncio.gather(*(self.clients[0].execute(statement) for statement in statements)) + + @dataclasses.dataclass class RetrieveKeeperMapTableDataStep(Step[Sequence[KeeperMapTable]]): zookeeper_client: ZooKeeperClient diff --git a/astacus/coordinator/plugins/zookeeper.py b/astacus/coordinator/plugins/zookeeper.py index d335a864..aa61f813 100644 --- a/astacus/coordinator/plugins/zookeeper.py +++ b/astacus/coordinator/plugins/zookeeper.py @@ -10,6 +10,7 @@ from kazoo.protocol.states import KazooState from kazoo.recipe.watchers import ChildrenWatch, DataWatch from kazoo.retry import KazooRetry +from kazoo.security import ACL from queue import Empty, Queue import asyncio @@ -122,6 +123,10 @@ async def exists(self, path: str) -> bool: """Check if specified node exists.""" raise NotImplementedError + async def set_acls(self, path: str, acls: Sequence[ACL]) -> None: + """Set ACLs for node.""" + raise NotImplementedError + def transaction(self) -> ZooKeeperTransaction: """Begin a transaction.""" raise NotImplementedError @@ -292,6 +297,12 @@ async def exists(self, path) -> bool: maybe_znode_stat = await to_thread(self.client.retry, self.client.exists, path) return maybe_znode_stat is not None + async def set_acls(self, path: str, acls: Sequence[ACL]) -> None: + try: + await to_thread(self.client.retry, self.client.set_acls, path, acls) + except kazoo.exceptions.NoNodeError as e: + raise NoNodeError(path) from e + def transaction(self) -> KazooZooKeeperTransaction: return KazooZooKeeperTransaction(request=self.client.transaction()) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index e5997312..c6988661 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -3,9 +3,11 @@ """ from astacus.common.utils import build_netloc -from astacus.coordinator.plugins.zookeeper import KazooZooKeeperClient +from astacus.coordinator.plugins.zookeeper import KazooZooKeeperClient, ZooKeeperUser from collections.abc import AsyncIterator, Mapping, Sequence +from kazoo.security import make_digest_acl_credential from pathlib import Path +from tests.integration.constants import ZOOKEPER_ADMIN_USER from types import MappingProxyType import asyncio @@ -34,13 +36,19 @@ async def get_command_path(name: str) -> Path | None: return None -def get_zookeeper_command(*, java_path: Path, data_dir: Path, port: int) -> Sequence[str | Path] | None: +def get_zookeeper_command( + *, java_path: Path, data_dir: Path, port: int, admin_user: ZooKeeperUser +) -> Sequence[str | Path] | None: zookeeper_jars = list(Path("/usr/share/zookeeper").glob("*.jar")) if zookeeper_jars: class_paths = [data_dir, *zookeeper_jars] class_path_option = ":".join(str(path) for path in class_paths) zookeeper_path = "org.apache.zookeeper.server.quorum.QuorumPeerMain" - java_options = ["-Dzookeeper.admin.enableServer=false"] + admin_credential_digest = make_digest_acl_credential(admin_user.username, admin_user.password) + java_options = [ + "-Dzookeeper.admin.enableServer=false", + f"-Dzookeeper.DigestAuthenticationProvider.superDigest={admin_user.username}:{admin_credential_digest}", + ] return [java_path, "-cp", class_path_option, *java_options, zookeeper_path, str(port), data_dir] return None @@ -149,7 +157,16 @@ def get_kazoo_host(zookeeper: Service) -> str: @pytest.fixture(name="zookeeper_client") def fixture_zookeeper_client(zookeeper: Service) -> KazooZooKeeperClient: - return KazooZooKeeperClient(hosts=[get_kazoo_host(zookeeper)], timeout=10) + return get_zookeeper_client(zookeeper) + + +def get_zookeeper_client(zookeeper: Service) -> KazooZooKeeperClient: + user = ( + ZooKeeperUser(username=zookeeper.username, password=zookeeper.password) + if zookeeper.username is not None and zookeeper.password is not None + else None + ) + return KazooZooKeeperClient(hosts=[get_kazoo_host(zookeeper)], user=user) @contextlib.asynccontextmanager @@ -170,7 +187,8 @@ async def create_zookeeper(ports: Ports) -> AsyncIterator[Service]: log4j.appender.default.layout.ConversionPattern=[%-5p] %m%n """ ) - command = get_zookeeper_command(java_path=java_path, data_dir=data_dir, port=port) + zookeeper_admin_user = ZOOKEPER_ADMIN_USER + command = get_zookeeper_command(java_path=java_path, data_dir=data_dir, port=port, admin_user=zookeeper_admin_user) if command is None: pytest.skip("zookeeper installation not found") async with contextlib.AsyncExitStack() as stack: @@ -186,7 +204,13 @@ async def create_zookeeper(ports: Ports) -> AsyncIterator[Service]: timeout=20.0, ) ) - yield Service(process=process, port=port, data_dir=data_dir) + yield Service( + process=process, + port=port, + data_dir=data_dir, + username=zookeeper_admin_user.username, + password=zookeeper_admin_user.password, + ) break except FailPatternFoundError: if attempt + 1 == max_attempts: diff --git a/tests/integration/constants.py b/tests/integration/constants.py new file mode 100644 index 00000000..8ed7c5da --- /dev/null +++ b/tests/integration/constants.py @@ -0,0 +1,4 @@ +# Copyright (c) 2024 Aiven Ltd +from astacus.coordinator.plugins.zookeeper import ZooKeeperUser + +ZOOKEPER_ADMIN_USER = ZooKeeperUser("super", "foobar") diff --git a/tests/integration/coordinator/plugins/clickhouse/conftest.py b/tests/integration/coordinator/plugins/clickhouse/conftest.py index 7316eed5..e43ca5da 100644 --- a/tests/integration/coordinator/plugins/clickhouse/conftest.py +++ b/tests/integration/coordinator/plugins/clickhouse/conftest.py @@ -385,7 +385,7 @@ def setting(name: str, value: int | float | str): {data_dir!s} - debug + trace true {tcp_port} diff --git a/tests/integration/coordinator/plugins/clickhouse/test_steps.py b/tests/integration/coordinator/plugins/clickhouse/test_steps.py index 1992f819..7bc7cc85 100644 --- a/tests/integration/coordinator/plugins/clickhouse/test_steps.py +++ b/tests/integration/coordinator/plugins/clickhouse/test_steps.py @@ -3,14 +3,16 @@ """ from .conftest import ClickHouseCommand, create_clickhouse_cluster, get_clickhouse_client, MinioBucket +from .test_plugin import setup_cluster_users from astacus.coordinator.cluster import Cluster from astacus.coordinator.plugins.base import StepsContext +from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, ClickHouseClientQueryError, HttpClickHouseClient from astacus.coordinator.plugins.clickhouse.manifest import ReplicatedDatabase, Table -from astacus.coordinator.plugins.clickhouse.steps import RetrieveDatabasesAndTablesStep +from astacus.coordinator.plugins.clickhouse.steps import KeeperMapTablesReadOnlyStep, RetrieveDatabasesAndTablesStep from base64 import b64decode -from collections.abc import Sequence +from collections.abc import AsyncIterator, Sequence from tests.integration.conftest import create_zookeeper, Ports -from typing import cast +from typing import cast, NamedTuple from uuid import UUID import pytest @@ -99,3 +101,77 @@ async def test_retrieve_tables(ports: Ports, clickhouse_command: ClickHouseComma dependencies=[], ), ] + + +class KeeperMapInfo(NamedTuple): + context: StepsContext + clickhouse_client: ClickHouseClient + user_client: ClickHouseClient + + +@pytest.fixture(name="keeper_table_context") +async def fixture_keeper_table_context( + ports: Ports, clickhouse_command: ClickHouseCommand, minio_bucket: MinioBucket +) -> AsyncIterator[KeeperMapInfo]: + async with ( + create_zookeeper(ports) as zookeeper, + create_clickhouse_cluster(zookeeper, minio_bucket, ports, ["s1"], clickhouse_command) as clickhouse_cluster, + ): + clickhouse = clickhouse_cluster.services[0] + admin_client = get_clickhouse_client(clickhouse) + await setup_cluster_users([admin_client]) + for statement in [ + b"CREATE DATABASE `keeperdata` ENGINE = Replicated('/clickhouse/databases/keeperdata', '{my_shard}', '{my_replica}')", + b"CREATE TABLE `keeperdata`.`keepertable` (thekey UInt32, thevalue UInt32) ENGINE = KeeperMap('test', 1000) PRIMARY KEY thekey", + b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(1) FROM numbers(3)", + b"CREATE USER bob IDENTIFIED WITH sha256_password BY 'secret'", + b"GRANT INSERT, SELECT, UPDATE, DELETE ON `keeperdata`.`keepertable` TO `bob`", + ]: + await admin_client.execute(statement) + user_client = HttpClickHouseClient( + host=clickhouse.host, + port=clickhouse.port, + username="bob", + password="secret", + timeout=10, + ) + step = RetrieveDatabasesAndTablesStep(clients=[admin_client]) + context = StepsContext() + databases_tables_result = await step.run_step(Cluster(nodes=[]), context=context) + context.set_result(RetrieveDatabasesAndTablesStep, databases_tables_result) + yield KeeperMapInfo(context, admin_client, user_client) + + +async def test_keeper_map_table_select_only_setting_modified(keeper_table_context: KeeperMapInfo) -> None: + steps_context, admin_client, user_client = keeper_table_context + read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False) + # After the read-write step, the user should only be able to select from the table + await read_only_step.run_step(Cluster(nodes=[]), steps_context) + with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"): + await user_client.execute( + b"INSERT INTO `keeperdata`.`keepertable` SETTINGS wait_for_async_insert=1 SELECT *, materialize(2) FROM numbers(3)" + ) + with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"): + await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20") + with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"): + await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20") + read_only_row_count = cast( + Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`") + ) + assert int(read_only_row_count[0][0]) == 3 + # After the read-write step, the user should be able to write, update and delete from the table + read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True) + await read_write_step.run_step(Cluster(nodes=[]), steps_context) + await user_client.execute(b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(3) FROM numbers(3, 3)") + read_write_row_count = cast( + Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`") + ) + assert int(read_write_row_count[0][0]) == 6 + await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20") + current_values = await user_client.execute(b"SELECT thevalue FROM `keeperdata`.`keepertable` ORDER BY thekey") + assert all(int(cast(str, val[0])) == 3 for val in current_values) + await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20") + post_delete_row_count = cast( + Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`") + ) + assert int(post_delete_row_count[0][0]) == 0 diff --git a/tests/unit/coordinator/plugins/clickhouse/test_steps.py b/tests/unit/coordinator/plugins/clickhouse/test_steps.py index 3cea6f05..bac66a13 100644 --- a/tests/unit/coordinator/plugins/clickhouse/test_steps.py +++ b/tests/unit/coordinator/plugins/clickhouse/test_steps.py @@ -52,6 +52,7 @@ FreezeUnfreezeTablesStepBase, get_restore_table_query, GetVersionsStep, + KeeperMapTablesReadOnlyStep, ListDatabaseReplicasStep, MoveFrozenPartsStep, PrepareClickHouseManifestStep, @@ -1275,6 +1276,32 @@ async def test_restore_keeper_map_table_data_step() -> None: ] +@pytest.mark.parametrize( + ("only_allow_select", "expected_statement"), + [ + (True, b"ALTER TABLE `db-two`.`table-keeper` MODIFY SETTING only_allow_select_statement=true"), + (False, b"ALTER TABLE `db-two`.`table-keeper` RESET SETTING only_allow_select_statement"), + ], + ids=["read-only", "read-write"], +) +async def test_keeper_map_table_select_only_setting_modified(only_allow_select: bool, expected_statement: bytes) -> None: + clickhouse_client = mock_clickhouse_client() + context = StepsContext() + sample_tables = SAMPLE_TABLES + [ + Table( + database=b"db-two", + name=b"table-keeper", + uuid=uuid.UUID("00000000-0000-0000-0000-200000000008"), + engine="KeeperMap", + create_query=b"CREATE TABLE db-two.table-keeper ...", + ), + ] + context.set_result(RetrieveDatabasesAndTablesStep, (SAMPLE_DATABASES, sample_tables)) + step = KeeperMapTablesReadOnlyStep(clients=[clickhouse_client], allow_writes=only_allow_select) + await step.run_step(Cluster(nodes=[]), context) + assert clickhouse_client.mock_calls == [mock.call.execute(expected_statement)] + + async def test_attaches_all_mergetree_parts_in_manifest() -> None: client_1 = mock_clickhouse_client() client_2 = mock_clickhouse_client()