diff --git a/astacus/coordinator/plugins/clickhouse/plugin.py b/astacus/coordinator/plugins/clickhouse/plugin.py index c8f38d64..b1dd4dc4 100644 --- a/astacus/coordinator/plugins/clickhouse/plugin.py +++ b/astacus/coordinator/plugins/clickhouse/plugin.py @@ -20,6 +20,7 @@ FreezeTablesStep, GetVersionsStep, KeeperMapTablesReadOnlyStep, + KeeperMapTablesReadWriteStep, ListDatabaseReplicasStep, MoveFrozenPartsStep, PrepareClickHouseManifestStep, @@ -130,7 +131,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]: ), RetrieveDatabasesAndTablesStep(clients=clickhouse_clients), RetrieveMacrosStep(clients=clickhouse_clients), - KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=False), + KeeperMapTablesReadOnlyStep(clients=clickhouse_clients), RetrieveKeeperMapTableDataStep( zookeeper_client=zookeeper_client, keeper_map_path_prefix=self.keeper_map_path_prefix, @@ -140,7 +141,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]: FreezeTablesStep( clients=clickhouse_clients, freeze_name=self.freeze_name, freeze_unfreeze_timeout=self.freeze_timeout ), - KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=True), + KeeperMapTablesReadWriteStep(clients=clickhouse_clients), # Then snapshot and backup all frozen table parts SnapshotStep( snapshot_groups=disks.get_snapshot_groups(self.freeze_name), diff --git a/astacus/coordinator/plugins/clickhouse/steps.py b/astacus/coordinator/plugins/clickhouse/steps.py index a789505a..03a80134 100644 --- a/astacus/coordinator/plugins/clickhouse/steps.py +++ b/astacus/coordinator/plugins/clickhouse/steps.py @@ -181,38 +181,35 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Us @dataclasses.dataclass -class KeeperMapTablesReadOnlyStep(Step[None]): +class KeeperMapTablesReadabilityStepBase(Step[None]): clients: Sequence[ClickHouseClient] - allow_writes: bool - async def revoke_write_on_table(self, table: Table, user_name: bytes) -> None: - escaped_user_name = escape_sql_identifier(user_name) - revoke_statement = ( - f"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} FROM {escaped_user_name}" - ) - await self.clients[0].execute(revoke_statement.encode()) + def readability_statement(self, escaped_table_identifier: str) -> str: + raise NotImplementedError - async def grant_write_on_table(self, table: Table, user_name: bytes) -> None: - escaped_user_name = escape_sql_identifier(user_name) - grant_statement = ( - f"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} TO {escaped_user_name}" - ) - await self.clients[0].execute(grant_statement.encode()) + async def alter_readability(self, escaped_table_identifier: str) -> None: + statement = self.readability_statement(escaped_table_identifier).encode() + await self.clients[0].execute(statement) async def run_step(self, cluster: Cluster, context: StepsContext) -> None: _, tables = context.get_result(RetrieveDatabasesAndTablesStep) - replicated_users_response = await self.clients[0].execute( - b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name" - ) - replicated_users_names = [b64decode(cast(str, user[0])) for user in replicated_users_response] keeper_map_table_names = [table for table in tables if table.engine == "KeeperMap"] - privilege_altering_fun = self.grant_write_on_table if self.allow_writes else self.revoke_write_on_table - privilege_update_tasks = [ - privilege_altering_fun(table, user) for table in keeper_map_table_names for user in replicated_users_names - ] + privilege_update_tasks = [self.alter_readability(table.escaped_sql_identifier) for table in keeper_map_table_names] await asyncio.gather(*privilege_update_tasks) +@dataclasses.dataclass +class KeeperMapTablesReadOnlyStep(KeeperMapTablesReadabilityStepBase): + def readability_statement(self, escaped_table_identifier: str) -> str: + return f"ALTER TABLE {escaped_table_identifier} MODIFY SETTING read_only=true" + + +@dataclasses.dataclass +class KeeperMapTablesReadWriteStep(KeeperMapTablesReadabilityStepBase): + def readability_statement(self, escaped_table_identifier: str) -> str: + return f"ALTER TABLE {escaped_table_identifier} RESET SETTING read_only" + + @dataclasses.dataclass class RetrieveKeeperMapTableDataStep(Step[Sequence[KeeperMapTable]]): zookeeper_client: ZooKeeperClient @@ -257,7 +254,7 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Ke async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None: try: - await KeeperMapTablesReadOnlyStep(clients=self.clients, allow_writes=True).run_step(cluster, context) + await KeeperMapTablesReadOnlyStep(self.clients).run_step(cluster, context) except ClickHouseClientQueryError: logger.warning("Unable to restore write ACLs for KeeperMap tables") @@ -488,7 +485,7 @@ def operation(self) -> str: async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None: try: - await KeeperMapTablesReadOnlyStep(clients=self.clients, allow_writes=True).run_step(cluster, context) + await KeeperMapTablesReadOnlyStep(clients=self.clients).run_step(cluster, context) except ClickHouseClientQueryError: logger.warning("Unable to restore write ACLs for KeeperMap tables") diff --git a/tests/integration/coordinator/plugins/clickhouse/test_steps.py b/tests/integration/coordinator/plugins/clickhouse/test_steps.py index 772e43ca..1ecac066 100644 --- a/tests/integration/coordinator/plugins/clickhouse/test_steps.py +++ b/tests/integration/coordinator/plugins/clickhouse/test_steps.py @@ -7,7 +7,11 @@ from astacus.coordinator.plugins.base import StepsContext from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, ClickHouseClientQueryError, HttpClickHouseClient from astacus.coordinator.plugins.clickhouse.manifest import ReplicatedDatabase, Table -from astacus.coordinator.plugins.clickhouse.steps import KeeperMapTablesReadOnlyStep, RetrieveDatabasesAndTablesStep +from astacus.coordinator.plugins.clickhouse.steps import ( + KeeperMapTablesReadOnlyStep, + KeeperMapTablesReadWriteStep, + RetrieveDatabasesAndTablesStep, +) from base64 import b64decode from collections.abc import AsyncIterator, Sequence from tests.integration.conftest import create_zookeeper, Ports @@ -185,13 +189,13 @@ async def fixture_keeper_table_context( async def check_read_only(user_client: ClickHouseClient) -> None: - with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"): + with pytest.raises(ClickHouseClientQueryError, match=".*TABLE_IS_READ_ONLY.*"): await user_client.execute( b"INSERT INTO `keeperdata`.`keepertable` SETTINGS wait_for_async_insert=1 SELECT *, materialize(2) FROM numbers(3)" ) - with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"): + with pytest.raises(ClickHouseClientQueryError, match=".*TABLE_IS_READ_ONLY.*"): await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20") - with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"): + with pytest.raises(ClickHouseClientQueryError, match=".*TABLE_IS_READ_ONLY.*"): await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20") read_only_row_count = cast( Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`") @@ -217,13 +221,13 @@ async def check_read_write(user_client: ClickHouseClient) -> None: async def test_keeper_map_table_read_only_step(keeper_table_context: KeeperMapInfo) -> None: steps_context, admin_client, user_clients = keeper_table_context - read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False) + read_only_step = KeeperMapTablesReadOnlyStep([admin_client]) # After the read-only step, users should only be able to select from the table await read_only_step.run_step(Cluster(nodes=[]), steps_context) for user_client in user_clients: await check_read_only(user_client) # After the read-write step, users should be able to write, update and delete from the table - read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True) + read_write_step = KeeperMapTablesReadWriteStep([admin_client]) await read_write_step.run_step(Cluster(nodes=[]), steps_context) # Clean up table so that each user starts from a clean slate await admin_client.execute(b"TRUNCATE TABLE `keeperdata`.`keepertable`") diff --git a/tests/unit/coordinator/plugins/clickhouse/test_steps.py b/tests/unit/coordinator/plugins/clickhouse/test_steps.py index 541d2f02..975fe309 100644 --- a/tests/unit/coordinator/plugins/clickhouse/test_steps.py +++ b/tests/unit/coordinator/plugins/clickhouse/test_steps.py @@ -15,7 +15,7 @@ StepFailedError, StepsContext, ) -from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, Row, StubClickHouseClient +from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, StubClickHouseClient from astacus.coordinator.plugins.clickhouse.config import ( ClickHouseConfiguration, ClickHouseNode, @@ -52,7 +52,9 @@ FreezeUnfreezeTablesStepBase, get_restore_table_query, GetVersionsStep, + KeeperMapTablesReadabilityStepBase, KeeperMapTablesReadOnlyStep, + KeeperMapTablesReadWriteStep, ListDatabaseReplicasStep, MoveFrozenPartsStep, PrepareClickHouseManifestStep, @@ -1292,35 +1294,23 @@ async def test_restore_keeper_map_table_data_step() -> None: @pytest.mark.parametrize( - ("allow_writes", "expected_statements"), + ("step_class", "expected_statements"), [ ( - False, - [ - b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name", - b"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON `db-two`.`table-keeper` FROM `alice`", - ], + KeeperMapTablesReadOnlyStep, + [b"ALTER TABLE `db-two`.`table-keeper` MODIFY SETTING read_only=true"], ), ( - True, - [ - b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name", - b"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON `db-two`.`table-keeper` TO `alice`", - ], + KeeperMapTablesReadWriteStep, + [b"ALTER TABLE `db-two`.`table-keeper` RESET SETTING read_only"], ), ], ids=["read-only", "read-write"], ) -async def test_keeper_map_table_select_only_setting_modified(allow_writes: bool, expected_statements: list[bytes]) -> None: +async def test_keeper_map_table_read_only_setting( + step_class: type[KeeperMapTablesReadabilityStepBase], expected_statements: list[bytes] +) -> None: clickhouse_client = mock_clickhouse_client() - - def execute_side_effect(statement: bytes) -> list[Row]: - if statement == b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name": - return [[base64.b64encode(b"alice").decode()]] - return [] - - clickhouse_client.execute.side_effect = execute_side_effect - context = StepsContext() sample_tables = SAMPLE_TABLES + [ Table( database=b"db-two", @@ -1330,8 +1320,9 @@ def execute_side_effect(statement: bytes) -> list[Row]: create_query=b"CREATE TABLE db-two.table-keeper ...", ), ] + context = StepsContext() context.set_result(RetrieveDatabasesAndTablesStep, (SAMPLE_DATABASES, sample_tables)) - step = KeeperMapTablesReadOnlyStep(clients=[clickhouse_client], allow_writes=allow_writes) + step = step_class(clients=[clickhouse_client]) await step.run_step(Cluster(nodes=[]), context) mock_calls = clickhouse_client.mock_calls assert mock_calls == [mock.call.execute(statement) for statement in expected_statements]