From b0fb8ba872c11e6cda1974d6de95c72951e67728 Mon Sep 17 00:00:00 2001 From: Simon Chen <1020359403@qq.com> Date: Mon, 5 Aug 2024 22:14:58 +0800 Subject: [PATCH] Support for asynchronous streaming results (#323) --- clickhouse_sqlalchemy/drivers/asynch/base.py | 9 ++++++++- tests/drivers/asynch/test_cursor.py | 13 +++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/clickhouse_sqlalchemy/drivers/asynch/base.py b/clickhouse_sqlalchemy/drivers/asynch/base.py index 0f757f9e..5a28ea6e 100644 --- a/clickhouse_sqlalchemy/drivers/asynch/base.py +++ b/clickhouse_sqlalchemy/drivers/asynch/base.py @@ -4,17 +4,24 @@ from sqlalchemy.pool import AsyncAdaptedQueuePool from .connector import AsyncAdapt_asynch_dbapi -from ..native.base import ClickHouseDialect_native +from ..native.base import ClickHouseDialect_native, ClickHouseExecutionContext # Export connector version VERSION = (0, 0, 1, None) +class ClickHouseAsynchExecutionContext(ClickHouseExecutionContext): + def create_server_side_cursor(self): + return self.create_default_cursor() + + class ClickHouseDialect_asynch(ClickHouseDialect_native): driver = 'asynch' + execution_ctx_cls = ClickHouseAsynchExecutionContext is_async = True supports_statement_cache = True + supports_server_side_cursors = True @classmethod def import_dbapi(cls): diff --git a/tests/drivers/asynch/test_cursor.py b/tests/drivers/asynch/test_cursor.py index c2a723be..7d6d45c6 100644 --- a/tests/drivers/asynch/test_cursor.py +++ b/tests/drivers/asynch/test_cursor.py @@ -35,3 +35,16 @@ async def test_check_iter_cursor(self): ) self.assertListEqual(list(rv), [(x,) for x in range(5)]) + + @run_async + async def test_execute_with_stream(self): + conn = await self.get_connection() + async with conn.stream( + text("SELECT * FROM system.numbers LIMIT 10") + ) as result: + idx = 0 + async for r in result: + self.assertEqual(r[0], idx) + idx += 1 + + self.assertEqual(idx, 10)