ClickHouse: pause KeeperMap insertion momentarily during backup
The PR temporarily alters replicated users' privileges (through REVOKE and GRANT) during
the backup, pausing writes to KeeperMap tables while their data is retrieved and the
tables are frozen. This ensures the KeeperMap tables' snapshot is consistent with the
frozen table data.

[DDB-1237]
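
For illustration, the step added below issues one REVOKE/GRANT pair per KeeperMap table and replicated user, along the lines of this minimal sketch (identifiers borrowed from the integration test further down; this mirrors, but is not copied from, the statement builders in steps.py):

# Hypothetical identifiers, already escaped as the real step escapes them.
table_identifier = "`keeperdata`.`keepertable`"
user_identifier = "`bob`"

# Issued with allow_writes=False, before the KeeperMap data is read and tables are frozen:
revoke = f"REVOKE INSERT, UPDATE, DELETE ON {table_identifier} FROM {user_identifier}".encode()
# Issued with allow_writes=True after freezing, restoring normal writes:
grant = f"GRANT INSERT, UPDATE, DELETE ON {table_identifier} TO {user_identifier}".encode()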
aris-aiven committed Oct 29, 2024
1 parent 2171f69 commit f48b634
Showing 4 changed files with 153 additions and 3 deletions.
3 changes: 3 additions & 0 deletions astacus/coordinator/plugins/clickhouse/plugin.py
@@ -19,6 +19,7 @@
DeleteDanglingObjectStorageFilesStep,
FreezeTablesStep,
GetVersionsStep,
KeeperMapTablesReadOnlyStep,
ListDatabaseReplicasStep,
MoveFrozenPartsStep,
PrepareClickHouseManifestStep,
@@ -129,6 +130,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]:
),
RetrieveDatabasesAndTablesStep(clients=clickhouse_clients),
RetrieveMacrosStep(clients=clickhouse_clients),
KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=False),
RetrieveKeeperMapTableDataStep(
zookeeper_client=zookeeper_client,
keeper_map_path_prefix=self.keeper_map_path_prefix,
@@ -137,6 +139,7 @@
FreezeTablesStep(
clients=clickhouse_clients, freeze_name=self.freeze_name, freeze_unfreeze_timeout=self.freeze_timeout
),
KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=True),
# Then snapshot and backup all frozen table parts
SnapshotStep(
snapshot_groups=disks.get_snapshot_groups(self.freeze_name),
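Pieced together, the KeeperMap-related slice of the backup pipeline now reads roughly as follows (a condensed view of the hunks above; surrounding steps and unrelated constructor arguments are omitted):

# Condensed ordering of the relevant steps from get_backup_steps:
RetrieveDatabasesAndTablesStep(clients=clickhouse_clients),
RetrieveMacrosStep(clients=clickhouse_clients),
# Revoke INSERT/UPDATE/DELETE on KeeperMap tables from replicated users...
KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=False),
# ...read a now write-stable copy of the KeeperMap data out of ZooKeeper...
RetrieveKeeperMapTableDataStep(zookeeper_client=zookeeper_client, keeper_map_path_prefix=self.keeper_map_path_prefix),
# ...freeze the regular table data...
FreezeTablesStep(clients=clickhouse_clients, freeze_name=self.freeze_name, freeze_unfreeze_timeout=self.freeze_timeout),
# ...then hand write privileges back before the snapshot and upload steps run.
KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=True),
SnapshotStep(snapshot_groups=disks.get_snapshot_groups(self.freeze_name)),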
30 changes: 30 additions & 0 deletions astacus/coordinator/plugins/clickhouse/steps.py
@@ -179,6 +179,36 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Us
return user_defined_functions


@dataclasses.dataclass
class KeeperMapTablesReadOnlyStep(Step[None]):
clients: Sequence[ClickHouseClient]
allow_writes: bool

@staticmethod
def get_revoke_statement(table: Table, escaped_user_name: str) -> bytes:
return f"REVOKE INSERT, UPDATE, DELETE ON {table.escaped_sql_identifier} FROM {escaped_user_name}".encode()

@staticmethod
def get_grant_statement(table: Table, escaped_user_name: str) -> bytes:
return f"GRANT INSERT, UPDATE, DELETE ON {table.escaped_sql_identifier} TO {escaped_user_name}".encode()

async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
_, tables = context.get_result(RetrieveDatabasesAndTablesStep)
replicated_users_response = await self.clients[0].execute(
b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name"
)
replicated_users_names = [b64decode(cast(str, user[0])) for user in replicated_users_response]
keeper_map_table_names = [table for table in tables if table.engine == "KeeperMap"]
privilege_altering_fun = self.get_grant_statement if self.allow_writes else self.get_revoke_statement
statements = [
privilege_altering_fun(table, escape_sql_identifier(user))
for table in keeper_map_table_names
for user in replicated_users_names
]
logger.info("Executing %d statements on KeeperMap tables\n%s", len(statements), "\n".join(map(str, statements)))
await asyncio.gather(*(self.clients[0].execute(statement) for statement in statements))


@dataclasses.dataclass
class RetrieveKeeperMapTableDataStep(Step[Sequence[KeeperMapTable]]):
zookeeper_client: ZooKeeperClient
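One detail worth noting in run_step above: the replicated user names are selected as base64Encode(name) and decoded with b64decode on the Python side, presumably so that arbitrary user names survive the trip through the client interface unaltered. A tiny self-contained sketch of that round trip (the response value is a made-up single-row example):

from base64 import b64decode

# Hypothetical result of:
#   SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name
response = [["Ym9i"]]  # "Ym9i" is base64 for b"bob"
user_names = [b64decode(row[0]) for row in response]
assert user_names == [b"bob"]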
82 changes: 79 additions & 3 deletions tests/integration/coordinator/plugins/clickhouse/test_steps.py
@@ -3,14 +3,16 @@
"""

from .conftest import ClickHouseCommand, create_clickhouse_cluster, get_clickhouse_client, MinioBucket
from .test_plugin import setup_cluster_users
from astacus.coordinator.cluster import Cluster
from astacus.coordinator.plugins.base import StepsContext
from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, ClickHouseClientQueryError, HttpClickHouseClient
from astacus.coordinator.plugins.clickhouse.manifest import ReplicatedDatabase, Table
from astacus.coordinator.plugins.clickhouse.steps import RetrieveDatabasesAndTablesStep
from astacus.coordinator.plugins.clickhouse.steps import KeeperMapTablesReadOnlyStep, RetrieveDatabasesAndTablesStep
from base64 import b64decode
from collections.abc import Sequence
from collections.abc import AsyncIterator, Sequence
from tests.integration.conftest import create_zookeeper, Ports
from typing import cast
from typing import cast, NamedTuple
from uuid import UUID

import pytest
@@ -99,3 +101,77 @@ async def test_retrieve_tables(ports: Ports, clickhouse_command: ClickHouseComma
dependencies=[],
),
]


class KeeperMapInfo(NamedTuple):
context: StepsContext
clickhouse_client: ClickHouseClient
user_client: ClickHouseClient


@pytest.fixture(name="keeper_table_context")
async def fixture_keeper_table_context(
ports: Ports, clickhouse_command: ClickHouseCommand, minio_bucket: MinioBucket
) -> AsyncIterator[KeeperMapInfo]:
async with (
create_zookeeper(ports) as zookeeper,
create_clickhouse_cluster(zookeeper, minio_bucket, ports, ["s1"], clickhouse_command) as clickhouse_cluster,
):
clickhouse = clickhouse_cluster.services[0]
admin_client = get_clickhouse_client(clickhouse)
await setup_cluster_users([admin_client])
for statement in [
b"CREATE DATABASE `keeperdata` ENGINE = Replicated('/clickhouse/databases/keeperdata', '{my_shard}', '{my_replica}')",
b"CREATE TABLE `keeperdata`.`keepertable` (thekey UInt32, thevalue UInt32) ENGINE = KeeperMap('test', 1000) PRIMARY KEY thekey",
b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(1) FROM numbers(3)",
b"CREATE USER bob IDENTIFIED WITH sha256_password BY 'secret'",
b"GRANT INSERT, SELECT, UPDATE, DELETE ON `keeperdata`.`keepertable` TO `bob`",
]:
await admin_client.execute(statement)
user_client = HttpClickHouseClient(
host=clickhouse.host,
port=clickhouse.port,
username="bob",
password="secret",
timeout=10,
)
step = RetrieveDatabasesAndTablesStep(clients=[admin_client])
context = StepsContext()
databases_tables_result = await step.run_step(Cluster(nodes=[]), context=context)
context.set_result(RetrieveDatabasesAndTablesStep, databases_tables_result)
yield KeeperMapInfo(context, admin_client, user_client)


async def test_keeper_map_table_select_only_setting_modified(keeper_table_context: KeeperMapInfo) -> None:
steps_context, admin_client, user_client = keeper_table_context
read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False)
# After the read-only step, the user should only be able to select from the table
await read_only_step.run_step(Cluster(nodes=[]), steps_context)
with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
await user_client.execute(
b"INSERT INTO `keeperdata`.`keepertable` SETTINGS wait_for_async_insert=1 SELECT *, materialize(2) FROM numbers(3)"
)
with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20")
read_only_row_count = cast(
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(read_only_row_count[0][0]) == 3
# After the read-write step, the user should be able to write, update and delete from the table
read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True)
await read_write_step.run_step(Cluster(nodes=[]), steps_context)
await user_client.execute(b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(3) FROM numbers(3, 3)")
read_write_row_count = cast(
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(read_write_row_count[0][0]) == 6
await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
current_values = await user_client.execute(b"SELECT thevalue FROM `keeperdata`.`keepertable` ORDER BY thekey")
assert all(int(cast(str, val[0])) == 3 for val in current_values)
await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20")
post_delete_row_count = cast(
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(post_delete_row_count[0][0]) == 0
41 changes: 41 additions & 0 deletions tests/unit/coordinator/plugins/clickhouse/test_steps.py
@@ -52,6 +52,7 @@
FreezeUnfreezeTablesStepBase,
get_restore_table_query,
GetVersionsStep,
KeeperMapTablesReadOnlyStep,
ListDatabaseReplicasStep,
MoveFrozenPartsStep,
PrepareClickHouseManifestStep,
@@ -1275,6 +1276,46 @@ async def test_restore_keeper_map_table_data_step() -> None:
]


@pytest.mark.parametrize(
("allow_writes", "expected_statements"),
[
(
False,
[
b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name",
b"REVOKE INSERT, UPDATE, DELETE ON `db-two`.`table-keeper` FROM `alice`",
],
),
(
True,
[
b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name",
b"GRANT INSERT, UPDATE, DELETE ON `db-two`.`table-keeper` TO `alice`",
],
),
],
ids=["read-only", "read-write"],
)
async def test_keeper_map_table_select_only_setting_modified(allow_writes: bool, expected_statements: list[bytes]) -> None:
clickhouse_client = mock_clickhouse_client()
context = StepsContext()
sample_tables = SAMPLE_TABLES + [
Table(
database=b"db-two",
name=b"table-keeper",
uuid=uuid.UUID("00000000-0000-0000-0000-200000000008"),
engine="KeeperMap",
create_query=b"CREATE TABLE db-two.table-keeper ...",
),
]
context.set_result(RetrieveDatabasesAndTablesStep, (SAMPLE_DATABASES, sample_tables))
step = KeeperMapTablesReadOnlyStep(clients=[clickhouse_client], allow_writes=allow_writes)
clickhouse_client.execute.return_value = [[base64.b64encode(b"alice")]]
await step.run_step(Cluster(nodes=[]), context)
mock_calls = clickhouse_client.mock_calls
assert mock_calls == [mock.call.execute(statement) for statement in expected_statements]


async def test_attaches_all_mergetree_parts_in_manifest() -> None:
client_1 = mock_clickhouse_client()
client_2 = mock_clickhouse_client()