ClickHouse: pause KeeperMap insertion momentarily during backup #252

Merged: 3 commits, Nov 4, 2024
astacus/coordinator/coordinator.py (5 changes: 3 additions & 2 deletions)
@@ -271,8 +271,9 @@ async def try_run(self, cluster: Cluster, context: StepsContext) -> bool:
            with self._progress_handler(cluster, step):
                try:
                    r = await step.run_step(cluster, context)
-                except (StepFailedError, WaitResultError) as e:
-                    logger.info("Step %s failed: %s", step, str(e))
+                except (StepFailedError, WaitResultError) as exc:
+                    logger.info("Step %s failed: %s", step, str(exc))
+                    await step.handle_step_failure(cluster, context)
Contributor:

Oh, I was thinking of maybe using a stack instead, like:

failure_functions = []
for i, step in enumerate(self.steps, 1):
    failure_functions.append(step.handle_step_failure)
    ...
    try:
        ...
    except ...:
        for func in reversed(failure_functions):
            func(cluster, context)

But this is probably good enough
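
For illustration, that stack idea spelled out against try_run might look roughly like this (an untested sketch; since handle_step_failure is async, the handlers would need to be awaited):

failure_handlers = []
for i, step in enumerate(self.steps, 1):
    failure_handlers.append(step.handle_step_failure)
    with self._progress_handler(cluster, step):
        try:
            r = await step.run_step(cluster, context)
        except (StepFailedError, WaitResultError) as exc:
            logger.info("Step %s failed: %s", step, str(exc))
            # Unwind all registered cleanup handlers, most recent first.
            for handler in reversed(failure_handlers):
                await handler(cluster, context)
            return False
    context.set_result(step.__class__, r)
return True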

Contributor:

Maybe the except needs to be broader?

@aris-aiven (Contributor Author), Nov 4, 2024:

Reading the code, if I understand correctly, there are two scenarios:

  1. If the failure is transient, the step will be retried, so there's nothing to put on the stack yet.
  2. If the failure is permanent (an astacus.common.exceptions.PermanentException subclass), there won't be a next step. The except clause in question marks the step as failed, and higher up the call stack the whole Op is aborted (in this case the BackupOp).

Which case do you think could result in multiple steps needing to be unwound?

> Maybe the except needs to be broader?

Looking at the usages of other PermanentException subclasses, they're either raised during the restore steps or by the Rohmu storage subclasses (which would be caught when the server is being started/configured, I think). It might be worth broadening StepFailedError to PermanentException.
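
A minimal sketch of what that broadening could look like in try_run (illustrative only, not part of this diff):

try:
    r = await step.run_step(cluster, context)
except (exceptions.PermanentException, WaitResultError) as exc:
    # Sketch: exceptions.PermanentException covers StepFailedError and any other
    # permanent failure, so the cleanup hook always gets a chance to run.
    logger.info("Step %s failed: %s", step, str(exc))
    await step.handle_step_failure(cluster, context)
    return False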

                    return False
            context.set_result(step.__class__, r)
        return True
astacus/coordinator/plugins/base.py (10 changes: 10 additions & 0 deletions)
@@ -71,14 +71,24 @@ class Step(Generic[StepResult_co]):
    async def run_step(self, cluster: Cluster, context: StepsContext) -> StepResult_co:
        raise NotImplementedError

+    async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None:
+        # This method should not raise exceptions
+        return None


class SyncStep(Step[StepResult_co]):
    async def run_step(self, cluster: Cluster, context: StepsContext) -> StepResult_co:
        return await run_in_threadpool(self.run_sync_step, cluster, context)

+    async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None:
+        await run_in_threadpool(self.handle_step_failure_sync, cluster, context)

    def run_sync_step(self, cluster: Cluster, context: StepsContext) -> StepResult_co:
        raise NotImplementedError

+    def handle_step_failure_sync(self, cluster: Cluster, context: StepsContext) -> None:
+        return None


class StepFailedError(exceptions.PermanentException):
    pass
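
As an illustration of the new hook, a step can override handle_step_failure to undo partial work when a later step fails. A hypothetical example (pause_writes, resume_writes and the module-level logger are assumed helpers, not part of this diff):

class PauseWritesStep(Step[None]):
    async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
        await pause_writes(cluster)  # hypothetical helper

    async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None:
        # The hook must not raise, so any cleanup error is logged and swallowed.
        try:
            await resume_writes(cluster)  # hypothetical helper
        except Exception:
            logger.warning("failed to resume writes while handling step failure")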
astacus/coordinator/plugins/clickhouse/plugin.py (4 changes: 4 additions & 0 deletions)
@@ -19,6 +19,7 @@
    DeleteDanglingObjectStorageFilesStep,
    FreezeTablesStep,
    GetVersionsStep,
+    KeeperMapTablesReadOnlyStep,
    ListDatabaseReplicasStep,
    MoveFrozenPartsStep,
    PrepareClickHouseManifestStep,
@@ -129,14 +130,17 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]:
            ),
            RetrieveDatabasesAndTablesStep(clients=clickhouse_clients),
            RetrieveMacrosStep(clients=clickhouse_clients),
+            KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=False),
            RetrieveKeeperMapTableDataStep(
                zookeeper_client=zookeeper_client,
                keeper_map_path_prefix=self.keeper_map_path_prefix,
                clients=clickhouse_clients,
            ),
            # Then freeze all tables
            FreezeTablesStep(
                clients=clickhouse_clients, freeze_name=self.freeze_name, freeze_unfreeze_timeout=self.freeze_timeout
            ),
+            KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=True),
@joelynch (Contributor), Oct 30, 2024:

I feel like we need stronger guarantees that this will run when there is a failure in RetrieveKeeperMapTableDataStep or FreezeTablesStep. Maybe each Step could optionally push a function onto a stack, and in the case of a failure all the functions are popped from the stack and run.

Or you could have a step that wraps other steps?
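
For illustration, a wrapping step along those lines might look roughly like this (a hypothetical sketch, not part of this PR):

@dataclasses.dataclass
class KeeperMapWriteGuardStep(Step[None]):
    """Hypothetical wrapper: disables KeeperMap writes, runs the inner steps,
    and always re-enables writes, even if an inner step fails."""

    clients: Sequence[ClickHouseClient]
    inner_steps: Sequence[Step[Any]]

    async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
        await KeeperMapTablesReadOnlyStep(clients=self.clients, allow_writes=False).run_step(cluster, context)
        try:
            for step in self.inner_steps:
                context.set_result(step.__class__, await step.run_step(cluster, context))
        finally:
            # Restore write access whether or not the inner steps succeeded.
            await KeeperMapTablesReadOnlyStep(clients=self.clients, allow_writes=True).run_step(cluster, context)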

Contributor Author:

The only exception KeeperMapTablesReadOnlyStep should emit is ClickHouseClientQueryError, so handle_step_failure should effectively be noexcept.

            # Then snapshot and backup all frozen table parts
            SnapshotStep(
                snapshot_groups=disks.get_snapshot_groups(self.freeze_name),
astacus/coordinator/plugins/clickhouse/steps.py (79 changes: 78 additions & 1 deletion)
@@ -43,6 +43,7 @@
from astacus.coordinator.plugins.zookeeper import ChangeWatch, NoNodeError, TransactionError, ZooKeeperClient
from base64 import b64decode
from collections.abc import Awaitable, Callable, Iterable, Iterator, Mapping, Sequence
+from kazoo.exceptions import ZookeeperError
from typing import Any, cast, TypeVar

import asyncio
@@ -179,10 +180,70 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Us
        return user_defined_functions


@dataclasses.dataclass
class KeeperMapTablesReadOnlyStep(Step[None]):
    clients: Sequence[ClickHouseClient]
    allow_writes: bool

    async def revoke_write_on_table(self, table: Table, user_name: bytes) -> None:
        escaped_user_name = escape_sql_identifier(user_name)
        revoke_statement = (
            f"REVOKE INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} FROM {escaped_user_name}"
        )
        await asyncio.gather(*(client.execute(revoke_statement.encode()) for client in self.clients))
        await self.wait_for_access_type_grant(user_name=user_name, table=table, expected_count=0)

    async def grant_write_on_table(self, table: Table, user_name: bytes) -> None:
        escaped_user_name = escape_sql_identifier(user_name)
        grant_statement = (
            f"GRANT INSERT, ALTER UPDATE, ALTER DELETE ON {table.escaped_sql_identifier} TO {escaped_user_name}"
        )
        await asyncio.gather(*(client.execute(grant_statement.encode()) for client in self.clients))
        await self.wait_for_access_type_grant(user_name=user_name, table=table, expected_count=3)

    async def wait_for_access_type_grant(self, *, table: Table, user_name: bytes, expected_count: int) -> None:
        escaped_user_name = escape_sql_string(user_name)
        escaped_database = escape_sql_string(table.database)
        escaped_table = escape_sql_string(table.name)

        async def check_function_count(client: ClickHouseClient) -> bool:
            statement = (
                f"SELECT count() FROM grants "
                f"WHERE user_name={escaped_user_name} "
                f"AND database={escaped_database} "
                f"AND table={escaped_table} "
                f"AND access_type IN ('INSERT', 'ALTER UPDATE', 'ALTER DELETE')"
            )
            num_grants_response = await client.execute(statement.encode())
            num_grants = int(cast(str, num_grants_response[0][0]))
            return num_grants == expected_count

        await wait_for_condition_on_every_node(
            clients=self.clients,
            condition=check_function_count,
            description="access grants changes to be enforced",
            timeout_seconds=60,
        )

    async def run_step(self, cluster: Cluster, context: StepsContext) -> None:
        _, tables = context.get_result(RetrieveDatabasesAndTablesStep)
        replicated_users_response = await self.clients[0].execute(
            b"SELECT base64Encode(name) FROM system.users WHERE storage = 'replicated' ORDER BY name"
        )
        replicated_users_names = [b64decode(cast(str, user[0])) for user in replicated_users_response]
        keeper_map_table_names = [table for table in tables if table.engine == "KeeperMap"]
        privilege_altering_fun = self.grant_write_on_table if self.allow_writes else self.revoke_write_on_table
        privilege_update_tasks = [
            privilege_altering_fun(table, user) for table in keeper_map_table_names for user in replicated_users_names
        ]
        await asyncio.gather(*privilege_update_tasks)


@dataclasses.dataclass
class RetrieveKeeperMapTableDataStep(Step[Sequence[KeeperMapTable]]):
    zookeeper_client: ZooKeeperClient
    keeper_map_path_prefix: str | None
    clients: Sequence[ClickHouseClient]

    async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[KeeperMapTable]:
        if self.keeper_map_path_prefix is None:
@@ -195,6 +256,8 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Ke
        except NoNodeError:
            # The path doesn't exist, no keeper map tables to retrieve
            return []
+        except ZookeeperError as e:
+            raise StepFailedError("Failed to retrieve KeeperMap tables") from e

        tables = []
        for child in children:
@@ -203,8 +266,10 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Ke
            try:
                data = await connection.get_children_with_data(data_path)
            except NoNodeError:
                logger.info("ZNode %s is missing, table was dropped. Skipping", data_path)
                continue
+            except ZookeeperError as e:
+                raise StepFailedError("Failed to retrieve table data") from e

            tables.append(
                KeeperMapTable(
@@ -216,6 +281,12 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Ke
            raise TransientException("Concurrent table addition / deletion during KeeperMap backup")
        return tables

+    async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None:
+        try:
+            await KeeperMapTablesReadOnlyStep(clients=self.clients, allow_writes=True).run_step(cluster, context)
+        except ClickHouseClientQueryError:
+            logger.warning("Unable to restore write ACLs for KeeperMap tables")


@dataclasses.dataclass
class RetrieveDatabasesAndTablesStep(Step[DatabasesAndTables]):
@@ -441,6 +512,12 @@ class FreezeTablesStep(FreezeUnfreezeTablesStepBase):
    def operation(self) -> str:
        return "FREEZE"

+    async def handle_step_failure(self, cluster: Cluster, context: StepsContext) -> None:
+        try:
+            await KeeperMapTablesReadOnlyStep(clients=self.clients, allow_writes=True).run_step(cluster, context)
+        except ClickHouseClientQueryError:
+            logger.warning("Unable to restore write ACLs for KeeperMap tables")


@dataclasses.dataclass
class UnfreezeTablesStep(FreezeUnfreezeTablesStepBase):
astacus/coordinator/plugins/zookeeper.py (7 changes: 5 additions & 2 deletions)
@@ -11,6 +11,7 @@
from kazoo.recipe.watchers import ChildrenWatch, DataWatch
from kazoo.retry import KazooRetry
from queue import Empty, Queue
+from typing import TypeAlias

import asyncio
import contextlib
@@ -24,6 +25,7 @@


Watcher = Callable[[WatchedEvent], None]
+FaultInjection: TypeAlias = Callable[[], None]


class ZooKeeperTransaction:
@@ -67,8 +69,9 @@ async def get_children(self, path: str, watch: Watcher | None = None) -> Sequenc
    async def get_children_with_data(
        self,
        path: str,
-        get_data_fault: Callable[[], None] = lambda: None,
-        get_children_fault: Callable[[], None] = lambda: None,
+        *,
+        get_data_fault: FaultInjection = lambda: None,
+        get_children_fault: FaultInjection = lambda: None,
    ) -> dict[str, bytes]:
        """Returns a dictionary of all children of the given `path` with their data.
tests/integration/coordinator/plugins/clickhouse/test_steps.py (82 changes: 79 additions & 3 deletions)
@@ -3,14 +3,16 @@
"""

from .conftest import ClickHouseCommand, create_clickhouse_cluster, get_clickhouse_client, MinioBucket
+from .test_plugin import setup_cluster_users
from astacus.coordinator.cluster import Cluster
from astacus.coordinator.plugins.base import StepsContext
from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, ClickHouseClientQueryError, HttpClickHouseClient
from astacus.coordinator.plugins.clickhouse.manifest import ReplicatedDatabase, Table
-from astacus.coordinator.plugins.clickhouse.steps import RetrieveDatabasesAndTablesStep
+from astacus.coordinator.plugins.clickhouse.steps import KeeperMapTablesReadOnlyStep, RetrieveDatabasesAndTablesStep
from base64 import b64decode
-from collections.abc import Sequence
+from collections.abc import AsyncIterator, Sequence
from tests.integration.conftest import create_zookeeper, Ports
-from typing import cast
+from typing import cast, NamedTuple
from uuid import UUID

import pytest
@@ -99,3 +101,77 @@ async def test_retrieve_tables(ports: Ports, clickhouse_command: ClickHouseComma
            dependencies=[],
        ),
    ]


class KeeperMapInfo(NamedTuple):
    context: StepsContext
    clickhouse_client: ClickHouseClient
    user_client: ClickHouseClient


@pytest.fixture(name="keeper_table_context")
async def fixture_keeper_table_context(
    ports: Ports, clickhouse_command: ClickHouseCommand, minio_bucket: MinioBucket
) -> AsyncIterator[KeeperMapInfo]:
    async with (
        create_zookeeper(ports) as zookeeper,
        create_clickhouse_cluster(zookeeper, minio_bucket, ports, ["s1"], clickhouse_command) as clickhouse_cluster,
    ):
        clickhouse = clickhouse_cluster.services[0]
        admin_client = get_clickhouse_client(clickhouse)
        await setup_cluster_users([admin_client])
        for statement in [
            b"CREATE DATABASE `keeperdata` ENGINE = Replicated('/clickhouse/databases/keeperdata', '{my_shard}', '{my_replica}')",
            b"CREATE TABLE `keeperdata`.`keepertable` (thekey UInt32, thevalue UInt32) ENGINE = KeeperMap('test', 1000) PRIMARY KEY thekey",
            b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(1) FROM numbers(3)",
            b"CREATE USER bob IDENTIFIED WITH sha256_password BY 'secret'",
            b"GRANT INSERT, SELECT, UPDATE, DELETE ON `keeperdata`.`keepertable` TO `bob`",
        ]:
            await admin_client.execute(statement)
        user_client = HttpClickHouseClient(
            host=clickhouse.host,
            port=clickhouse.port,
            username="bob",
            password="secret",
            timeout=10,
        )
        step = RetrieveDatabasesAndTablesStep(clients=[admin_client])
        context = StepsContext()
        databases_tables_result = await step.run_step(Cluster(nodes=[]), context=context)
        context.set_result(RetrieveDatabasesAndTablesStep, databases_tables_result)
        yield KeeperMapInfo(context, admin_client, user_client)


async def test_keeper_map_table_select_only_setting_modified(keeper_table_context: KeeperMapInfo) -> None:
    steps_context, admin_client, user_client = keeper_table_context
    read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False)
    # After the read-only step, the user should only be able to select from the table
    await read_only_step.run_step(Cluster(nodes=[]), steps_context)
    with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
        await user_client.execute(
            b"INSERT INTO `keeperdata`.`keepertable` SETTINGS wait_for_async_insert=1 SELECT *, materialize(2) FROM numbers(3)"
        )
    with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
@Khatskevich (Contributor), Oct 30, 2024:

So, we are basically doing this for the "ClickHouse connector for Kafka". I haven't read its code, but I suspect some new problems during normal operation.
Exactly-once means: every part is processed, and no part is processed twice.
Before this commit, as long as there was no backup restore and no errors during the operation, it was exactly once.
Now, during a backup, some part will not be able to record that its work is done.
How exactly will it behave? Theoretically it could:

  1. fail instantly, and someone else repeats the whole processing
  2. be restarted, and the processing is done again

If so, this is no better than before, because previously that did not happen during normal operation.

Contributor Author:

The sink connector pauses writes to ClickHouse for these exceptions: https://github.com/ClickHouse/clickhouse-kafka-connect/blob/main/src/main/java/com/clickhouse/kafka/connect/util/Utils.java#L68

If the exception is retriable, the connector won't commit the offsets to the KeeperMap table and won't try to write that part to ClickHouse until later. The design doc describes the state machine transitions: https://github.com/ClickHouse/clickhouse-kafka-connect/blob/main/docs/DESIGN.md

@aris-aiven (Contributor Author), Oct 30, 2024:

Critically, the raised ACCESS_DENIED corresponds to exception code 497, which isn't included in the retriable exceptions list. So we need to change the approach a bit.

        await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
    with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
        await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20")
    read_only_row_count = cast(
        Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
    )
    assert int(read_only_row_count[0][0]) == 3
    # After the read-write step, the user should be able to write, update and delete from the table
    read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True)
    await read_write_step.run_step(Cluster(nodes=[]), steps_context)
    await user_client.execute(b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(3) FROM numbers(3, 3)")
    read_write_row_count = cast(
        Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
    )
    assert int(read_write_row_count[0][0]) == 6
    await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
    current_values = await user_client.execute(b"SELECT thevalue FROM `keeperdata`.`keepertable` ORDER BY thekey")
    assert all(int(cast(str, val[0])) == 3 for val in current_values)
    await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20")
    post_delete_row_count = cast(
        Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
    )
    assert int(post_delete_row_count[0][0]) == 0