diff --git a/astacus/coordinator/plugins/clickhouse/steps.py b/astacus/coordinator/plugins/clickhouse/steps.py index acef5a6f..098353a2 100644 --- a/astacus/coordinator/plugins/clickhouse/steps.py +++ b/astacus/coordinator/plugins/clickhouse/steps.py @@ -204,7 +204,12 @@ async def run_step(self, cluster: Cluster, context: StepsContext) -> Sequence[Ke for child in children: keeper_map_table_path = os.path.join(self.keeper_map_path_prefix, child) data_path = os.path.join(keeper_map_table_path, "data") - data = await connection.get_children_with_data(data_path) + try: + data = await connection.get_children_with_data(data_path) + except NoNodeError: + logger.info("ZNode %s is missing, table was dropped. Skipping", data_path) + continue + tables.append( KeeperMapTable( name=child, diff --git a/tests/integration/coordinator/plugins/clickhouse/test_plugin.py b/tests/integration/coordinator/plugins/clickhouse/test_plugin.py index a56d60f2..05c3efbd 100644 --- a/tests/integration/coordinator/plugins/clickhouse/test_plugin.py +++ b/tests/integration/coordinator/plugins/clickhouse/test_plugin.py @@ -316,21 +316,37 @@ async def setup_cluster_content(clients: Sequence[HttpClickHouseClient], use_nam await clients[0].execute(b"INSERT INTO default.memory VALUES (123, 'foo')") await clients[0].execute(b"CREATE FUNCTION `linear_equation_\x80` AS (x, k, b) -> k*x + b") if await is_engine_available(clients[0], TableEngine.KeeperMap): - await clients[0].execute( - b"CREATE TABLE default.keeper_map (key UInt32, value String) ENGINE = KeeperMap('keeper_map') PRIMARY KEY key" - ) - await clients[0].execute(b"INSERT INTO default.keeper_map VALUES (1, 'one'), (2, 'two')") + await add_keeper_map_tables(clients) - # wait for every zookeeper node to receive the update - async def keeper_map_has_replicated(client: ClickHouseClient) -> bool: - return len(await client.execute(b"SELECT * FROM default.keeper_map")) == 2 - await wait_for_condition_on_every_node( - clients, - keeper_map_has_replicated, - "waiting for keeper_map to be replicated", - timeout_seconds=5, - ) +async def add_keeper_map_tables(clients: Sequence[ClickHouseClient]) -> None: + # add and drop a table with KeeperMap engine. This leaves some metadata in ZooKeeper that we want to ignore. + await clients[0].execute( + b"CREATE TABLE default.keeper_map_dropped(key UInt32, value String) " + b"ENGINE = KeeperMap('keeper_map_dropped') PRIMARY KEY key" + ) + await clients[0].execute(b"DROP TABLE default.keeper_map_dropped") + # empty table to backup + await clients[0].execute( + b"CREATE TABLE default.keeper_map_empty (key UInt32, value String) " + b"ENGINE = KeeperMap('keeper_map_empty') PRIMARY KEY key" + ) + # add a table to backup + await clients[0].execute( + b"CREATE TABLE default.keeper_map (key UInt32, value String) ENGINE = KeeperMap('keeper_map') PRIMARY KEY key" + ) + await clients[0].execute(b"INSERT INTO default.keeper_map VALUES (1, 'one'), (2, 'two')") + + # wait for every zookeeper node to receive the update + async def keeper_map_has_replicated(client: ClickHouseClient) -> bool: + return len(await client.execute(b"SELECT * FROM default.keeper_map")) == 2 + + await wait_for_condition_on_every_node( + clients, + keeper_map_has_replicated, + "waiting for keeper_map to be replicated", + timeout_seconds=5, + ) async def setup_cluster_users(clients: Sequence[HttpClickHouseClient]) -> None: @@ -590,3 +606,8 @@ async def test_restores_keeper_map_tables(restored_cluster: Sequence[ClickHouseC for client in restored_cluster: result = await client.execute(b"SELECT key, value FROM default.keeper_map ORDER BY key") assert result == [[1, "one"], [2, "two"]] + result = await client.execute(b"SELECT key, value FROM default.keeper_map_empty ORDER BY key") + assert result == [] + with pytest.raises(ClickHouseClientQueryError) as e: + await client.execute(b"SELECT * FROM default.keeper_map_dropped") + assert "Table default.keeper_map_dropped does not exist." in e.value.args[0]