diff --git a/astacus/coordinator/plugins/clickhouse/plugin.py b/astacus/coordinator/plugins/clickhouse/plugin.py index c8f38d64..b1dd4dc4 100644 --- a/astacus/coordinator/plugins/clickhouse/plugin.py +++ b/astacus/coordinator/plugins/clickhouse/plugin.py @@ -20,6 +20,7 @@ FreezeTablesStep, GetVersionsStep, KeeperMapTablesReadOnlyStep, + KeeperMapTablesReadWriteStep, ListDatabaseReplicasStep, MoveFrozenPartsStep, PrepareClickHouseManifestStep, @@ -130,7 +131,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]: ), RetrieveDatabasesAndTablesStep(clients=clickhouse_clients), RetrieveMacrosStep(clients=clickhouse_clients), - KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=False), + KeeperMapTablesReadOnlyStep(clients=clickhouse_clients), RetrieveKeeperMapTableDataStep( zookeeper_client=zookeeper_client, keeper_map_path_prefix=self.keeper_map_path_prefix, @@ -140,7 +141,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]: FreezeTablesStep( clients=clickhouse_clients, freeze_name=self.freeze_name, freeze_unfreeze_timeout=self.freeze_timeout ), - KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=True), + KeeperMapTablesReadWriteStep(clients=clickhouse_clients), # Then snapshot and backup all frozen table parts SnapshotStep( snapshot_groups=disks.get_snapshot_groups(self.freeze_name), diff --git a/astacus/coordinator/plugins/clickhouse/steps.py b/astacus/coordinator/plugins/clickhouse/steps.py index a789505a..e7c0f921 100644 --- a/astacus/coordinator/plugins/clickhouse/steps.py +++ b/astacus/coordinator/plugins/clickhouse/steps.py @@ -181,38 +181,35 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Us @dataclasses.dataclass -class KeeperMapTablesReadOnlyStep(Step[None]): +class KeeperMapTablesReadabilityStepBase(Step[None]): clients: Sequence[ClickHouseClient] - allow_writes: bool + _is_read_only: bool = dataclasses.field(init=False) - async def revoke_write_on_table(self, table: Table, user_name: bytes) -> None: - escaped_user_name = escape_sql_identifier(user_name) - revoke_statement = ( - f"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} FROM {escaped_user_name}" - ) - await self.clients[0].execute(revoke_statement.encode()) + def readability_statement(self, escaped_table_identifier: str) -> str: + read_only = str(self._is_read_only).lower() + return f"ALTER TABLE {escaped_table_identifier} MODIFY SETTING read_only={read_only}" - async def grant_write_on_table(self, table: Table, user_name: bytes) -> None: - escaped_user_name = escape_sql_identifier(user_name) - grant_statement = ( - f"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} TO {escaped_user_name}" - ) - await self.clients[0].execute(grant_statement.encode()) + async def alter_readability(self, escaped_table_identifier: str) -> None: + statement = self.readability_statement(escaped_table_identifier).encode() + await self.clients[0].execute(statement) async def run_step(self, cluster: Cluster, context: StepsContext) -> None: _, tables = context.get_result(RetrieveDatabasesAndTablesStep) - replicated_users_response = await self.clients[0].execute( - b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name" - ) - replicated_users_names = [b64decode(cast(str, user[0])) for user in replicated_users_response] keeper_map_table_names = [table for table in tables if table.engine == "KeeperMap"] - privilege_altering_fun = self.grant_write_on_table if self.allow_writes else self.revoke_write_on_table - privilege_update_tasks = [ - privilege_altering_fun(table, user) for table in keeper_map_table_names for user in replicated_users_names - ] + privilege_update_tasks = [self.alter_readability(table.escaped_sql_identifier) for table in keeper_map_table_names] await asyncio.gather(*privilege_update_tasks) +@dataclasses.dataclass +class KeeperMapTablesReadOnlyStep(KeeperMapTablesReadabilityStepBase): + _is_read_only = True + + +@dataclasses.dataclass +class KeeperMapTablesReadWriteStep(KeeperMapTablesReadabilityStepBase): + _is_read_only = False + + @dataclasses.dataclass class RetrieveKeeperMapTableDataStep(Step[Sequence[KeeperMapTable]]): zookeeper_client: ZooKeeperClient @@ -257,7 +254,7 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Ke async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None: try: - await KeeperMapTablesReadOnlyStep(clients=self.clients, allow_writes=True).run_step(cluster, context) + await KeeperMapTablesReadOnlyStep(self.clients).run_step(cluster, context) except ClickHouseClientQueryError: logger.warning("Unable to restore write ACLs for KeeperMap tables") @@ -488,7 +485,7 @@ def operation(self) -> str: async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None: try: - await KeeperMapTablesReadOnlyStep(clients=self.clients, allow_writes=True).run_step(cluster, context) + await KeeperMapTablesReadOnlyStep(clients=self.clients).run_step(cluster, context) except ClickHouseClientQueryError: logger.warning("Unable to restore write ACLs for KeeperMap tables") diff --git a/tests/integration/coordinator/plugins/clickhouse/test_steps.py b/tests/integration/coordinator/plugins/clickhouse/test_steps.py index 772e43ca..9c7d0d35 100644 --- a/tests/integration/coordinator/plugins/clickhouse/test_steps.py +++ b/tests/integration/coordinator/plugins/clickhouse/test_steps.py @@ -7,7 +7,11 @@ from astacus.coordinator.plugins.base import StepsContext from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, ClickHouseClientQueryError, HttpClickHouseClient from astacus.coordinator.plugins.clickhouse.manifest import ReplicatedDatabase, Table -from astacus.coordinator.plugins.clickhouse.steps import KeeperMapTablesReadOnlyStep, RetrieveDatabasesAndTablesStep +from astacus.coordinator.plugins.clickhouse.steps import ( + KeeperMapTablesReadOnlyStep, + KeeperMapTablesReadWriteStep, + RetrieveDatabasesAndTablesStep, +) from base64 import b64decode from collections.abc import AsyncIterator, Sequence from tests.integration.conftest import create_zookeeper, Ports @@ -184,52 +188,51 @@ async def fixture_keeper_table_context( yield KeeperMapInfo(context, admin_client, [foobar_client, alice_client]) +async def get_row_count(client: ClickHouseClient) -> int: + keeper_table_row_count = cast( + Sequence[tuple[int]], await client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`") + ) + return int(keeper_table_row_count[0][0]) + + async def check_read_only(user_client: ClickHouseClient) -> None: - with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"): + with pytest.raises(ClickHouseClientQueryError, match=".*TABLE_IS_READ_ONLY.*"): await user_client.execute( b"INSERT INTO `keeperdata`.`keepertable` SETTINGS wait_for_async_insert=1 SELECT *, materialize(2) FROM numbers(3)" ) - with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"): + with pytest.raises(ClickHouseClientQueryError, match=".*TABLE_IS_READ_ONLY.*"): await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20") - with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"): + with pytest.raises(ClickHouseClientQueryError, match=".*TABLE_IS_READ_ONLY.*"): await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20") - read_only_row_count = cast( - Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`") - ) - assert int(read_only_row_count[0][0]) == 3 + read_only_row_count = await get_row_count(user_client) + assert read_only_row_count == 3 async def check_read_write(user_client: ClickHouseClient) -> None: await user_client.execute(b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(10) FROM numbers(3)") - read_write_row_count = cast( - Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`") - ) - assert int(read_write_row_count[0][0]) == 3 + read_write_row_count = await get_row_count(user_client) + assert read_write_row_count == 3 await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20") current_values = await user_client.execute(b"SELECT thevalue FROM `keeperdata`.`keepertable` ORDER BY thekey") assert all(int(cast(str, val[0])) == 3 for val in current_values) await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20") - post_delete_row_count = cast( - Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`") - ) - assert int(post_delete_row_count[0][0]) == 0 + post_delete_row_count = await get_row_count(user_client) + assert post_delete_row_count == 0 async def test_keeper_map_table_read_only_step(keeper_table_context: KeeperMapInfo) -> None: steps_context, admin_client, user_clients = keeper_table_context - read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False) + read_only_step = KeeperMapTablesReadOnlyStep([admin_client]) # After the read-only step, users should only be able to select from the table await read_only_step.run_step(Cluster(nodes=[]), steps_context) for user_client in user_clients: await check_read_only(user_client) # After the read-write step, users should be able to write, update and delete from the table - read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True) + read_write_step = KeeperMapTablesReadWriteStep([admin_client]) await read_write_step.run_step(Cluster(nodes=[]), steps_context) # Clean up table so that each user starts from a clean slate await admin_client.execute(b"TRUNCATE TABLE `keeperdata`.`keepertable`") - post_delete_row_count = cast( - Sequence[tuple[int]], await admin_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`") - ) - assert int(post_delete_row_count[0][0]) == 0 + post_delete_row_count = await get_row_count(admin_client) + assert post_delete_row_count == 0 for user_client in user_clients: await check_read_write(user_client) diff --git a/tests/unit/coordinator/plugins/clickhouse/test_steps.py b/tests/unit/coordinator/plugins/clickhouse/test_steps.py index 541d2f02..a081be0f 100644 --- a/tests/unit/coordinator/plugins/clickhouse/test_steps.py +++ b/tests/unit/coordinator/plugins/clickhouse/test_steps.py @@ -15,7 +15,7 @@ StepFailedError, StepsContext, ) -from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, Row, StubClickHouseClient +from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, StubClickHouseClient from astacus.coordinator.plugins.clickhouse.config import ( ClickHouseConfiguration, ClickHouseNode, @@ -52,7 +52,9 @@ FreezeUnfreezeTablesStepBase, get_restore_table_query, GetVersionsStep, + KeeperMapTablesReadabilityStepBase, KeeperMapTablesReadOnlyStep, + KeeperMapTablesReadWriteStep, ListDatabaseReplicasStep, MoveFrozenPartsStep, PrepareClickHouseManifestStep, @@ -1292,35 +1294,23 @@ async def test_restore_keeper_map_table_data_step() -> None: @pytest.mark.parametrize( - ("allow_writes", "expected_statements"), + ("step_class", "expected_statements"), [ ( - False, - [ - b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name", - b"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON `db-two`.`table-keeper` FROM `alice`", - ], + KeeperMapTablesReadOnlyStep, + [b"ALTER TABLE `db-two`.`table-keeper` MODIFY SETTING read_only=true"], ), ( - True, - [ - b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name", - b"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON `db-two`.`table-keeper` TO `alice`", - ], + KeeperMapTablesReadWriteStep, + [b"ALTER TABLE `db-two`.`table-keeper` MODIFY SETTING read_only=false"], ), ], ids=["read-only", "read-write"], ) -async def test_keeper_map_table_select_only_setting_modified(allow_writes: bool, expected_statements: list[bytes]) -> None: +async def test_keeper_map_table_read_only_setting( + step_class: type[KeeperMapTablesReadabilityStepBase], expected_statements: list[bytes] +) -> None: clickhouse_client = mock_clickhouse_client() - - def execute_side_effect(statement: bytes) -> list[Row]: - if statement == b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name": - return [[base64.b64encode(b"alice").decode()]] - return [] - - clickhouse_client.execute.side_effect = execute_side_effect - context = StepsContext() sample_tables = SAMPLE_TABLES + [ Table( database=b"db-two", @@ -1330,8 +1320,9 @@ def execute_side_effect(statement: bytes) -> list[Row]: create_query=b"CREATE TABLE db-two.table-keeper ...", ), ] + context = StepsContext() context.set_result(RetrieveDatabasesAndTablesStep, (SAMPLE_DATABASES, sample_tables)) - step = KeeperMapTablesReadOnlyStep(clients=[clickhouse_client], allow_writes=allow_writes) + step = step_class(clients=[clickhouse_client]) await step.run_step(Cluster(nodes=[]), context) mock_calls = clickhouse_client.mock_calls assert mock_calls == [mock.call.execute(statement) for statement in expected_statements]