Pause KeeperMap tables using read-only setting
Emitting a "table is read-only" error is more precise and is already
handled by the Kafka Sink Connector for ClickHouse.
aris-aiven committed Nov 18, 2024
1 parent 28484bc commit 80b4c8a
Showing 4 changed files with 62 additions and 70 deletions.
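
In short, the backup flow now pauses KeeperMap writes by flipping a table-level setting instead of revoking and re-granting per-user privileges. A rough sketch of the statements involved, with illustrative database, table, and user names (the old form was issued once per replicated user):

# Old approach: toggle write privileges for every replicated user.
revoke = b"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON `db`.`keeper_table` FROM `some_user`"
grant = b"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON `db`.`keeper_table` TO `some_user`"

# New approach: pause and resume writes on the table itself, independent of users.
pause_writes = b"ALTER TABLE `db`.`keeper_table` MODIFY SETTING read_only=true"
resume_writes = b"ALTER TABLE `db`.`keeper_table` MODIFY SETTING read_only=false"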
5 changes: 3 additions & 2 deletions astacus/coordinator/plugins/clickhouse/plugin.py
@@ -20,6 +20,7 @@
     FreezeTablesStep,
     GetVersionsStep,
     KeeperMapTablesReadOnlyStep,
+    KeeperMapTablesReadWriteStep,
     ListDatabaseReplicasStep,
     MoveFrozenPartsStep,
     PrepareClickHouseManifestStep,
@@ -130,7 +131,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]:
             ),
             RetrieveDatabasesAndTablesStep(clients=clickhouse_clients),
             RetrieveMacrosStep(clients=clickhouse_clients),
-            KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=False),
+            KeeperMapTablesReadOnlyStep(clients=clickhouse_clients),
             RetrieveKeeperMapTableDataStep(
                 zookeeper_client=zookeeper_client,
                 keeper_map_path_prefix=self.keeper_map_path_prefix,
@@ -140,7 +141,7 @@
             FreezeTablesStep(
                 clients=clickhouse_clients, freeze_name=self.freeze_name, freeze_unfreeze_timeout=self.freeze_timeout
             ),
-            KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=True),
+            KeeperMapTablesReadWriteStep(clients=clickhouse_clients),
             # Then snapshot and backup all frozen table parts
             SnapshotStep(
                 snapshot_groups=disks.get_snapshot_groups(self.freeze_name),
45 changes: 21 additions & 24 deletions astacus/coordinator/plugins/clickhouse/steps.py
@@ -181,38 +181,35 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Us
 
 
 @dataclasses.dataclass
-class KeeperMapTablesReadOnlyStep(Step[None]):
+class KeeperMapTablesReadabilityStepBase(Step[None]):
     clients: Sequence[ClickHouseClient]
-    allow_writes: bool
+    _is_read_only: bool = dataclasses.field(init=False)
 
-    async def revoke_write_on_table(self, table: Table, user_name: bytes) -> None:
-        escaped_user_name = escape_sql_identifier(user_name)
-        revoke_statement = (
-            f"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} FROM {escaped_user_name}"
-        )
-        await self.clients[0].execute(revoke_statement.encode())
+    def readability_statement(self, escaped_table_identifier: str) -> str:
+        read_only = str(self._is_read_only).lower()
+        return f"ALTER TABLE {escaped_table_identifier} MODIFY SETTING read_only={read_only}"
 
-    async def grant_write_on_table(self, table: Table, user_name: bytes) -> None:
-        escaped_user_name = escape_sql_identifier(user_name)
-        grant_statement = (
-            f"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} TO {escaped_user_name}"
-        )
-        await self.clients[0].execute(grant_statement.encode())
+    async def alter_readability(self, escaped_table_identifier: str) -> None:
+        statement = self.readability_statement(escaped_table_identifier).encode()
+        await self.clients[0].execute(statement)
 
     async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
        _, tables = context.get_result(RetrieveDatabasesAndTablesStep)
-        replicated_users_response = await self.clients[0].execute(
-            b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name"
-        )
-        replicated_users_names = [b64decode(cast(str, user[0])) for user in replicated_users_response]
         keeper_map_table_names = [table for table in tables if table.engine == "KeeperMap"]
-        privilege_altering_fun = self.grant_write_on_table if self.allow_writes else self.revoke_write_on_table
-        privilege_update_tasks = [
-            privilege_altering_fun(table, user) for table in keeper_map_table_names for user in replicated_users_names
-        ]
+        privilege_update_tasks = [self.alter_readability(table.escaped_sql_identifier) for table in keeper_map_table_names]
         await asyncio.gather(*privilege_update_tasks)
 
 
+@dataclasses.dataclass
+class KeeperMapTablesReadOnlyStep(KeeperMapTablesReadabilityStepBase):
+    _is_read_only = True
+
+
+@dataclasses.dataclass
+class KeeperMapTablesReadWriteStep(KeeperMapTablesReadabilityStepBase):
+    _is_read_only = False
+
+
 @dataclasses.dataclass
 class RetrieveKeeperMapTableDataStep(Step[Sequence[KeeperMapTable]]):
     zookeeper_client: ZooKeeperClient
@@ -257,7 +254,7 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Ke
 
     async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None:
         try:
-            await KeeperMapTablesReadOnlyStep(clients=self.clients, allow_writes=True).run_step(cluster, context)
+            await KeeperMapTablesReadOnlyStep(self.clients).run_step(cluster, context)
         except ClickHouseClientQueryError:
             logger.warning("Unable to restore write ACLs for KeeperMap tables")
 
@@ -488,7 +485,7 @@ def operation(self) -> str:
 
     async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None:
         try:
-            await KeeperMapTablesReadOnlyStep(clients=self.clients, allow_writes=True).run_step(cluster, context)
+            await KeeperMapTablesReadOnlyStep(clients=self.clients).run_step(cluster, context)
         except ClickHouseClientQueryError:
             logger.warning("Unable to restore write ACLs for KeeperMap tables")
 
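The two thin subclasses above lean on a dataclass detail that is easy to miss: the base class declares _is_read_only with field(init=False), so it never appears in the constructor, and each subclass pins the value as a plain class attribute. A self-contained sketch of the same pattern with illustrative names (not the astacus classes), which also shows why the generated SQL uses lowercase true/false:

import dataclasses


@dataclasses.dataclass
class ReadabilityBase:
    table: str
    # Not part of __init__; concrete subclasses supply the value as a class attribute.
    _is_read_only: bool = dataclasses.field(init=False)

    def statement(self) -> str:
        # str(True) is "True", so lowercase it to match ClickHouse's true/false literals.
        return f"ALTER TABLE {self.table} MODIFY SETTING read_only={str(self._is_read_only).lower()}"


@dataclasses.dataclass
class MakeReadOnly(ReadabilityBase):
    _is_read_only = True


@dataclasses.dataclass
class MakeReadWrite(ReadabilityBase):
    _is_read_only = False


print(MakeReadOnly(table="`db`.`t`").statement())   # ... MODIFY SETTING read_only=true
print(MakeReadWrite(table="`db`.`t`").statement())  # ... MODIFY SETTING read_only=false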
47 changes: 25 additions & 22 deletions tests/integration/coordinator/plugins/clickhouse/test_steps.py
@@ -7,7 +7,11 @@
 from astacus.coordinator.plugins.base import StepsContext
 from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, ClickHouseClientQueryError, HttpClickHouseClient
 from astacus.coordinator.plugins.clickhouse.manifest import ReplicatedDatabase, Table
-from astacus.coordinator.plugins.clickhouse.steps import KeeperMapTablesReadOnlyStep, RetrieveDatabasesAndTablesStep
+from astacus.coordinator.plugins.clickhouse.steps import (
+    KeeperMapTablesReadOnlyStep,
+    KeeperMapTablesReadWriteStep,
+    RetrieveDatabasesAndTablesStep,
+)
 from base64 import b64decode
 from collections.abc import AsyncIterator, Sequence
 from tests.integration.conftest import create_zookeeper, Ports
@@ -184,52 +188,51 @@ async def fixture_keeper_table_context(
     yield KeeperMapInfo(context, admin_client, [foobar_client, alice_client])
 
 
+async def get_row_count(client: ClickHouseClient) -> int:
+    keeper_table_row_count = cast(
+        Sequence[tuple[int]], await client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
+    )
+    return int(keeper_table_row_count[0][0])
+
+
 async def check_read_only(user_client: ClickHouseClient) -> None:
-    with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
+    with pytest.raises(ClickHouseClientQueryError, match=".*TABLE_IS_READ_ONLY.*"):
         await user_client.execute(
             b"INSERT INTO `keeperdata`.`keepertable` SETTINGS wait_for_async_insert=1 SELECT *, materialize(2) FROM numbers(3)"
         )
-    with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
+    with pytest.raises(ClickHouseClientQueryError, match=".*TABLE_IS_READ_ONLY.*"):
         await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
-    with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
+    with pytest.raises(ClickHouseClientQueryError, match=".*TABLE_IS_READ_ONLY.*"):
         await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20")
-    read_only_row_count = cast(
-        Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
-    )
-    assert int(read_only_row_count[0][0]) == 3
+    read_only_row_count = await get_row_count(user_client)
+    assert read_only_row_count == 3
 
 
 async def check_read_write(user_client: ClickHouseClient) -> None:
     await user_client.execute(b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(10) FROM numbers(3)")
-    read_write_row_count = cast(
-        Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
-    )
-    assert int(read_write_row_count[0][0]) == 3
+    read_write_row_count = await get_row_count(user_client)
+    assert read_write_row_count == 3
     await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
     current_values = await user_client.execute(b"SELECT thevalue FROM `keeperdata`.`keepertable` ORDER BY thekey")
     assert all(int(cast(str, val[0])) == 3 for val in current_values)
     await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20")
-    post_delete_row_count = cast(
-        Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
-    )
-    assert int(post_delete_row_count[0][0]) == 0
+    post_delete_row_count = await get_row_count(user_client)
+    assert post_delete_row_count == 0
 
 
 async def test_keeper_map_table_read_only_step(keeper_table_context: KeeperMapInfo) -> None:
     steps_context, admin_client, user_clients = keeper_table_context
-    read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False)
+    read_only_step = KeeperMapTablesReadOnlyStep([admin_client])
     # After the read-only step, users should only be able to select from the table
     await read_only_step.run_step(Cluster(nodes=[]), steps_context)
     for user_client in user_clients:
         await check_read_only(user_client)
     # After the read-write step, users should be able to write, update and delete from the table
-    read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True)
+    read_write_step = KeeperMapTablesReadWriteStep([admin_client])
     await read_write_step.run_step(Cluster(nodes=[]), steps_context)
     # Clean up table so that each user starts from a clean slate
     await admin_client.execute(b"TRUNCATE TABLE `keeperdata`.`keepertable`")
-    post_delete_row_count = cast(
-        Sequence[tuple[int]], await admin_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
-    )
-    assert int(post_delete_row_count[0][0]) == 0
+    post_delete_row_count = await get_row_count(admin_client)
+    assert post_delete_row_count == 0
     for user_client in user_clients:
         await check_read_write(user_client)
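
One note on the assertions above: pytest.raises(match=...) applies re.search to the string form of the raised exception, so the checks only need the TABLE_IS_READ_ONLY error name to appear somewhere in ClickHouse's message. A tiny self-contained illustration with a made-up exception class and message text:

import re

import pytest


class FakeQueryError(Exception):
    pass


def test_match_is_a_regex_search() -> None:
    # Hypothetical message shaped like a ClickHouse read-only error; the real text may differ.
    message = "DB::Exception: Table is in readonly mode (TABLE_IS_READ_ONLY)"
    with pytest.raises(FakeQueryError, match=".*TABLE_IS_READ_ONLY.*"):
        raise FakeQueryError(message)
    # Equivalent check done by hand:
    assert re.search(".*TABLE_IS_READ_ONLY.*", message) is not None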
35 changes: 13 additions & 22 deletions tests/unit/coordinator/plugins/clickhouse/test_steps.py
@@ -15,7 +15,7 @@
     StepFailedError,
     StepsContext,
 )
-from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, Row, StubClickHouseClient
+from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, StubClickHouseClient
 from astacus.coordinator.plugins.clickhouse.config import (
     ClickHouseConfiguration,
     ClickHouseNode,
@@ -52,7 +52,9 @@
     FreezeUnfreezeTablesStepBase,
     get_restore_table_query,
     GetVersionsStep,
+    KeeperMapTablesReadabilityStepBase,
     KeeperMapTablesReadOnlyStep,
+    KeeperMapTablesReadWriteStep,
     ListDatabaseReplicasStep,
     MoveFrozenPartsStep,
     PrepareClickHouseManifestStep,
@@ -1292,35 +1294,23 @@ async def test_restore_keeper_map_table_data_step() -> None:
 
 
 @pytest.mark.parametrize(
-    ("allow_writes", "expected_statements"),
+    ("step_class", "expected_statements"),
     [
         (
-            False,
-            [
-                b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name",
-                b"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON `db-two`.`table-keeper` FROM `alice`",
-            ],
+            KeeperMapTablesReadOnlyStep,
+            [b"ALTER TABLE `db-two`.`table-keeper` MODIFY SETTING read_only=true"],
         ),
         (
-            True,
-            [
-                b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name",
-                b"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON `db-two`.`table-keeper` TO `alice`",
-            ],
+            KeeperMapTablesReadWriteStep,
+            [b"ALTER TABLE `db-two`.`table-keeper` MODIFY SETTING read_only=false"],
         ),
     ],
     ids=["read-only", "read-write"],
 )
-async def test_keeper_map_table_select_only_setting_modified(allow_writes: bool, expected_statements: list[bytes]) -> None:
+async def test_keeper_map_table_read_only_setting(
+    step_class: type[KeeperMapTablesReadabilityStepBase], expected_statements: list[bytes]
+) -> None:
     clickhouse_client = mock_clickhouse_client()
-
-    def execute_side_effect(statement: bytes) -> list[Row]:
-        if statement == b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name":
-            return [[base64.b64encode(b"alice").decode()]]
-        return []
-
-    clickhouse_client.execute.side_effect = execute_side_effect
-    context = StepsContext()
     sample_tables = SAMPLE_TABLES + [
         Table(
             database=b"db-two",
@@ -1330,8 +1320,9 @@ def execute_side_effect(statement: bytes) -> list[Row]:
             create_query=b"CREATE TABLE db-two.table-keeper ...",
         ),
     ]
+    context = StepsContext()
     context.set_result(RetrieveDatabasesAndTablesStep, (SAMPLE_DATABASES, sample_tables))
-    step = KeeperMapTablesReadOnlyStep(clients=[clickhouse_client], allow_writes=allow_writes)
+    step = step_class(clients=[clickhouse_client])
     await step.run_step(Cluster(nodes=[]), context)
     mock_calls = clickhouse_client.mock_calls
     assert mock_calls == [mock.call.execute(statement) for statement in expected_statements]
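Finally, because mock_calls records every call made on the mock in order, the equality assertion at the end of this test also proves that nothing else was executed (for example, the old SELECT against system.users). A minimal stand-alone illustration with a stand-in client mock, not the real mock_clickhouse_client():

from unittest import mock

# Stand-in for the mocked ClickHouse client; no real server is involved here.
client = mock.Mock()
client.execute(b"ALTER TABLE `db-two`.`table-keeper` MODIFY SETTING read_only=true")
assert client.mock_calls == [
    mock.call.execute(b"ALTER TABLE `db-two`.`table-keeper` MODIFY SETTING read_only=true")
]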
