diff --git a/astacus/coordinator/plugins/clickhouse/dependencies.py b/astacus/coordinator/plugins/clickhouse/dependencies.py
index d53a2e79..209c02bd 100644
--- a/astacus/coordinator/plugins/clickhouse/dependencies.py
+++ b/astacus/coordinator/plugins/clickhouse/dependencies.py
@@ -4,12 +4,37 @@
 """
 from astacus.coordinator.plugins.clickhouse.manifest import AccessEntity, Table
 from collections.abc import Sequence
+from typing import Callable, Hashable, TypeVar

 # noinspection PyCompatibility
 import graphlib
 import re
 import uuid

+Node = TypeVar("Node")
+NodeKey = TypeVar("NodeKey", bound=Hashable)
+
+
+def sort_topologically(
+    nodes: Sequence[Node],
+    get_key: Callable[[Node], NodeKey],
+    get_dependencies: Callable[[Node], Sequence[NodeKey]] = lambda x: [],
+    get_dependants: Callable[[Node], Sequence[NodeKey]] = lambda x: [],
+) -> list[Node]:
+    """
+    Sort elements topologically based on their dependencies.
+    """
+    sorter = graphlib.TopologicalSorter()  # type: ignore
+    for element in nodes:
+        element_key = get_key(element)
+        sorter.add(element_key)
+        for dependency in get_dependencies(element):
+            sorter.add(element_key, dependency)
+        for dependency in get_dependants(element):
+            sorter.add(dependency, element_key)
+    sort_order = list(sorter.static_order())
+    return sorted(nodes, key=lambda element: sort_order.index(get_key(element)))
+

 def tables_sorted_by_dependencies(tables: Sequence[Table]) -> Sequence[Table]:
     """
@@ -22,13 +47,9 @@ def tables_sorted_by_dependencies(tables: Sequence[Table]) -> Sequence[Table]:
     The `dependencies` attribute of each table must contain the list of
     `(database_name: str, table_name: str)` that depend on this table.
     """
-    sorter = graphlib.TopologicalSorter()  # type: ignore
-    for table in tables:
-        sorter.add((table.database, table.name))
-        for dependency in table.dependencies:
-            sorter.add(dependency, (table.database, table.name))
-    sort_order = list(sorter.static_order())
-    return sorted(tables, key=lambda t: sort_order.index((t.database, t.name)))
+    return sort_topologically(
+        tables, get_key=lambda table: (table.database, table.name), get_dependants=lambda table: table.dependencies
+    )


 def access_entities_sorted_by_dependencies(access_entities: Sequence[AccessEntity]) -> Sequence[AccessEntity]:
     """
@@ -42,15 +63,15 @@ def access_entities_sorted_by_dependencies(access_entities: Sequence[AccessEntit
     roles can depend on other roles.
     This forces us to use a real topological sort to determine the creation order.
     """
-    sorter = graphlib.TopologicalSorter()  # type: ignore
     # Unlike tables, ClickHouse does not provide a list of dependencies between entities.
     # This means we need to parse the `attach_query` of the entity to find the uuid of
     # other entities. This is unpleasant, but the quoting format of entity names and entity
     # uuids is different enough to not risk false matches.
     clickhouse_id = re.compile(rb"ID\('([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})'\)")
-    for entity in access_entities:
-        sorter.add(entity.uuid)
-        for uuid_bytes in clickhouse_id.findall(entity.attach_query):
-            sorter.add(entity.uuid, uuid.UUID(uuid_bytes.decode()))
-    sort_order = list(sorter.static_order())
-    return sorted(access_entities, key=lambda e: sort_order.index(e.uuid))
+    return sort_topologically(
+        access_entities,
+        get_key=lambda entity: entity.uuid,
+        get_dependencies=lambda entity: [
+            uuid.UUID(uuid_bytes.decode()) for uuid_bytes in clickhouse_id.findall(entity.attach_query)
+        ],
+    )
diff --git a/astacus/coordinator/plugins/clickhouse/engines.py b/astacus/coordinator/plugins/clickhouse/engines.py
new file mode 100644
index 00000000..2b7157f2
--- /dev/null
+++ b/astacus/coordinator/plugins/clickhouse/engines.py
@@ -0,0 +1,13 @@
+"""
+Copyright (c) 2024 Aiven Ltd
+See LICENSE for details
+"""
+
+import enum
+
+
+# This enum contains only the constants we actually use
+class TableEngine(enum.Enum):
+    MySQL = "MySQL"
+    PostgreSQL = "PostgreSQL"
+    S3 = "S3"
diff --git a/astacus/coordinator/plugins/clickhouse/steps.py b/astacus/coordinator/plugins/clickhouse/steps.py
index 9ec4b3f2..6728b42b 100644
--- a/astacus/coordinator/plugins/clickhouse/steps.py
+++ b/astacus/coordinator/plugins/clickhouse/steps.py
@@ -46,6 +46,7 @@
 import dataclasses
 import logging
 import msgspec
+import re
 import secrets
 import uuid

@@ -432,6 +433,12 @@ async def run_on_every_node(
     await asyncio.gather(*[gather_limited(per_node_concurrency_limit, fn(client)) for client in clients])


+def get_restore_table_query(table: Table) -> bytes:
+    # Use `ATTACH` instead of `CREATE` for materialized views so that they can be
+    # restored even when their `SELECT` source table no longer exists
+    return re.sub(b"^CREATE MATERIALIZED VIEW", b"ATTACH MATERIALIZED VIEW", table.create_query)
+
+
 @dataclasses.dataclass
 class RestoreReplicatedDatabasesStep(Step[None]):
     """
@@ -534,12 +541,9 @@ def _create_dbs(client: ClickHouseClient) -> Iterator[Awaitable[None]]:
         # If any known table depends on an unknown table that was inside a non-replicated
         # database engine, then this will crash. See comment in `RetrieveReplicatedDatabasesStep`.
         for table in tables_sorted_by_dependencies(manifest.tables):
-            # Materialized views creates both a table for the view itself and a table
-            # with the .inner_id. prefix to store the data, we don't need to recreate
-            # them manually. We will need to restore their data parts however.
-            if not table.name.startswith(b".inner_id."):
-                # Create on the first client and let replication do its thing
-                await self.clients[0].execute(table.create_query, session_id=session_id)
+            restore_table_query = get_restore_table_query(table)
+            # Create on the first client and let replication do its thing
+            await self.clients[0].execute(restore_table_query, session_id=session_id)


 DatabasesReplicas = Mapping[bytes, Sequence[DatabaseReplica]]
diff --git a/tests/integration/coordinator/plugins/clickhouse/conftest.py b/tests/integration/coordinator/plugins/clickhouse/conftest.py
index d9615bfa..170d3b34 100644
--- a/tests/integration/coordinator/plugins/clickhouse/conftest.py
+++ b/tests/integration/coordinator/plugins/clickhouse/conftest.py
@@ -204,6 +204,12 @@ async def fixture_minio_bucket(minio: MinioService) -> AsyncIterator[MinioBucket
     yield bucket


+@pytest.fixture(scope="function", name="function_minio_bucket")
+async def fixture_function_minio_bucket(minio: MinioService) -> AsyncIterator[MinioBucket]:
+    with minio.bucket(bucket_name="function-clickhouse-bucket") as bucket:
+        yield bucket
+
+
 @contextlib.asynccontextmanager
 async def create_minio_service(ports: Ports) -> AsyncIterator[MinioService]:
     server_port = ports.allocate()
@@ -337,6 +343,10 @@ def create_clickhouse_configs(
     minio_bucket: MinioBucket | None = None,
     object_storage_prefix: str = "/",
 ):
+    # Helper for emitting XML configuration entries: avoids very long lines and duplication
+    def setting(name: str, value: int | float | str):
+        return f"<{name}>{value}</{name}>"
+
     replicas = "\n".join(
         f"""
@@ -409,9 +419,29 @@
             <port>{zookeeper.port}</port>
-        <mark_cache_size>5368709120</mark_cache_size>
-        <max_server_memory_usage_to_ram_ratio>0.5</max_server_memory_usage_to_ram_ratio>
-        <enable_system_unfreeze>true</enable_system_unfreeze>
+
+        {setting("number_of_free_entries_in_pool_to_execute_mutation", 2)}
+        {setting("number_of_free_entries_in_pool_to_execute_optimize_entire_partition", 2)}
+
+        {setting("background_pool_size", 4)}
+        {setting("background_move_pool_size", 2)}
+        {setting("background_fetches_pool_size", 2)}
+        {setting("background_common_pool_size", 4)}
+        {setting("background_buffer_flush_schedule_pool_size", 2)}
+        {setting("background_schedule_pool_size", 2)}
+        {setting("background_message_broker_schedule_pool_size", 2)}
+        {setting("background_distributed_schedule_pool_size", 2)}
+        {setting("tables_loader_foreground_pool_size", 2)}
+        {setting("tables_loader_background_pool_size", 2)}
+        {setting("restore_threads", 2)}
+        {setting("backup_threads", 2)}
+        {setting("backups_io_thread_pool_queue_size", 2)}
+        {setting("max_parts_cleaning_thread_pool_size", 2)}
+        {setting("max_active_parts_loading_thread_pool_size", 2)}
+        {setting("max_outdated_parts_loading_thread_pool_size", 2)}
+        {setting("mark_cache_size", 5368709120)}
+        {setting("max_server_memory_usage_to_ram_ratio", 0.5)}
+        {setting("enable_system_unfreeze", "true")}
         <users_config>{str(data_dir / "users.xml")}</users_config>
diff --git a/tests/integration/coordinator/plugins/clickhouse/test_plugin.py b/tests/integration/coordinator/plugins/clickhouse/test_plugin.py
index 186b5301..fbef6598 100644
--- a/tests/integration/coordinator/plugins/clickhouse/test_plugin.py
+++ b/tests/integration/coordinator/plugins/clickhouse/test_plugin.py
@@ -6,9 +6,17 @@
 from _pytest.fixtures import SubRequest
 from astacus.common.ipc import RestoreRequest
 from astacus.coordinator.plugins.base import OperationContext
-from astacus.coordinator.plugins.clickhouse.client import ClickHouseClient, HttpClickHouseClient
+from astacus.coordinator.plugins.clickhouse.client import (
+    ClickHouseClient,
+    ClickHouseClientQueryError,
+    escape_sql_identifier,
+    escape_sql_string,
+    HttpClickHouseClient,
+)
+from astacus.coordinator.plugins.clickhouse.engines import TableEngine
 from astacus.coordinator.plugins.clickhouse.plugin import ClickHousePlugin
 from collections.abc import AsyncIterable, AsyncIterator, Sequence
+from contextlib import asynccontextmanager
 from pathlib import Path
 from tests.integration.conftest import create_zookeeper, Ports
 from tests.integration.coordinator.plugins.clickhouse.conftest import (
@@ -51,8 +59,13 @@ def get_restore_steps_names() -> Sequence[str]:
     return [step.__class__.__name__ for step in steps]


-@pytest.fixture(scope="module", name="restorable_cluster")
-async def fixture_restorable_cluster(
+async def is_engine_available(client: ClickHouseClient, engine: TableEngine) -> bool:
+    query = f"SELECT TRUE FROM system.table_engines WHERE name = {escape_sql_string(engine.value.encode())}".encode()
+    return len(await client.execute(query)) == 1
+
+
+@asynccontextmanager
+async def restorable_cluster_manager(
     ports: Ports,
     clickhouse_command: ClickHouseCommand,
     minio_bucket: MinioBucket,
@@ -79,15 +92,24 @@ async def fixture_restorable_cluster(
         yield storage_path


-@pytest.fixture(scope="module", name="restored_cluster", params=[*get_restore_steps_names(), None])
-async def fixture_restored_cluster(
+@pytest.fixture(scope="module", name="restorable_cluster")
+async def fixture_restorable_cluster(
+    ports: Ports,
+    clickhouse_command: ClickHouseCommand,
+    minio_bucket: MinioBucket,
+) -> AsyncIterator[Path]:
+    async with restorable_cluster_manager(ports, clickhouse_command, minio_bucket) as restorable_cluster:
+        yield restorable_cluster
+
+
+@asynccontextmanager
+async def restored_cluster_manager(
     restorable_cluster: Path,
     ports: Ports,
-    request: SubRequest,
+    stop_after_step: str | None,
     clickhouse_restore_command: ClickHouseCommand,
     minio_bucket: MinioBucket,
-) -> AsyncIterable[Sequence[ClickHouseClient]]:
-    stop_after_step: str = request.param
+) -> AsyncIterator[Sequence[ClickHouseClient]]:
     restorable_source = RestorableSource(
         astacus_storage_path=restorable_cluster / "astacus_backup", clickhouse_object_storage_prefix="restorable/"
     )
@@ -122,9 +144,51 @@ async def fixture_restored_cluster(
             "restorable",
         )
         run_astacus_command(astacus_cluster, "restore", "--storage", "restorable")
+        await sync_replicated_tables(clients)
         yield clients


+@pytest.fixture(scope="module", name="restored_cluster", params=[*get_restore_steps_names(), None])
+async def fixture_restored_cluster(
+    ports: Ports,
+    request: SubRequest,
+    restorable_cluster: Path,
+    clickhouse_restore_command: ClickHouseCommand,
+    minio_bucket: MinioBucket,
+) -> AsyncIterable[Sequence[ClickHouseClient]]:
+    stop_after_step: str | None = request.param
+    async with restored_cluster_manager(
+        restorable_cluster, ports, stop_after_step, clickhouse_restore_command, minio_bucket
+    ) as clients:
+        yield clients
+
+
+@pytest.fixture(scope="function", name="function_restored_cluster")
+async def fixture_function_restored_cluster(
+    ports: Ports,
+    clickhouse_command: ClickHouseCommand,
+    clickhouse_restore_command: ClickHouseCommand,
+    function_minio_bucket: MinioBucket,
+) -> AsyncIterable[Sequence[ClickHouseClient]]:
+    async with restorable_cluster_manager(ports, clickhouse_command, function_minio_bucket) as restorable_cluster:
+        async with restored_cluster_manager(
+            restorable_cluster, ports, None, clickhouse_restore_command, function_minio_bucket
+        ) as clients:
+            yield clients
+
+
+async def sync_replicated_tables(clients: Sequence[ClickHouseClient]) -> None:
+    # Get replicated tables to sync
+    rows = await clients[0].execute(
+        b"SELECT name FROM system.tables WHERE database = 'default' AND engine like 'Replicated%'"
+    )
+    for client in clients:
+        for row in rows:
+            table_name = row[0]
+            assert isinstance(table_name, str)
+            await client.execute(f"SYSTEM SYNC REPLICA default.{escape_sql_identifier(table_name.encode())} STRICT".encode())
+
+
 async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_named_collections: bool) -> None:
     for client in clients:
         await client.execute(b"DROP DATABASE default SYNC")
@@ -181,14 +245,18 @@ async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_nam
         b"SETTINGS flatten_nested=1"
     )
     # integrations - note most of these never actually attempt to connect to the remote server.
-    await clients[0].execute(
-        b"CREATE TABLE default.postgresql (a Int) "
-        b"ENGINE = PostgreSQL('https://host:1234', 'database', 'table', 'user', 'password')"
-    )
-    await clients[0].execute(
-        b"CREATE TABLE default.mysql (a Int) ENGINE = MySQL('https://host:1234', 'database', 'table', 'user', 'password')"
-    )
-    await clients[0].execute(b"CREATE TABLE default.s3 (a Int) ENGINE = S3('http://bucket.s3.amazonaws.com/key.json')")
+    if await is_engine_available(clients[0], TableEngine.PostgreSQL):
+        await clients[0].execute(
+            b"CREATE TABLE default.postgresql (a Int) "
+            b"ENGINE = PostgreSQL('https://host:1234', 'database', 'table', 'user', 'password')"
+        )
+    if await is_engine_available(clients[0], TableEngine.MySQL):
+        await clients[0].execute(
+            b"CREATE TABLE default.mysql (a Int) "
+            b"ENGINE = MySQL('https://host:1234', 'database', 'table', 'user', 'password')"
+        )
+    if await is_engine_available(clients[0], TableEngine.S3):
+        await clients[0].execute(b"CREATE TABLE default.s3 (a Int) ENGINE = S3('http://bucket.s3.amazonaws.com/key.json')")
     # add a function table
     await clients[0].execute(b"CREATE TABLE default.from_function_table AS numbers(3)")
     # add a table with data in object storage
@@ -201,11 +269,30 @@ async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_nam
     await clients[0].execute(
         b"CREATE VIEW default.simple_view AS SELECT toInt32(thekey * 2) as thekey2 FROM default.replicated_merge_tree"
     )
+    await clients[0].execute(
+        b"CREATE TABLE default.source_table_for_view_deleted (thekey UInt32, thedata String) "
+        b"ENGINE = ReplicatedMergeTree ORDER BY (thekey)"
+    )
+    await clients[0].execute(
+        b"CREATE VIEW default.view_deleted_source AS SELECT toInt32(thekey * 2) as thekey2 "
+        b"FROM default.source_table_for_view_deleted"
+    )
     await clients[0].execute(
         b"CREATE MATERIALIZED VIEW default.materialized_view "
         b"ENGINE = MergeTree ORDER BY (thekey3) "
         b"AS SELECT toInt32(thekey * 3) as thekey3 FROM default.replicated_merge_tree"
     )
+    await clients[0].execute(
+        b"CREATE TABLE default.data_table_for_mv (thekey3 UInt32) ENGINE = ReplicatedMergeTree ORDER BY (thekey3)"
+    )
+    await clients[0].execute(
+        b"CREATE MATERIALIZED VIEW default.materialized_view_deleted_source to default.data_table_for_mv "
+        b"AS SELECT toInt32(thekey * 3) as thekey3 FROM default.source_table_for_view_deleted"
+    )
+    await clients[0].execute(b"INSERT INTO default.source_table_for_view_deleted VALUES (7, '7')")
+    await clients[2].execute(b"INSERT INTO default.source_table_for_view_deleted VALUES (9, '9')")
+    await clients[0].execute(b"DROP TABLE default.source_table_for_view_deleted")
+
     await clients[0].execute(b"CREATE TABLE default.memory (thekey UInt32, thedata String) ENGINE = Memory")
     # This will be replicated between nodes of the same shard (servers 0 and 1, but not 2)
     await clients[0].execute(b"INSERT INTO default.replicated_merge_tree VALUES (123, 'foo')")
@@ -343,6 +430,57 @@ async def test_restores_materialized_view_data(restored_cluster: Sequence[ClickH
         assert response == expected_data


+async def test_restores_materialized_view_deleted_source_table(restored_cluster: Sequence[ClickHouseClient]) -> None:
+    s1_data = [[7 * 3]]
+    s2_data = [[9 * 3]]
+    cluster_data = [s1_data, s1_data, s2_data]
+    for client, expected_data in zip(restored_cluster, cluster_data):
+        response = await client.execute(b"SELECT thekey3 FROM default.materialized_view_deleted_source ORDER BY thekey3")
+        assert response == expected_data
+
+
+async def test_restores_materialized_view_with_undeleted_source_table(
+    function_restored_cluster: Sequence[ClickHouseClient],
+) -> None:
+    await function_restored_cluster[0].execute(
+        b"CREATE TABLE default.source_table_for_view_deleted (thekey UInt32, thedata String) "
+        b"ENGINE = ReplicatedMergeTree ORDER BY (thekey)"
+    )
+    await function_restored_cluster[0].execute(b"INSERT INTO default.source_table_for_view_deleted VALUES (8, '8')")
+    s1_data = [[7 * 3], [8 * 3]]
+    s2_data = [[9 * 3]]
+    cluster_data = [s1_data, s1_data, s2_data]
+    # Recreated deleted table works again
+    for client, expected_data in zip(function_restored_cluster, cluster_data):
+        response = await client.execute(b"SELECT thekey3 FROM default.materialized_view_deleted_source ORDER BY thekey3")
+        assert response == expected_data
+
+
+async def test_restores_view_with_deleted_source_table(restored_cluster: Sequence[ClickHouseClient]) -> None:
+    unknown_table_exception_code = 60
+    for client in restored_cluster:
+        with pytest.raises(ClickHouseClientQueryError) as raised:
+            await client.execute(b"SELECT thekey2 FROM default.view_deleted_source ORDER BY thekey2")
+        assert raised.value.status_code == 404
+        assert raised.value.exception_code == unknown_table_exception_code
+
+
+async def test_restores_view_undeleted_source_table(function_restored_cluster: Sequence[ClickHouseClient]) -> None:
+    await function_restored_cluster[0].execute(
+        b"CREATE TABLE default.source_table_for_view_deleted (thekey UInt32, thedata String) "
+        b"ENGINE = ReplicatedMergeTree ORDER BY (thekey)"
+    )
+    await function_restored_cluster[0].execute(b"INSERT INTO default.source_table_for_view_deleted VALUES (11, '11')")
+    await function_restored_cluster[2].execute(b"INSERT INTO default.source_table_for_view_deleted VALUES (17, '17')")
+    s1_data = [[11 * 2]]
+    s2_data = [[17 * 2]]
+    cluster_data = [s1_data, s1_data, s2_data]
+    # Recreated deleted table works again
+    for client, expected_data in zip(function_restored_cluster, cluster_data):
+        response = await client.execute(b"SELECT thekey2 FROM default.view_deleted_source ORDER BY thekey2")
+        assert response == expected_data
+
+
 async def test_restores_connectivity_between_distributed_servers(restored_cluster: Sequence[ClickHouseClient]) -> None:
     # This only works if each node can connect to all nodes of the cluster named after the Distributed database
     for client in restored_cluster:
@@ -391,12 +529,18 @@ async def test_cleanup_does_not_break_object_storage_disk_files(
     await check_object_storage_data(clients)


-async def test_restores_integration_tables(restored_cluster: Sequence[ClickHouseClient]) -> None:
+@pytest.mark.parametrize(
+    "table_name,table_engine",
+    [
+        ("default.postgresql", TableEngine.PostgreSQL),
+        ("default.mysql", TableEngine.MySQL),
+        ("default.s3", TableEngine.S3),
+    ],
+)
+async def test_restores_integration_tables(
+    restored_cluster: Sequence[ClickHouseClient], table_name: str, table_engine: TableEngine
+) -> None:
     for client in restored_cluster:
-        assert await table_exists(client, "default.postgresql")
-        assert await table_exists(client, "default.mysql")
-        assert await table_exists(client, "default.s3")
-
-
-async def table_exists(client: ClickHouseClient, table_name: str) -> bool:
-    return bool(await client.execute(f"EXISTS TABLE {table_name}".encode()))
+        if not await is_engine_available(client, table_engine):
+            pytest.skip(f"Table engine {table_engine.value} not available")
+        assert bool(await client.execute(f"EXISTS TABLE {table_name}".encode()))
diff --git a/tests/unit/coordinator/plugins/clickhouse/test_dependencies.py b/tests/unit/coordinator/plugins/clickhouse/test_dependencies.py
index a3891c66..447ccaba 100644
--- a/tests/unit/coordinator/plugins/clickhouse/test_dependencies.py
+++ b/tests/unit/coordinator/plugins/clickhouse/test_dependencies.py
@@ -48,6 +48,7 @@ def test_tables_sorted_by_dependencies() -> None:
         dependencies=[(b"db_one", b"t2"), (b"db_one", b"t3")],
     )
     assert tables_sorted_by_dependencies([t1, t2, t3, t4]) == [t1, t4, t3, t2]
+    assert tables_sorted_by_dependencies([t2, t3, t4, t1]) == [t1, t4, t3, t2]


 def test_dangling_table_dependency_doesnt_crash() -> None:
@@ -94,6 +95,7 @@ def test_access_entities_sorted_by_dependencies() -> None:
         type="R", uuid=uuid.UUID("00000000-0000-abcd-0000-000000000004"), name=b"a4", attach_query=b"ATTACH ROLE a4"
     )
     assert access_entities_sorted_by_dependencies([a1, a2, a3, a4]) == [a4, a2, a3, a1]
+    assert access_entities_sorted_by_dependencies([a2, a4, a3, a1]) == [a4, a2, a3, a1]


 def test_dangling_access_entities_doesnt_crash() -> None:
diff --git a/tests/unit/coordinator/plugins/clickhouse/test_steps.py b/tests/unit/coordinator/plugins/clickhouse/test_steps.py
index 615becb2..bf63f03f 100644
--- a/tests/unit/coordinator/plugins/clickhouse/test_steps.py
+++ b/tests/unit/coordinator/plugins/clickhouse/test_steps.py
@@ -49,6 +49,7 @@
     DeleteDanglingObjectStorageFilesStep,
     FreezeTablesStep,
     FreezeUnfreezeTablesStepBase,
+    get_restore_table_query,
     GetVersionsStep,
     ListDatabaseReplicasStep,
     MoveFrozenPartsStep,
@@ -1242,3 +1243,36 @@ async def call() -> None:

 def create_object_storage_disk(name: str, object_storage: AsyncObjectStorage | None) -> Disk:
     return Disk(type=DiskType.object_storage, name=name, path_parts=("disks", name), object_storage=object_storage)
+
+
+@pytest.mark.parametrize(
+    "original_query,rewritten_query",
+    [
+        [
+            b"CREATE VIEW FOO AS SELECT 1",
+            b"CREATE VIEW FOO AS SELECT 1",
+        ],
+        [
+            b"CREATE VIEW `CREATE MATERIALIZED VIEW` AS SELECT 1",
+            b"CREATE VIEW `CREATE MATERIALIZED VIEW` AS SELECT 1",
+        ],
+        [
+            b"CREATE MATERIALIZED VIEW FOO AS SELECT 1",
+            b"ATTACH MATERIALIZED VIEW FOO AS SELECT 1",
+        ],
+        [
+            b"CREATE MATERIALIZED VIEW `CREATE MATERIALIZED VIEW` AS SELECT 1",
+            b"ATTACH MATERIALIZED VIEW `CREATE MATERIALIZED VIEW` AS SELECT 1",
+        ],
+    ],
+)
+def test_get_restore_table_query(original_query: bytes, rewritten_query: bytes):
+    table = Table(
+        database=b"db",
+        name=b"table",
+        uuid=uuid.UUID("00000000-0000-0000-0000-100000000001"),
+        engine="Engine",
+        create_query=original_query,
+        dependencies=[],
+    )
+    assert get_restore_table_query(table) == rewritten_query
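Reviewer note: below is a minimal standalone usage sketch of the new `sort_topologically` helper from dependencies.py, shown outside the patch for illustration only. The toy `items` tuples are made up; the sketch assumes the astacus package is importable. It demonstrates how `get_dependencies` and `get_dependants` express the same edge from opposite directions, which is how the access-entity and table sorters use the helper above.

# Standalone sketch (not part of the patch); the toy data below is hypothetical.
from astacus.coordinator.plugins.clickhouse.dependencies import sort_topologically

# Each item is (name, names this item depends on): the view depends on the table.
items = [("view", ["table"]), ("table", [])]
ordered = sort_topologically(
    items,
    get_key=lambda item: item[0],
    get_dependencies=lambda item: item[1],
)
assert [name for name, _ in ordered] == ["table", "view"]

# The same edge expressed the other way around: each item lists its dependants.
items_reversed = [("table", ["view"]), ("view", [])]
ordered = sort_topologically(
    items_reversed,
    get_key=lambda item: item[0],
    get_dependants=lambda item: item[1],
)
assert [name for name, _ in ordered] == ["table", "view"]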