diff --git a/dask_snowflake/core.py b/dask_snowflake/core.py index 9059d6c..fb41916 100644 --- a/dask_snowflake/core.py +++ b/dask_snowflake/core.py @@ -263,12 +263,12 @@ def read_snowflake( if partition_size is None and npartitions is None: partition_size = "100MiB" - label = "read-snowflake-" - output_name = label + tokenize( - query, - connection_kwargs, - arrow_options, - ) + # label = "read-snowflake-" + # output_name = label + tokenize( + # query, + # connection_kwargs, + # arrow_options, + # ) # Disable `log_imported_packages_in_telemetry` as a temporary workaround for # https://github.com/snowflakedb/snowflake-connector-python/issues/1648. @@ -286,11 +286,13 @@ def read_snowflake( # right partner application ID. batches = _fetch_query_batches(query, connection_kwargs, execute_params).compute() if not batches: - # Empty results set -> return an empty DataFrame - meta = dd.utils.make_meta({}) - graph = {(output_name, 0): meta} - divisions = (None, None) - return new_dd_object(graph, output_name, meta, divisions) + return dd.from_pandas(pd.DataFrame(), npartitions=1) + # if not batches: + # # Empty results set -> return an empty DataFrame + # meta = dd.utils.make_meta({}) + # graph = {(output_name, 0): meta} + # divisions = (None, None) + # return new_dd_object(graph, output_name, meta, divisions) batch_types = set(type(b) for b in batches) if len(batch_types) > 1 or next(iter(batch_types)) is not ArrowResultBatch: @@ -310,15 +312,24 @@ def read_snowflake( batches, meta, npartitions=npartitions, partition_size=partition_size ) - # Create Blockwise layer - layer = DataFrameIOLayer( - output_name, - meta.columns, - batches_partitioned, - # TODO: Implement wrapper to only convert columns requested + divisions = tuple([None] * (len(batches_partitioned) + 1)) + + return dd.from_map( partial(_fetch_batches, arrow_options=arrow_options), - label=label, + batches_partitioned, + npartitions = divisions, + meta=meta, ) - divisions = tuple([None] * (len(batches_partitioned) + 1)) - graph = HighLevelGraph({output_name: layer}, {output_name: set()}) - return new_dd_object(graph, output_name, meta, divisions) + + + # # Create Blockwise layer + # layer = DataFrameIOLayer( + # output_name, + # meta.columns, + # batches_partitioned, + # # TODO: Implement wrapper to only convert columns requested + # partial(_fetch_batches, arrow_options=arrow_options), + # label=label, + # ) + # graph = HighLevelGraph({output_name: layer}, {output_name: set()}) + # return new_dd_object(graph, output_name, meta, divisions)