Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pause KeeperMap tables using read-only setting #261

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions astacus/coordinator/plugins/clickhouse/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
FreezeTablesStep,
GetVersionsStep,
KeeperMapTablesReadOnlyStep,
KeeperMapTablesReadWriteStep,
ListDatabaseReplicasStep,
MoveFrozenPartsStep,
PrepareClickHouseManifestStep,
Expand Down Expand Up @@ -130,7 +131,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]:
),
RetrieveDatabasesAndTablesStep(clients=clickhouse_clients),
RetrieveMacrosStep(clients=clickhouse_clients),
KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=False),
KeeperMapTablesReadOnlyStep(clients=clickhouse_clients),
RetrieveKeeperMapTableDataStep(
zookeeper_client=zookeeper_client,
keeper_map_path_prefix=self.keeper_map_path_prefix,
Expand All @@ -140,7 +141,7 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]:
FreezeTablesStep(
clients=clickhouse_clients, freeze_name=self.freeze_name, freeze_unfreeze_timeout=self.freeze_timeout
),
KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=True),
KeeperMapTablesReadWriteStep(clients=clickhouse_clients),
# Then snapshot and backup all frozen table parts
SnapshotStep(
snapshot_groups=disks.get_snapshot_groups(self.freeze_name),
Expand Down
45 changes: 21 additions & 24 deletions astacus/coordinator/plugins/clickhouse/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,38 +181,35 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Us


@dataclasses.dataclass
class KeeperMapTablesReadOnlyStep(Step[None]):
class KeeperMapTablesReadabilityStepBase(Step[None]):
clients: Sequence[ClickHouseClient]
allow_writes: bool
_is_read_only: bool = dataclasses.field(init=False)

async def revoke_write_on_table(self, table: Table, user_name: bytes) -> None:
escaped_user_name = escape_sql_identifier(user_name)
revoke_statement = (
f"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} FROM {escaped_user_name}"
)
await self.clients[0].execute(revoke_statement.encode())
def readability_statement(self, escaped_table_identifier: str) -> str:
read_only = str(self._is_read_only).lower()
return f"ALTER TABLE {escaped_table_identifier} MODIFY SETTING read_only={read_only}"

async def grant_write_on_table(self, table: Table, user_name: bytes) -> None:
escaped_user_name = escape_sql_identifier(user_name)
grant_statement = (
f"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} TO {escaped_user_name}"
)
await self.clients[0].execute(grant_statement.encode())
async def alter_readability(self, escaped_table_identifier: str) -> None:
statement = self.readability_statement(escaped_table_identifier).encode()
await self.clients[0].execute(statement)

async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
_, tables = context.get_result(RetrieveDatabasesAndTablesStep)
replicated_users_response = await self.clients[0].execute(
b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name"
)
replicated_users_names = [b64decode(cast(str, user[0])) for user in replicated_users_response]
keeper_map_table_names = [table for table in tables if table.engine == "KeeperMap"]
privilege_altering_fun = self.grant_write_on_table if self.allow_writes else self.revoke_write_on_table
privilege_update_tasks = [
privilege_altering_fun(table, user) for table in keeper_map_table_names for user in replicated_users_names
]
privilege_update_tasks = [self.alter_readability(table.escaped_sql_identifier) for table in keeper_map_table_names]
await asyncio.gather(*privilege_update_tasks)


@dataclasses.dataclass
class KeeperMapTablesReadOnlyStep(KeeperMapTablesReadabilityStepBase):
    """Readability step variant with the read-only flag turned on.

    The base class renders this flag into the ``read_only=...`` table setting
    it applies, so this variant yields ``read_only=true``.
    """

    # Plain class attribute (no annotation), read by the base class through
    # ``self._is_read_only``.
    _is_read_only = True


@dataclasses.dataclass
class KeeperMapTablesReadWriteStep(KeeperMapTablesReadabilityStepBase):
    """Readability step variant with the read-only flag turned off.

    The base class renders this flag into the ``read_only=...`` table setting
    it applies, so this variant yields ``read_only=false``.
    """

    # Plain class attribute (no annotation), read by the base class through
    # ``self._is_read_only``.
    _is_read_only = False


@dataclasses.dataclass
class RetrieveKeeperMapTableDataStep(Step[Sequence[KeeperMapTable]]):
zookeeper_client: ZooKeeperClient
Expand Down Expand Up @@ -257,7 +254,7 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Ke

async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None:
    """Best-effort attempt to make KeeperMap tables writable again after a failure.

    Runs the read-write step so tables are not left with ``read_only=true``
    when this step fails. A ClickHouse query error during the restore is
    logged and swallowed.
    """
    try:
        # Review note (for visibility): this call replaces the former
        # ``allow_writes=True``; the equivalent is ``_is_read_only=False``,
        # which is KeeperMapTablesReadWriteStep, not KeeperMapTablesReadOnlyStep.
        await KeeperMapTablesReadWriteStep(clients=self.clients).run_step(cluster, context)
    except ClickHouseClientQueryError:
        logger.warning("Unable to restore write ACLs for KeeperMap tables")

Expand Down Expand Up @@ -488,7 +485,7 @@ def operation(self) -> str:

async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None:
    """Best-effort attempt to make KeeperMap tables writable again after a failure.

    Runs the read-write step so tables are not left with ``read_only=true``
    when this step fails. A ClickHouse query error during the restore is
    logged and swallowed.
    """
    try:
        # The former call passed ``allow_writes=True``; the equivalent is the
        # read-write step. The read-only step would set the tables read-only
        # again instead of restoring writes.
        await KeeperMapTablesReadWriteStep(clients=self.clients).run_step(cluster, context)
    except ClickHouseClientQueryError:
        logger.warning("Unable to restore write ACLs for KeeperMap tables")

Expand Down
47 changes: 25 additions & 22 deletions tests/integration/coordinator/plugins/clickhouse/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@
from astacus.coordinator.plugins.base import StepsContext
from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, ClickHouseClientQueryError, HttpClickHouseClient
from astacus.coordinator.plugins.clickhouse.manifest import ReplicatedDatabase, Table
from astacus.coordinator.plugins.clickhouse.steps import KeeperMapTablesReadOnlyStep, RetrieveDatabasesAndTablesStep
from astacus.coordinator.plugins.clickhouse.steps import (
KeeperMapTablesReadOnlyStep,
KeeperMapTablesReadWriteStep,
RetrieveDatabasesAndTablesStep,
)
from base64 import b64decode
from collections.abc import AsyncIterator, Sequence
from tests.integration.conftest import create_zookeeper, Ports
Expand Down Expand Up @@ -184,52 +188,51 @@ async def fixture_keeper_table_context(
yield KeeperMapInfo(context, admin_client, [foobar_client, alice_client])


async def get_row_count(client: ClickHouseClient) -> int:
    """Return the row count of `keeperdata`.`keepertable` as seen by ``client``."""
    rows = await client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
    # The count is the first column of the first returned row.
    first_row = cast(Sequence[tuple[int]], rows)[0]
    return int(first_row[0])


async def check_read_only(user_client: ClickHouseClient) -> None:
with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
with pytest.raises(ClickHouseClientQueryError, match=".*TABLE_IS_READ_ONLY.*"):
await user_client.execute(
b"INSERT INTO `keeperdata`.`keepertable` SETTINGS wait_for_async_insert=1 SELECT *, materialize(2) FROM numbers(3)"
)
with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
with pytest.raises(ClickHouseClientQueryError, match=".*TABLE_IS_READ_ONLY.*"):
await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
with pytest.raises(ClickHouseClientQueryError, match=".*TABLE_IS_READ_ONLY.*"):
await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20")
read_only_row_count = cast(
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(read_only_row_count[0][0]) == 3
read_only_row_count = await get_row_count(user_client)
assert read_only_row_count == 3


async def check_read_write(user_client: ClickHouseClient) -> None:
await user_client.execute(b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(10) FROM numbers(3)")
read_write_row_count = cast(
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(read_write_row_count[0][0]) == 3
read_write_row_count = await get_row_count(user_client)
assert read_write_row_count == 3
await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
current_values = await user_client.execute(b"SELECT thevalue FROM `keeperdata`.`keepertable` ORDER BY thekey")
assert all(int(cast(str, val[0])) == 3 for val in current_values)
await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20")
post_delete_row_count = cast(
Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(post_delete_row_count[0][0]) == 0
post_delete_row_count = await get_row_count(user_client)
assert post_delete_row_count == 0


async def test_keeper_map_table_read_only_step(keeper_table_context: KeeperMapInfo) -> None:
steps_context, admin_client, user_clients = keeper_table_context
read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False)
read_only_step = KeeperMapTablesReadOnlyStep([admin_client])
# After the read-only step, users should only be able to select from the table
await read_only_step.run_step(Cluster(nodes=[]), steps_context)
for user_client in user_clients:
await check_read_only(user_client)
# After the read-write step, users should be able to write, update and delete from the table
read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True)
read_write_step = KeeperMapTablesReadWriteStep([admin_client])
await read_write_step.run_step(Cluster(nodes=[]), steps_context)
# Clean up table so that each user starts from a clean slate
await admin_client.execute(b"TRUNCATE TABLE `keeperdata`.`keepertable`")
post_delete_row_count = cast(
Sequence[tuple[int]], await admin_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
)
assert int(post_delete_row_count[0][0]) == 0
post_delete_row_count = await get_row_count(admin_client)
assert post_delete_row_count == 0
for user_client in user_clients:
await check_read_write(user_client)
35 changes: 13 additions & 22 deletions tests/unit/coordinator/plugins/clickhouse/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
StepFailedError,
StepsContext,
)
from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, Row, StubClickHouseClient
from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, StubClickHouseClient
from astacus.coordinator.plugins.clickhouse.config import (
ClickHouseConfiguration,
ClickHouseNode,
Expand Down Expand Up @@ -52,7 +52,9 @@
FreezeUnfreezeTablesStepBase,
get_restore_table_query,
GetVersionsStep,
KeeperMapTablesReadabilityStepBase,
KeeperMapTablesReadOnlyStep,
KeeperMapTablesReadWriteStep,
ListDatabaseReplicasStep,
MoveFrozenPartsStep,
PrepareClickHouseManifestStep,
Expand Down Expand Up @@ -1292,35 +1294,23 @@ async def test_restore_keeper_map_table_data_step() -> None:


@pytest.mark.parametrize(
("allow_writes", "expected_statements"),
("step_class", "expected_statements"),
[
(
False,
[
b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name",
b"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON `db-two`.`table-keeper` FROM `alice`",
],
KeeperMapTablesReadOnlyStep,
[b"ALTER TABLE `db-two`.`table-keeper` MODIFY SETTING read_only=true"],
),
(
True,
[
b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name",
b"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON `db-two`.`table-keeper` TO `alice`",
],
KeeperMapTablesReadWriteStep,
[b"ALTER TABLE `db-two`.`table-keeper` MODIFY SETTING read_only=false"],
),
],
ids=["read-only", "read-write"],
)
async def test_keeper_map_table_select_only_setting_modified(allow_writes: bool, expected_statements: list[bytes]) -> None:
async def test_keeper_map_table_read_only_setting(
step_class: type[KeeperMapTablesReadabilityStepBase], expected_statements: list[bytes]
) -> None:
clickhouse_client = mock_clickhouse_client()

def execute_side_effect(statement: bytes) -> list[Row]:
if statement == b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name":
return [[base64.b64encode(b"alice").decode()]]
return []

clickhouse_client.execute.side_effect = execute_side_effect
context = StepsContext()
sample_tables = SAMPLE_TABLES + [
Table(
database=b"db-two",
Expand All @@ -1330,8 +1320,9 @@ def execute_side_effect(statement: bytes) -> list[Row]:
create_query=b"CREATE TABLE db-two.table-keeper ...",
),
]
context = StepsContext()
context.set_result(RetrieveDatabasesAndTablesStep, (SAMPLE_DATABASES, sample_tables))
step = KeeperMapTablesReadOnlyStep(clients=[clickhouse_client], allow_writes=allow_writes)
step = step_class(clients=[clickhouse_client])
await step.run_step(Cluster(nodes=[]), context)
mock_calls = clickhouse_client.mock_calls
assert mock_calls == [mock.call.execute(statement) for statement in expected_statements]
Expand Down
Loading