Deprecated and remove from_legacy_dataframe usage (#1168)
Co-authored-by: James Bourbeau <[email protected]>
phofl and jrbourbeau authored Nov 19, 2024
1 parent a7a963a commit ef6f27f
Showing 12 changed files with 181 additions and 83 deletions.
107 changes: 71 additions & 36 deletions dask_expr/_collection.py
@@ -45,7 +45,7 @@
meta_series_constructor,
pyarrow_strings_enabled,
)
from dask.delayed import delayed
from dask.delayed import Delayed, delayed
from dask.utils import (
IndexCallable,
M,
@@ -66,6 +66,7 @@
from pandas.api.types import is_bool_dtype, is_datetime64_any_dtype, is_numeric_dtype
from pandas.api.types import is_scalar as pd_is_scalar
from pandas.api.types import is_timedelta64_dtype
from pandas.core.dtypes.common import is_extension_array_dtype
from pyarrow import fs as pa_fs
from tlz import first

@@ -159,9 +160,7 @@ def _wrap_expr_op(self, other, op=None):
if isinstance(other, FrameBase):
other = other.expr
elif isinstance(other, da.Array):
other = from_dask_array(
other, index=self.index.to_legacy_dataframe(), columns=self.columns
)
other = from_dask_array(other, index=self.index, columns=self.columns)
if self.ndim == 1 and len(self.columns):
other = other[self.columns[0]]

@@ -1371,11 +1370,9 @@ def repartition(
Repartition(self, npartitions, divisions, force, partition_size, freq)
)

def to_dask_dataframe(self, *args, **kwargs) -> _Frame:
def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs) -> _Frame:
"""Convert to a legacy dask-dataframe collection
WARNING: This API is deprecated. Please use `to_legacy_dataframe`.
Parameters
----------
optimize
@@ -1384,21 +1381,11 @@ def to_dask_dataframe(self, *args, **kwargs) -> _Frame:
Key-word arguments to pass through to `optimize`.
"""
warnings.warn(
"`to_dask_dataframe` is deprecated, please use `to_legacy_dataframe`.",
"to_legacy_dataframe is deprecated and will be removed in a future release. "
"The legacy implementation as a whole is deprecated and will be removed, making "
"this method unnecessary.",
FutureWarning,
)
return self.to_legacy_dataframe(*args, **kwargs)

def to_legacy_dataframe(self, optimize: bool = True, **optimize_kwargs) -> _Frame:
"""Convert to a legacy dask-dataframe collection
Parameters
----------
optimize
Whether to optimize the underlying `Expr` object before conversion.
**optimize_kwargs
Key-word arguments to pass through to `optimize`.
"""
df = self.optimize(**optimize_kwargs) if optimize else self
return new_dd_object(df.dask, df._name, df._meta, df.divisions)
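For illustration only (not part of the diff), a minimal sketch of the deprecated conversion path, assuming dask_expr and pandas are installed: to_legacy_dataframe still returns a legacy collection but now emits a FutureWarning.

# Usage sketch (illustrative, not part of the commit):
import warnings

import pandas as pd
import dask_expr as dx

df = dx.from_pandas(pd.DataFrame({"a": [1, 2, 3]}), npartitions=2)

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    legacy = df.to_legacy_dataframe()  # still returns a legacy dask.dataframe collection
assert any(issubclass(w.category, FutureWarning) for w in caught)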

@@ -1430,9 +1417,18 @@ def to_dask_array(
-------
A Dask Array
"""
return self.to_legacy_dataframe(optimize, **optimize_kwargs).to_dask_array(
lengths=lengths, meta=meta
)
if lengths is True:
lengths = tuple(self.map_partitions(len, enforce_metadata=False).compute())

arr = self.values

chunks = self._validate_chunks(arr, lengths)
arr._chunks = chunks

if meta is not None:
arr._meta = meta

return arr
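A quick sketch (illustrative, assuming dask_expr and pandas) of the reworked to_dask_array path: with lengths=True the partition lengths are computed up front, so the resulting array has concrete chunk sizes.

# Usage sketch (illustrative, not part of the commit):
import pandas as pd
import dask_expr as dx

s = dx.from_pandas(pd.Series(range(10), name="x"), npartitions=3)
arr = s.to_dask_array(lengths=True)
print(arr.chunks)     # e.g. ((4, 3, 3),) -- exact split depends on from_pandas
print(arr.compute())  # the underlying numpy array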

@property
def values(self):
@@ -1442,7 +1438,13 @@ def values(self):
Operations that depend on shape information, like slicing or reshaping,
will not work.
"""
return self.to_dask_array()
if is_extension_array_dtype(self._meta.values):
warnings.warn(
"Dask currently has limited support for converting pandas extension dtypes "
f"to arrays. Converting {self._meta.values.dtype} to object dtype.",
UserWarning,
)
return self.map_partitions(methods.values)
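A hedged sketch (assumes pyarrow is installed) of the new .values behavior for extension dtypes: the conversion still happens, but a UserWarning flags that the data will be coerced to object dtype.

# Usage sketch (illustrative, not part of the commit):
import pandas as pd
import dask_expr as dx

s = dx.from_pandas(pd.Series(["a", "b", None], dtype="string[pyarrow]"), npartitions=1)
arr = s.values  # emits a UserWarning about limited extension-dtype support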

def __divmod__(self, other):
result = self.expr.__divmod__(other)
@@ -2460,15 +2462,38 @@ def to_records(self, index=False, lengths=None):

if lengths is True:
lengths = tuple(self.map_partitions(len).compute())
records = to_records(self)

frame = self.to_legacy_dataframe()
records = to_records(frame)

chunks = frame._validate_chunks(records, lengths)
chunks = self._validate_chunks(records, lengths)
records._chunks = (chunks[0],)

return records

def _validate_chunks(self, arr, lengths):
from collections.abc import Sequence

from dask.array.core import normalize_chunks

if isinstance(lengths, Sequence):
lengths = tuple(lengths)

if len(lengths) != self.npartitions:
raise ValueError(
"The number of items in 'lengths' does not match the number of "
f"partitions. {len(lengths)} != {self.npartitions}"
)

if self.ndim == 1:
chunks = normalize_chunks((lengths,))
else:
chunks = normalize_chunks((lengths, (len(self.columns),)))

return chunks
elif lengths is not None:
raise ValueError(f"Unexpected value for 'lengths': '{lengths}'")

return arr._chunks
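For reference (not part of the diff), a small sketch of the normalization that _validate_chunks relies on: per-partition lengths become a dask chunks tuple, with an extra column axis for two-dimensional frames.

# Illustrative sketch of the chunk normalization used above:
from dask.array.core import normalize_chunks

lengths = (4, 3, 3)                        # one entry per partition
print(normalize_chunks((lengths,)))        # ((4, 3, 3),)        -- Series case
print(normalize_chunks((lengths, (2,))))   # ((4, 3, 3), (2,))   -- frame with 2 columns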

def to_bag(self, index=False, format="tuple"):
"""Create a Dask Bag from a Series"""
from dask_expr.io.bag import to_bag
@@ -2498,7 +2523,13 @@ def to_delayed(self, optimize_graph=True):
--------
dask_expr.from_delayed
"""
return self.to_legacy_dataframe().to_delayed(optimize_graph=optimize_graph)
if optimize_graph:
frame = self.optimize()
else:
frame = self
keys = frame.__dask_keys__()
graph = frame.__dask_graph__()
return [Delayed(k, graph) for k in keys]
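A short usage sketch (illustrative, assuming dask_expr and pandas): to_delayed now builds one Delayed object per partition straight from the collection's graph and keys, with optional optimization first.

# Usage sketch (illustrative, not part of the commit):
import pandas as pd
import dask_expr as dx

df = dx.from_pandas(pd.DataFrame({"a": range(6)}), npartitions=2)
parts = df.to_delayed()     # one Delayed per partition
print(len(parts))           # 2
print(parts[0].compute())   # the first pandas partition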

def to_backend(self, backend: str | None = None, **kwargs):
"""Move to a new DataFrame backend
Expand Down Expand Up @@ -2812,9 +2843,7 @@ def assign(self, **pairs):
"Number of partitions do not match "
f"({v.npartitions} != {result.npartitions})"
)
v = from_dask_array(
v, index=result.index.to_legacy_dataframe(), meta=result._meta
)
v = from_dask_array(v, index=result.index, meta=result._meta)
else:
raise TypeError(f"Column assignment doesn't support type {type(v)}")
args.extend([k, v])
@@ -4797,6 +4826,9 @@ def __array__(self):
def dtype(self):
return pd.Series(self._meta).dtype

def to_delayed(self, optimize_graph=True):
return super().to_delayed(optimize_graph=optimize_graph)[0]


def new_collection(expr):
"""Create new collection from an expr"""
@@ -5028,6 +5060,12 @@ def from_legacy_dataframe(ddf: _Frame, optimize: bool = True) -> FrameBase:
optimize
Whether to optimize the graph before conversion.
"""
warnings.warn(
"from_legacy_dataframe is deprecated and will be removed in a future release. "
"The legacy implementation as a whole is deprecated and will be removed, making "
"this method unnecessary.",
FutureWarning,
)
graph = ddf.dask
if optimize:
graph = ddf.__dask_optimize__(graph, ddf.__dask_keys__())
Expand Down Expand Up @@ -5083,12 +5121,9 @@ def from_dask_array(x, columns=None, index=None, meta=None):
"""
from dask.dataframe.io import from_dask_array

if isinstance(index, FrameBase):
index = index.to_legacy_dataframe()
if columns is not None and isinstance(columns, list) and not len(columns):
columns = None
df = from_dask_array(x, columns=columns, index=index, meta=meta)
return from_legacy_dataframe(df, optimize=True)
return from_dask_array(x, columns=columns, index=index, meta=meta)


@dataframe_creation_dispatch.register_inplace("pandas")
7 changes: 3 additions & 4 deletions dask_expr/_indexing.py
@@ -18,7 +18,8 @@
from pandas.api.types import is_bool_dtype
from pandas.errors import IndexingError

from dask_expr._collection import Series, from_legacy_dataframe, new_collection
from dask_expr import from_dask_array
from dask_expr._collection import Series, new_collection
from dask_expr._expr import (
Blockwise,
MaybeAlignPartitions,
@@ -134,9 +135,7 @@ def _loc_series(self, iindexer, cindexer, check_alignment=True):
return new_collection(Loc(frame, iindexer))

def _loc_array(self, iindexer, cindexer):
iindexer_series = from_legacy_dataframe(
iindexer.to_dask_dataframe("_", self.obj.index.to_legacy_dataframe())
)
iindexer_series = from_dask_array(iindexer, columns="_", index=self.obj.index)
return self._loc_series(iindexer_series, cindexer, check_alignment=False)

def _maybe_partial_time_string(self, iindexer, unit):
17 changes: 11 additions & 6 deletions dask_expr/io/_delayed.py
@@ -4,13 +4,14 @@
from collections.abc import Iterable
from typing import TYPE_CHECKING

import pandas as pd
from dask._task_spec import Alias, Task, TaskRef
from dask.dataframe.dispatch import make_meta
from dask.dataframe.utils import check_meta
from dask.dataframe.utils import check_meta, pyarrow_strings_enabled
from dask.delayed import Delayed, delayed
from dask.typing import Key

from dask_expr._expr import DelayedsExpr, PartitionsFiltered
from dask_expr._expr import ArrowStringConversion, DelayedsExpr, PartitionsFiltered
from dask_expr._util import _tokenize_deterministic
from dask_expr.io import BlockwiseIO

@@ -141,8 +142,12 @@ def read_xml(path):

from dask_expr._collection import new_collection

return new_collection(
FromDelayed(
DelayedsExpr(*dfs), make_meta(meta), divisions, verify_meta, None, prefix
)
result = FromDelayed(
DelayedsExpr(*dfs), make_meta(meta), divisions, verify_meta, None, prefix
)
if pyarrow_strings_enabled() and any(
pd.api.types.is_object_dtype(dtype)
for dtype in (result.dtypes.values if result.ndim == 2 else [result.dtypes])
):
return new_collection(ArrowStringConversion(result))
return new_collection(result)
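A hedged sketch of the new from_delayed behavior (assumes pyarrow is installed and dask's arrow-string conversion is enabled): object-dtype columns built from delayed partitions are converted to pyarrow-backed strings on construction.

# Usage sketch (illustrative, not part of the commit):
import pandas as pd
from dask import delayed
import dask_expr as dx

parts = [delayed(pd.DataFrame)({"a": ["x", "y"]}) for _ in range(2)]
df = dx.from_delayed(parts)
print(df.dtypes)  # 'a' shows string[pyarrow] when arrow strings are enabled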
42 changes: 40 additions & 2 deletions dask_expr/io/bag.py
@@ -1,4 +1,42 @@
from dask.dataframe.io.io import _df_to_bag
from dask.tokenize import tokenize

from dask_expr import FrameBase


def to_bag(df, index=False, format="tuple"):
from dask.dataframe.io import to_bag as _to_bag
"""Create Dask Bag from a Dask DataFrame
Parameters
----------
index : bool, optional
If True, the elements are tuples of ``(index, value)``, otherwise
they're just the ``value``. Default is False.
format : {"tuple", "dict", "frame"}, optional
Whether to return a bag of tuples, dictionaries, or
dataframe-like objects. Default is "tuple". If "frame",
the original partitions of ``df`` will not be transformed
in any way.
Examples
--------
>>> bag = df.to_bag() # doctest: +SKIP
"""
from dask.bag.core import Bag

df = df.optimize()

return _to_bag(df.to_legacy_dataframe(), index=index, format=format)
if not isinstance(df, FrameBase):
raise TypeError("df must be either DataFrame or Series")
name = "to_bag-" + tokenize(df._name, index, format)
if format == "frame":
dsk = df.dask
name = df._name
else:
dsk = {
(name, i): (_df_to_bag, block, index, format)
for (i, block) in enumerate(df.__dask_keys__())
}
dsk.update(df.__dask_graph__())
return Bag(dsk, name, df.npartitions)
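A brief usage sketch (illustrative, assuming dask_expr and pandas): the rewritten to_bag maps each partition to tuples or dicts directly, without round-tripping through the legacy collection.

# Usage sketch (illustrative, not part of the commit):
import pandas as pd
import dask_expr as dx

df = dx.from_pandas(pd.DataFrame({"a": [1, 2], "b": [3, 4]}), npartitions=2)
bag = df.to_bag(format="dict")
print(bag.compute())  # [{'a': 1, 'b': 3}, {'a': 2, 'b': 4}]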
2 changes: 1 addition & 1 deletion dask_expr/io/csv.py
@@ -280,7 +280,7 @@ def to_csv(
from dask.dataframe.io.csv import to_csv as _to_csv

return _to_csv(
df.to_legacy_dataframe(),
df.optimize(),
filename,
single_file=single_file,
encoding=encoding,
8 changes: 2 additions & 6 deletions dask_expr/io/hdf.py
@@ -1,6 +1,3 @@
from dask_expr import from_legacy_dataframe


def read_hdf(
pattern,
key,
@@ -14,7 +11,7 @@ def read_hdf(
):
from dask.dataframe.io import read_hdf as _read_hdf

df = _read_hdf(
return _read_hdf(
pattern,
key,
start=start,
@@ -25,7 +22,6 @@ def to_hdf(
lock=lock,
mode=mode,
)
return from_legacy_dataframe(df)


def to_hdf(
@@ -130,7 +126,7 @@ def to_hdf(
from dask.dataframe.io import to_hdf as _to_hdf

return _to_hdf(
df.to_legacy_dataframe(),
df.optimize(),
path,
key,
mode=mode,
6 changes: 2 additions & 4 deletions dask_expr/io/json.py
@@ -1,7 +1,6 @@
import pandas as pd
from dask.dataframe.utils import insert_meta_param_description

from dask_expr import from_legacy_dataframe
from dask_expr._backends import dataframe_creation_dispatch


@@ -98,7 +97,7 @@ def read_json(
"""
from dask.dataframe.io.json import read_json

df = read_json(
return read_json(
url_path,
orient=orient,
lines=lines,
@@ -114,7 +113,6 @@
path_converter=path_converter,
**kwargs,
)
return from_legacy_dataframe(df)


def to_json(
@@ -172,7 +170,7 @@ def to_json(
from dask.dataframe.io.json import to_json

return to_json(
df.to_legacy_dataframe(),
df,
url_path,
orient=orient,
lines=lines,
(remaining changed files not shown)
