Skip to content

Commit

Permalink
Convert read_snowflake from a hand-built HighLevelGraph (HLG) to dd.from_map
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-rakowski committed Jun 25, 2024
1 parent 7c0eb47 commit 4cfd6cf
Showing 1 changed file with 32 additions and 21 deletions.
53 changes: 32 additions & 21 deletions dask_snowflake/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,12 @@ def read_snowflake(
if partition_size is None and npartitions is None:
partition_size = "100MiB"

label = "read-snowflake-"
output_name = label + tokenize(
query,
connection_kwargs,
arrow_options,
)
# label = "read-snowflake-"
# output_name = label + tokenize(
# query,
# connection_kwargs,
# arrow_options,
# )

# Disable `log_imported_packages_in_telemetry` as a temporary workaround for
# https://github.com/snowflakedb/snowflake-connector-python/issues/1648.
Expand All @@ -286,11 +286,13 @@ def read_snowflake(
# right partner application ID.
batches = _fetch_query_batches(query, connection_kwargs, execute_params).compute()
if not batches:
# Empty results set -> return an empty DataFrame
meta = dd.utils.make_meta({})
graph = {(output_name, 0): meta}
divisions = (None, None)
return new_dd_object(graph, output_name, meta, divisions)
return dd.from_pandas(pd.DataFrame(), npartitions=1)
# if not batches:
# # Empty results set -> return an empty DataFrame
# meta = dd.utils.make_meta({})
# graph = {(output_name, 0): meta}
# divisions = (None, None)
# return new_dd_object(graph, output_name, meta, divisions)

batch_types = set(type(b) for b in batches)
if len(batch_types) > 1 or next(iter(batch_types)) is not ArrowResultBatch:
Expand All @@ -310,15 +312,24 @@ def read_snowflake(
batches, meta, npartitions=npartitions, partition_size=partition_size
)

# Create Blockwise layer
layer = DataFrameIOLayer(
output_name,
meta.columns,
batches_partitioned,
# TODO: Implement wrapper to only convert columns requested
divisions = tuple([None] * (len(batches_partitioned) + 1))

return dd.from_map(
partial(_fetch_batches, arrow_options=arrow_options),
label=label,
batches_partitioned,
npartitions = divisions,
meta=meta,
)
divisions = tuple([None] * (len(batches_partitioned) + 1))
graph = HighLevelGraph({output_name: layer}, {output_name: set()})
return new_dd_object(graph, output_name, meta, divisions)


# # Create Blockwise layer
# layer = DataFrameIOLayer(
# output_name,
# meta.columns,
# batches_partitioned,
# # TODO: Implement wrapper to only convert columns requested
# partial(_fetch_batches, arrow_options=arrow_options),
# label=label,
# )
# graph = HighLevelGraph({output_name: layer}, {output_name: set()})
# return new_dd_object(graph, output_name, meta, divisions)

0 comments on commit 4cfd6cf

Please sign in to comment.