ClickHouse: pause KeeperMap insertion momentarily during backup
This PR temporarily alters replicated users' privileges (via GRANT and REVOKE) during the
backup: writes to KeeperMap tables are revoked before their data is read and granted back
once the tables are frozen. Because KeeperMap data lives in (Zoo)Keeper rather than in the
frozen parts, blocking writes keeps the KeeperMap snapshot consistent with the frozen table data.

[DDB-1237]
aris-aiven committed Oct 29, 2024
1 parent 2171f69 commit c91dac7
Showing 8 changed files with 185 additions and 10 deletions.
3 changes: 3 additions & 0 deletions astacus/coordinator/plugins/clickhouse/plugin.py
@@ -19,6 +19,7 @@
DeleteDanglingObjectStorageFilesStep,
FreezeTablesStep,
GetVersionsStep,
KeeperMapTablesReadOnlyStep,
ListDatabaseReplicasStep,
MoveFrozenPartsStep,
PrepareClickHouseManifestStep,
@@ -129,6 +130,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]:
),
RetrieveDatabasesAndTablesStep(clients=clickhouse_clients),
RetrieveMacrosStep(clients=clickhouse_clients),
KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=False),
RetrieveKeeperMapTableDataStep(
zookeeper_client=zookeeper_client,
keeper_map_path_prefix=self.keeper_map_path_prefix,
@@ -137,6 +139,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]:
FreezeTablesStep(
clients=clickhouse_clients, freeze_name=self.freeze_name, freeze_unfreeze_timeout=self.freeze_timeout
),
KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=True),
# Then snapshot and backup all frozen table parts
SnapshotStep(
snapshot_groups=disks.get_snapshot_groups(self.freeze_name),
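Note on the ordering above: `KeeperMapTablesReadOnlyStep(allow_writes=False)` runs before `RetrieveKeeperMapTableDataStep`, and writes are re-enabled immediately after `FreezeTablesStep`, so the Keeper-side data cannot change between reading it and freezing the table parts. A minimal sketch of that "pause writes around a critical section" pattern follows, for illustration only; the PR expresses it as two explicit pipeline steps rather than a context manager, and the re-grant step is not wrapped in a `finally`.

```python
# Illustration only: a generic "pause writes around a critical section" helper.
# The PR does not use a context manager; it schedules two explicit steps instead.
import asyncio
import contextlib
from collections.abc import AsyncIterator, Awaitable, Callable


@contextlib.asynccontextmanager
async def writes_paused(
    revoke: Callable[[], Awaitable[None]],
    grant: Callable[[], Awaitable[None]],
) -> AsyncIterator[None]:
    """Revoke write privileges, run the protected section, then grant them back."""
    await revoke()
    try:
        yield
    finally:
        await grant()


async def main() -> None:
    async def revoke() -> None:
        print("REVOKE INSERT, UPDATE, DELETE ...")

    async def grant() -> None:
        print("GRANT INSERT, UPDATE, DELETE ...")

    async with writes_paused(revoke, grant):
        print("retrieve KeeperMap data, then freeze table parts")


if __name__ == "__main__":
    asyncio.run(main())
```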
30 changes: 30 additions & 0 deletions astacus/coordinator/plugins/clickhouse/steps.py
@@ -179,6 +179,36 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Us
return user_defined_functions


@dataclasses.dataclass
class KeeperMapTablesReadOnlyStep(Step[None]):
clients: Sequence[ClickHouseClient]
allow_writes: bool

@staticmethod
def get_revoke_statement(table: Table, escaped_user_name: str) -> bytes:
return f"REVOKE INSERT, UPDATE, DELETE ON {table.escaped_sql_identifier} FROM {escaped_user_name}".encode()

@staticmethod
def get_grant_statement(table: Table, escaped_user_name: str) -> bytes:
return f"GRANT INSERT, UPDATE, DELETE ON {table.escaped_sql_identifier} TO {escaped_user_name}".encode()

async def run_step(self, cluster: Cluster, context: StepsContext):
_, tables = context.get_result(RetrieveDatabasesAndTablesStep)
replicated_users_response = await self.clients[0].execute(
b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name"
)
replicated_users_names = [b64decode(cast(str, user[0])) for user in replicated_users_response]
keeper_map_table_names = [table for table in tables if table.engine == "KeeperMap"]
privilege_altering_fun = self.get_grant_statement if self.allow_writes else self.get_revoke_statement
statements = [
privilege_altering_fun(table, escape_sql_identifier(user))
for table in keeper_map_table_names
for user in replicated_users_names
]
logger.info("Executing %d statements on KeeperMap tables\n%s", len(statements), "\n".join(map(str, statements)))
await asyncio.gather(*(self.clients[0].execute(statement) for statement in statements))


@dataclasses.dataclass
class RetrieveKeeperMapTableDataStep(Step[Sequence[KeeperMapTable]]):
zookeeper_client: ZooKeeperClient
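To make the step's effect concrete, the sketch below reproduces the statement shapes it builds, using a hypothetical KeeperMap table `db`.`kv` and replicated user `alice` (both names invented here). The real step derives tables from the `RetrieveDatabasesAndTablesStep` result and users from `system.users`, fetching the names base64-encoded so arbitrary user names survive the text interface before being escaped.

```python
# Hedged illustration of the SQL this step issues; the table and user names are made up.
def revoke_statement(escaped_table: str, escaped_user: str) -> bytes:
    return f"REVOKE INSERT, UPDATE, DELETE ON {escaped_table} FROM {escaped_user}".encode()


def grant_statement(escaped_table: str, escaped_user: str) -> bytes:
    return f"GRANT INSERT, UPDATE, DELETE ON {escaped_table} TO {escaped_user}".encode()


if __name__ == "__main__":
    # Issued with allow_writes=False, before RetrieveKeeperMapTableDataStep:
    print(revoke_statement("`db`.`kv`", "`alice`"))
    # b'REVOKE INSERT, UPDATE, DELETE ON `db`.`kv` FROM `alice`'

    # Issued with allow_writes=True, after FreezeTablesStep:
    print(grant_statement("`db`.`kv`", "`alice`"))
    # b'GRANT INSERT, UPDATE, DELETE ON `db`.`kv` TO `alice`'
```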
11 changes: 11 additions & 0 deletions astacus/coordinator/plugins/zookeeper.py
@@ -10,6 +10,7 @@
from kazoo.protocol.states import KazooState
from kazoo.recipe.watchers import ChildrenWatch, DataWatch
from kazoo.retry import KazooRetry
from kazoo.security import ACL
from queue import Empty, Queue

import asyncio
@@ -122,6 +123,10 @@ async def exists(self, path: str) -> bool:
"""Check if specified node exists."""
raise NotImplementedError

async def set_acls(self, path: str, acls: Sequence[ACL]) -> None:
"""Set ACLs for node."""
raise NotImplementedError

def transaction(self) -> ZooKeeperTransaction:
"""Begin a transaction."""
raise NotImplementedError
@@ -292,6 +297,12 @@ async def exists(self, path) -> bool:
maybe_znode_stat = await to_thread(self.client.retry, self.client.exists, path)
return maybe_znode_stat is not None

async def set_acls(self, path: str, acls: Sequence[ACL]) -> None:
try:
await to_thread(self.client.retry, self.client.set_acls, path, acls)
except kazoo.exceptions.NoNodeError as e:
raise NoNodeError(path) from e

def transaction(self) -> KazooZooKeeperTransaction:
return KazooZooKeeperTransaction(request=self.client.transaction())

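The new `set_acls` method only forwards to kazoo's `set_acls` through the thread helper; this diff does not show a caller. A hedged usage sketch follows, building ACLs with kazoo's documented helpers so a znode stays world-readable while only the digest admin user may write; the path and credentials are invented for illustration.

```python
# Sketch under assumptions: the path and credentials are examples, not values from this PR.
from collections.abc import Sequence

from kazoo.security import ACL, make_acl, make_digest_acl


def read_only_except_admin(admin_username: str, admin_password: str) -> Sequence[ACL]:
    """ACLs leaving a znode world-readable while only the digest admin user may write."""
    return [
        make_acl("world", "anyone", read=True),
        make_digest_acl(admin_username, admin_password, all=True),
    ]


# Inside an async step, with a connected ZooKeeperConnection `connection`:
#     await connection.set_acls(
#         "/clickhouse/keeper_map/some_table", read_only_except_admin("super", "foobar")
#     )
```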
36 changes: 30 additions & 6 deletions tests/integration/conftest.py
@@ -3,9 +3,11 @@
"""

from astacus.common.utils import build_netloc
from astacus.coordinator.plugins.zookeeper import KazooZooKeeperClient
from astacus.coordinator.plugins.zookeeper import KazooZooKeeperClient, ZooKeeperUser
from collections.abc import AsyncIterator, Mapping, Sequence
from kazoo.security import make_digest_acl_credential
from pathlib import Path
from tests.integration.constants import ZOOKEPER_ADMIN_USER
from types import MappingProxyType

import asyncio
@@ -34,13 +36,19 @@ async def get_command_path(name: str) -> Path | None:
return None


def get_zookeeper_command(*, java_path: Path, data_dir: Path, port: int) -> Sequence[str | Path] | None:
def get_zookeeper_command(
*, java_path: Path, data_dir: Path, port: int, admin_user: ZooKeeperUser
) -> Sequence[str | Path] | None:
zookeeper_jars = list(Path("/usr/share/zookeeper").glob("*.jar"))
if zookeeper_jars:
class_paths = [data_dir, *zookeeper_jars]
class_path_option = ":".join(str(path) for path in class_paths)
zookeeper_path = "org.apache.zookeeper.server.quorum.QuorumPeerMain"
java_options = ["-Dzookeeper.admin.enableServer=false"]
admin_credential_digest = make_digest_acl_credential(admin_user.username, admin_user.password)
java_options = [
"-Dzookeeper.admin.enableServer=false",
f"-Dzookeeper.DigestAuthenticationProvider.superDigest={admin_user.username}:{admin_credential_digest}",
]
return [java_path, "-cp", class_path_option, *java_options, zookeeper_path, str(port), data_dir]
return None

@@ -149,7 +157,16 @@ def get_kazoo_host(zookeeper: Service) -> str:

@pytest.fixture(name="zookeeper_client")
def fixture_zookeeper_client(zookeeper: Service) -> KazooZooKeeperClient:
return KazooZooKeeperClient(hosts=[get_kazoo_host(zookeeper)], timeout=10)
return get_zookeeper_client(zookeeper)


def get_zookeeper_client(zookeeper: Service) -> KazooZooKeeperClient:
user = (
ZooKeeperUser(username=zookeeper.username, password=zookeeper.password)
if zookeeper.username is not None and zookeeper.password is not None
else None
)
return KazooZooKeeperClient(hosts=[get_kazoo_host(zookeeper)], user=user)


@contextlib.asynccontextmanager
@@ -170,7 +187,8 @@ async def create_zookeeper(ports: Ports) -> AsyncIterator[Service]:
log4j.appender.default.layout.ConversionPattern=[%-5p] %m%n
"""
)
command = get_zookeeper_command(java_path=java_path, data_dir=data_dir, port=port)
zookeeper_admin_user = ZOOKEPER_ADMIN_USER
command = get_zookeeper_command(java_path=java_path, data_dir=data_dir, port=port, admin_user=zookeeper_admin_user)
if command is None:
pytest.skip("zookeeper installation not found")
async with contextlib.AsyncExitStack() as stack:
@@ -186,7 +204,13 @@ async def create_zookeeper(ports: Ports) -> AsyncIterator[Service]:
timeout=20.0,
)
)
yield Service(process=process, port=port, data_dir=data_dir)
yield Service(
process=process,
port=port,
data_dir=data_dir,
username=zookeeper_admin_user.username,
password=zookeeper_admin_user.password,
)
break
except FailPatternFoundError:
if attempt + 1 == max_attempts:
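A short note on the `superDigest` option added above: ZooKeeper expects the value in the form `user:BASE64(SHA1("user:password"))`, and kazoo's `make_digest_acl_credential` already returns that full `user:digest` string (username prefix included). The sketch below only demonstrates the helper; clients authenticate later with the plain credential via `add_auth`.

```python
# Hedged example of the value passed to ZooKeeper's
# -Dzookeeper.DigestAuthenticationProvider.superDigest option.
from kazoo.security import make_digest_acl_credential

if __name__ == "__main__":
    credential = make_digest_acl_credential("super", "foobar")
    # Returns "username:BASE64(SHA1('username:password'))", i.e. the full
    # "user:digest" string that the superDigest option expects.
    print(credential)

    # Clients authenticate with the plain password, not the digest:
    #     kazoo_client.add_auth("digest", "super:foobar")
```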
4 changes: 4 additions & 0 deletions tests/integration/constants.py
@@ -0,0 +1,4 @@
# Copyright (c) 2024 Aiven Ltd
from astacus.coordinator.plugins.zookeeper import ZooKeeperUser

ZOOKEPER_ADMIN_USER = ZooKeeperUser("super", "foobar")
@@ -385,7 +385,7 @@ def setting(name: str, value: int | float | str):
<clickhouse>
<path>{data_dir!s}</path>
<logger>
<level>debug</level>
<level>trace</level>
<console>true</console>
</logger>
<tcp_port>{tcp_port}</tcp_port>
82 changes: 79 additions & 3 deletions tests/integration/coordinator/plugins/clickhouse/test_steps.py
@@ -3,14 +3,16 @@
"""

from .conftest import ClickHouseCommand, create_clickhouse_cluster, get_clickhouse_client, MinioBucket
from .test_plugin import setup_cluster_users
from astacus.coordinator.cluster import Cluster
from astacus.coordinator.plugins.base import StepsContext
from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, ClickHouseClientQueryError, HttpClickHouseClient
from astacus.coordinator.plugins.clickhouse.manifest import ReplicatedDatabase, Table
from astacus.coordinator.plugins.clickhouse.steps import RetrieveDatabasesAndTablesStep
from astacus.coordinator.plugins.clickhouse.steps import KeeperMapTablesReadOnlyStep, RetrieveDatabasesAndTablesStep
from base64 import b64decode
from collections.abc import Sequence
from collections.abc import AsyncIterator, Sequence
from tests.integration.conftest import create_zookeeper, Ports
from typing import cast
from typing import cast, NamedTuple
from uuid import UUID

import pytest
@@ -99,3 +101,77 @@ async def test_retrieve_tables(ports: Ports, clickhouse_command: ClickHouseComma
dependencies=[],
),
]


class KeeperMapInfo(NamedTuple):
context: StepsContext
clickhouse_client: ClickHouseClient
user_client: ClickHouseClient


@pytest.fixture(name="keeper_table_context")
async def fixture_keeper_table_context(
ports: Ports, clickhouse_command: ClickHouseCommand, minio_bucket: MinioBucket
) -> AsyncIterator[KeeperMapInfo]:
async with (
create_zookeeper(ports) as zookeeper,
create_clickhouse_cluster(zookeeper, minio_bucket, ports, ["s1"], clickhouse_command) as clickhouse_cluster,
):
clickhouse = clickhouse_cluster.services[0]
admin_client = get_clickhouse_client(clickhouse)
await setup_cluster_users([admin_client])
for statement in [
b"CREATE DATABASE `keeperdata` ENGINE = Replicated('/clickhouse/databases/keeperdata', '{my_shard}', '{my_replica}')",
b"CREATE TABLE `keeperdata`.`keepertable` (thekey UInt32, thevalue UInt32) ENGINE = KeeperMap('test', 1000) PRIMARY KEY thekey",
b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(1) FROM numbers(3)",
b"CREATE USER bob IDENTIFIED WITH sha256_password BY 'secret'",
b"GRANT INSERT, SELECT, UPDATE, DELETE ON `keeperdata`.`keepertable` TO `bob`",
]:
await admin_client.execute(statement)
user_client = HttpClickHouseClient(
host=clickhouse.host,
port=clickhouse.port,
username="bob",
password="secret",
timeout=10,
)
step = RetrieveDatabasesAndTablesStep(clients=[admin_client])
context = StepsContext()
databases_tables_result = await step.run_step(Cluster(nodes=[]), context=context)
context.set_result(RetrieveDatabasesAndTablesStep, databases_tables_result)
yield KeeperMapInfo(context, admin_client, user_client)


async def test_keeper_map_table_select_only_setting_modified(keeper_table_context: KeeperMapInfo) -> None:
steps_context, admin_client, user_client = keeper_table_context
read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False)
    # After the read-only step, the user should only be able to select from the table
await read_only_step.run_step(Cluster(nodes=[]), steps_context)
with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
await user_client.execute(
b"INSERT INTO `keeperdata`.`keepertable` SETTINGS wait_for_async_insert=1 SELECT *, materialize(2) FROM numbers(3)"
)
with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20")
read_only_row_count = cast(
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(read_only_row_count[0][0]) == 3
# After the read-write step, the user should be able to write, update and delete from the table
read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True)
await read_write_step.run_step(Cluster(nodes=[]), steps_context)
await user_client.execute(b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(3) FROM numbers(3, 3)")
read_write_row_count = cast(
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(read_write_row_count[0][0]) == 6
await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
current_values = await user_client.execute(b"SELECT thevalue FROM `keeperdata`.`keepertable` ORDER BY thekey")
assert all(int(cast(str, val[0])) == 3 for val in current_values)
await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20")
post_delete_row_count = cast(
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(post_delete_row_count[0][0]) == 0
27 changes: 27 additions & 0 deletions tests/unit/coordinator/plugins/clickhouse/test_steps.py
@@ -52,6 +52,7 @@
FreezeUnfreezeTablesStepBase,
get_restore_table_query,
GetVersionsStep,
KeeperMapTablesReadOnlyStep,
ListDatabaseReplicasStep,
MoveFrozenPartsStep,
PrepareClickHouseManifestStep,
@@ -1275,6 +1276,32 @@ async def test_restore_keeper_map_table_data_step() -> None:
]


@pytest.mark.parametrize(
("only_allow_select", "expected_statement"),
[
(True, b"ALTER TABLE `db-two`.`table-keeper` MODIFY SETTING only_allow_select_statement=true"),
(False, b"ALTER TABLE `db-two`.`table-keeper` RESET SETTING only_allow_select_statement"),
],
ids=["read-only", "read-write"],
)
async def test_keeper_map_table_select_only_setting_modified(only_allow_select: bool, expected_statement: bytes) -> None:
clickhouse_client = mock_clickhouse_client()
context = StepsContext()
sample_tables = SAMPLE_TABLES + [
Table(
database=b"db-two",
name=b"table-keeper",
uuid=uuid.UUID("00000000-0000-0000-0000-200000000008"),
engine="KeeperMap",
create_query=b"CREATE TABLE db-two.table-keeper ...",
),
]
context.set_result(RetrieveDatabasesAndTablesStep, (SAMPLE_DATABASES, sample_tables))
step = KeeperMapTablesReadOnlyStep(clients=[clickhouse_client], allow_writes=only_allow_select)
await step.run_step(Cluster(nodes=[]), context)
assert clickhouse_client.mock_calls == [mock.call.execute(expected_statement)]


async def test_attaches_all_mergetree_parts_in_manifest() -> None:
client_1 = mock_clickhouse_client()
client_2 = mock_clickhouse_client()
