ClickHouse: pause KeeperMap insertion momentarily during backup #252
@@ -19,6 +19,7 @@
    DeleteDanglingObjectStorageFilesStep,
    FreezeTablesStep,
    GetVersionsStep,
    KeeperMapTablesReadOnlyStep,
    ListDatabaseReplicasStep,
    MoveFrozenPartsStep,
    PrepareClickHouseManifestStep,
@@ -129,14 +130,17 @@ def get_backup_steps(self, *, context: OperationContext) -> Sequence[Step[Any]]:
            ),
            RetrieveDatabasesAndTablesStep(clients=clickhouse_clients),
            RetrieveMacrosStep(clients=clickhouse_clients),
            KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=False),
            RetrieveKeeperMapTableDataStep(
                zookeeper_client=zookeeper_client,
                keeper_map_path_prefix=self.keeper_map_path_prefix,
                clients=clickhouse_clients,
            ),
            # Then freeze all tables
            FreezeTablesStep(
                clients=clickhouse_clients, freeze_name=self.freeze_name, freeze_unfreeze_timeout=self.freeze_timeout
            ),
            KeeperMapTablesReadOnlyStep(clients=clickhouse_clients, allow_writes=True),
Review comment: I feel like we need to have stronger guarantees that this will run in the case that there is a failure in RetrieveKeeperMapTableDataStep or FreezeTablesStep. Maybe each Step could optionally add a function onto a stack and, in the case of a failure, all the functions are popped from the stack and run. Or you could have a step that wraps other steps?

Reply: The only exception the …
            # Then snapshot and backup all frozen table parts
            SnapshotStep(
                snapshot_groups=disks.get_snapshot_groups(self.freeze_name),
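The steps.py side of this change is not included in the diff, but the integration test below expects INSERT, UPDATE and DELETE statements to fail with ACCESS_DENIED while allow_writes=False is in effect. One implementation consistent with that behaviour is to revoke the write privileges on every KeeperMap table and grant them back afterwards. A minimal sketch under that assumption (the function name, the way affected tables and users are discovered, and the exact privilege list are guesses, not the actual step):

    # Illustrative sketch only, not the actual KeeperMapTablesReadOnlyStep.
    from collections.abc import Sequence

    from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient

    WRITE_PRIVILEGES = "INSERT, UPDATE, DELETE"  # mirrors the grants used in the test below

    async def set_keeper_map_writes(
        client: ClickHouseClient,
        escaped_table_names: Sequence[str],  # e.g. "`keeperdata`.`keepertable`"
        user_names: Sequence[str],  # users whose write access is toggled; discovery not shown
        allow_writes: bool,
    ) -> None:
        verb, preposition = ("GRANT", "TO") if allow_writes else ("REVOKE", "FROM")
        for table_name in escaped_table_names:
            for user_name in user_names:
                # ClickHouseClient.execute takes the statement as bytes.
                await client.execute(f"{verb} {WRITE_PRIVILEGES} ON {table_name} {preposition} `{user_name}`".encode())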
@@ -3,14 +3,16 @@
"""

from .conftest import ClickHouseCommand, create_clickhouse_cluster, get_clickhouse_client, MinioBucket
from .test_plugin import setup_cluster_users
from astacus.coordinator.cluster import Cluster
from astacus.coordinator.plugins.base import StepsContext
from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, ClickHouseClientQueryError, HttpClickHouseClient
from astacus.coordinator.plugins.clickhouse.manifest import ReplicatedDatabase, Table
from astacus.coordinator.plugins.clickhouse.steps import RetrieveDatabasesAndTablesStep
from astacus.coordinator.plugins.clickhouse.steps import KeeperMapTablesReadOnlyStep, RetrieveDatabasesAndTablesStep
from base64 import b64decode
from collections.abc import Sequence
from collections.abc import AsyncIterator, Sequence
from tests.integration.conftest import create_zookeeper, Ports
from typing import cast
from typing import cast, NamedTuple
from uuid import UUID

import pytest

@@ -99,3 +101,77 @@ async def test_retrieve_tables(ports: Ports, clickhouse_command: ClickHouseComma
            dependencies=[],
        ),
    ]

class KeeperMapInfo(NamedTuple):
    context: StepsContext
    clickhouse_client: ClickHouseClient
    user_client: ClickHouseClient


@pytest.fixture(name="keeper_table_context")
async def fixture_keeper_table_context(
    ports: Ports, clickhouse_command: ClickHouseCommand, minio_bucket: MinioBucket
) -> AsyncIterator[KeeperMapInfo]:
    async with (
        create_zookeeper(ports) as zookeeper,
        create_clickhouse_cluster(zookeeper, minio_bucket, ports, ["s1"], clickhouse_command) as clickhouse_cluster,
    ):
        clickhouse = clickhouse_cluster.services[0]
        admin_client = get_clickhouse_client(clickhouse)
        await setup_cluster_users([admin_client])
        for statement in [
            b"CREATE DATABASE `keeperdata` ENGINE = Replicated('/clickhouse/databases/keeperdata', '{my_shard}', '{my_replica}')",
            b"CREATE TABLE `keeperdata`.`keepertable` (thekey UInt32, thevalue UInt32) ENGINE = KeeperMap('test', 1000) PRIMARY KEY thekey",
            b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(1) FROM numbers(3)",
            b"CREATE USER bob IDENTIFIED WITH sha256_password BY 'secret'",
            b"GRANT INSERT, SELECT, UPDATE, DELETE ON `keeperdata`.`keepertable` TO `bob`",
        ]:
            await admin_client.execute(statement)
        user_client = HttpClickHouseClient(
            host=clickhouse.host,
            port=clickhouse.port,
            username="bob",
            password="secret",
            timeout=10,
        )
        step = RetrieveDatabasesAndTablesStep(clients=[admin_client])
        context = StepsContext()
        databases_tables_result = await step.run_step(Cluster(nodes=[]), context=context)
        context.set_result(RetrieveDatabasesAndTablesStep, databases_tables_result)
        yield KeeperMapInfo(context, admin_client, user_client)


async def test_keeper_map_table_select_only_setting_modified(keeper_table_context: KeeperMapInfo) -> None:
    steps_context, admin_client, user_client = keeper_table_context
    read_only_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=False)
    # After the read-only step, the user should only be able to select from the table
    await read_only_step.run_step(Cluster(nodes=[]), steps_context)
    with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
        await user_client.execute(
            b"INSERT INTO `keeperdata`.`keepertable` SETTINGS wait_for_async_insert=1 SELECT *, materialize(2) FROM numbers(3)"
        )
    with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
Review comment: So, we are basically doing this for the "ClickHouse connector for Kafka". I have not read its code, but I suspect some new problems in normal operation. If so, it is no better than it was, because previously this was not happening during normal operation.

Reply: The sink connector pauses writes to ClickHouse for these exceptions: https://github.com/ClickHouse/clickhouse-kafka-connect/blob/main/src/main/java/com/clickhouse/kafka/connect/util/Utils.java#L68. If the exception is retriable, the connector won't commit the offsets to the KeeperMap table, and won't try to write that part to ClickHouse until later. The design doc describes the state machine transitions: https://github.com/ClickHouse/clickhouse-kafka-connect/blob/main/docs/DESIGN.md (see the sketch of this retry pattern after the test below).

Reply: Critically, the raised …
        await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
    with pytest.raises(ClickHouseClientQueryError, match=".*ACCESS_DENIED.*"):
        await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20")
    read_only_row_count = cast(
        Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
    )
    assert int(read_only_row_count[0][0]) == 3
    # After the read-write step, the user should be able to write, update and delete from the table
    read_write_step = KeeperMapTablesReadOnlyStep(clients=[admin_client], allow_writes=True)
    await read_write_step.run_step(Cluster(nodes=[]), steps_context)
    await user_client.execute(b"INSERT INTO `keeperdata`.`keepertable` SELECT *, materialize(3) FROM numbers(3, 3)")
    read_write_row_count = cast(
        Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
    )
    assert int(read_write_row_count[0][0]) == 6
    await user_client.execute(b"ALTER TABLE `keeperdata`.`keepertable` UPDATE thevalue = 3 WHERE thekey < 20")
    current_values = await user_client.execute(b"SELECT thevalue FROM `keeperdata`.`keepertable` ORDER BY thekey")
    assert all(int(cast(str, val[0])) == 3 for val in current_values)
    await user_client.execute(b"DELETE FROM `keeperdata`.`keepertable` WHERE thekey < 20")
    post_delete_row_count = cast(
        Sequence[tuple[int]], await user_client.execute(b"SELECT count() FROM `keeperdata`.`keepertable`")
    )
    assert int(post_delete_row_count[0][0]) == 0
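To illustrate the behaviour described in the review thread above (this is not code from clickhouse-kafka-connect, which is written in Java, and whether ACCESS_DENIED is on its retriable list is an assumption): a sink that only commits its offsets after a successful write simply re-delivers the same batch later when it hits a retriable error, so pausing writes should delay data rather than lose it. A rough sketch of that pattern:

    # Generic illustration of the "retry without committing offsets" pattern.
    from collections.abc import Callable, Sequence

    class RetriableWriteError(Exception):
        """The destination rejected the batch, but a later retry may succeed."""

    def deliver_batch(
        batch: Sequence[bytes],
        write: Callable[[Sequence[bytes]], None],
        commit_offsets: Callable[[Sequence[bytes]], None],
    ) -> None:
        try:
            write(batch)
        except RetriableWriteError:
            # Offsets are not committed: the same records will be re-delivered
            # and written once the destination accepts writes again.
            return
        # Offsets are only committed after a successful write.
        commit_offsets(batch)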
Review comment: Oh, I was thinking maybe more of using a stack, like … But this is probably good enough.
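For reference, a minimal sketch of the stack idea discussed in this thread: steps register compensating actions as they succeed, and on any failure the actions are popped and run in reverse order (the standard library's contextlib.AsyncExitStack implements essentially this). The names below are made up for illustration and do not exist in astacus:

    # Hypothetical sketch of a cleanup stack for coordinator steps.
    from collections.abc import Awaitable, Callable

    CleanupFn = Callable[[], Awaitable[None]]

    class CleanupStack:
        def __init__(self) -> None:
            self._callbacks: list[CleanupFn] = []

        def register(self, callback: CleanupFn) -> None:
            self._callbacks.append(callback)

        async def unwind(self) -> None:
            # Pop and run callbacks in reverse registration order.
            while self._callbacks:
                await self._callbacks.pop()()

    async def run_steps_with_cleanups(steps, cluster, context) -> None:
        stack = CleanupStack()
        try:
            for step in steps:
                await step.run_step(cluster, context)
                # Hypothetical hook: a step such as
                # KeeperMapTablesReadOnlyStep(allow_writes=False) would register
                # the matching re-enable action on the stack here.
        except BaseException:
            await stack.unwind()  # e.g. re-enable KeeperMap writes before aborting
            raise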
Review comment: Maybe the except needs to be broader?
Reply: Reading the code, if I understand correctly there are two scenarios:

1. Transient: the step will be retried, so there's nothing to put on the stack yet.
2. Permanent (an astacus.common.exceptions.PermanentException subclass): there won't be a next step. The exception clause in question sets the step to failed, and higher up the caller stack the whole Op is aborted (in this case the BackupOp).

Which case do you think could result in multiple steps needing to be unwound?

Looking at the usages of other PermanentException subclasses, they're either raised during the restore steps or by the Rohmu storage subclasses (which would be caught when the server is being started/configured, I think). Might be worth broadening StepFailedError to PermanentException.
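A sketch of what that broadening might look like in the coordinator's step loop, paraphrased from the description above rather than copied from astacus (the function name and the cleanup_steps argument are hypothetical):

    # Hypothetical sketch; the real coordinator step loop differs.
    from astacus.common.exceptions import PermanentException

    async def run_step_guarded(step, cluster, context, cleanup_steps) -> None:
        try:
            result = await step.run_step(cluster, context)
            context.set_result(type(step), result)
        except PermanentException:  # broadened from the narrower StepFailedError
            # Any permanent failure aborts the whole Op, so run the compensating
            # steps first (e.g. KeeperMapTablesReadOnlyStep(allow_writes=True)).
            for cleanup_step in cleanup_steps:
                await cleanup_step.run_step(cluster, context)
            raise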