Remove custom read-csv stuff (#1178)

phofl authored Dec 17, 2024
1 parent 6bf4942 commit 98fef3c
Showing 5 changed files with 27 additions and 194 deletions.
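In short: dask-expr drops its hand-rolled ReadCSV/ReadTable/ReadFwf expression classes and routes read_csv, read_table, and read_fwf straight through to dask.dataframe. The CI workflows are pointed at phofl/dask@read-csv-legacy, presumably a temporary pin until the matching dask-side change lands. The public API should behave the same after this commit; a minimal sketch, where the glob path is a made-up example:

    import dask_expr as dx

    # Same call as before this commit; the wrapper now forwards directly
    # to dask.dataframe.io.csv.read_csv instead of building a custom
    # ReadCSV expression.
    df = dx.read_csv("data/*.csv", header="infer")
    print(df.head())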
2 changes: 1 addition & 1 deletion .github/workflows/dask_test.yaml
@@ -47,7 +47,7 @@ jobs:
           cache-environment-key: environment-${{ steps.date.outputs.date }}-0
 
       - name: Install current main versions of dask
-        run: python -m pip install git+https://github.com/dask/dask
+        run: python -m pip install git+https://github.com/phofl/dask@read-csv-legacy
 
       - name: Install current main versions of distributed
         run: python -m pip install git+https://github.com/dask/distributed
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
@@ -47,7 +47,7 @@ jobs:
           cache-environment-key: environment-${{ steps.date.outputs.date }}-1
 
       - name: Install current main versions of dask
-        run: python -m pip install git+https://github.com/dask/dask
+        run: python -m pip install git+https://github.com/phofl/dask@read-csv-legacy
         if: ${{ matrix.environment-file == 'ci/environment.yml' }}
 
      - name: Install current main versions of distributed
62 changes: 23 additions & 39 deletions dask_expr/_collection.py
@@ -5131,23 +5131,17 @@ def read_csv(
     path,
     *args,
     header="infer",
-    dtype_backend=None,
     storage_options=None,
     **kwargs,
 ):
-    from dask_expr.io.csv import ReadCSV
+    from dask.dataframe.io.csv import read_csv as _read_csv
 
-    if not isinstance(path, str):
-        path = stringify_path(path)
-    return new_collection(
-        ReadCSV(
-            path,
-            dtype_backend=dtype_backend,
-            storage_options=storage_options,
-            kwargs=kwargs,
-            header=header,
-            dataframe_backend="pandas",
-        )
+    return _read_csv(
+        path,
+        *args,
+        header=header,
+        storage_options=storage_options,
+        **kwargs,
     )
 
 
@@ -5156,23 +5150,18 @@ def read_table(
     *args,
     header="infer",
     usecols=None,
-    dtype_backend=None,
     storage_options=None,
     **kwargs,
 ):
-    from dask_expr.io.csv import ReadTable
+    from dask.dataframe.io.csv import read_table as _read_table
 
-    if not isinstance(path, str):
-        path = stringify_path(path)
-    return new_collection(
-        ReadTable(
-            path,
-            columns=usecols,
-            dtype_backend=dtype_backend,
-            storage_options=storage_options,
-            kwargs=kwargs,
-            header=header,
-        )
+    return _read_table(
+        path,
+        *args,
+        header=header,
+        storage_options=storage_options,
+        usecols=usecols,
+        **kwargs,
     )
 
 
@@ -5181,23 +5170,18 @@ def read_fwf(
     *args,
     header="infer",
     usecols=None,
-    dtype_backend=None,
     storage_options=None,
     **kwargs,
 ):
-    from dask_expr.io.csv import ReadFwf
+    from dask.dataframe.io.csv import read_fwf as _read_fwf
 
-    if not isinstance(path, str):
-        path = stringify_path(path)
-    return new_collection(
-        ReadFwf(
-            path,
-            columns=usecols,
-            dtype_backend=dtype_backend,
-            storage_options=storage_options,
-            kwargs=kwargs,
-            header=header,
-        )
+    return _read_fwf(
+        path,
+        *args,
+        header=header,
+        storage_options=storage_options,
+        usecols=usecols,
+        **kwargs,
     )
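Note that dtype_backend disappears from the wrapper signatures above; it now reaches the underlying dask.dataframe reader through **kwargs rather than an explicit keyword. A hedged sketch (the path is hypothetical, and dtype_backend="pyarrow" assumes a pandas 2.x environment):

    import dask_expr as dx

    # dtype_backend is no longer a named parameter of the dask-expr wrapper;
    # it is simply forwarded via **kwargs to dask.dataframe's read_csv.
    df = dx.read_csv("data/*.csv", dtype_backend="pyarrow")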
151 changes: 0 additions & 151 deletions dask_expr/io/csv.py
@@ -1,154 +1,3 @@
-import functools
-import operator
-
-from dask._task_spec import Task
-from dask.typing import Key
-
-from dask_expr._expr import Projection
-from dask_expr._util import _convert_to_list
-from dask_expr.io.io import BlockwiseIO, PartitionsFiltered
-
-
-class ReadCSV(PartitionsFiltered, BlockwiseIO):
-    _parameters = [
-        "filename",
-        "columns",
-        "header",
-        "dtype_backend",
-        "_partitions",
-        "storage_options",
-        "kwargs",
-        "_series",
-        "dataframe_backend",
-    ]
-    _defaults = {
-        "columns": None,
-        "header": "infer",
-        "dtype_backend": None,
-        "kwargs": None,
-        "_partitions": None,
-        "storage_options": None,
-        "_series": False,
-        "dataframe_backend": "pandas",
-    }
-    _absorb_projections = True
-
-    @functools.cached_property
-    def operation(self):
-        from dask.dataframe.io import read_csv
-
-        return read_csv
-
-    @functools.cached_property
-    def _ddf(self):
-        from dask import config
-
-        # Temporary hack to simplify logic
-        with config.set({"dataframe.backend": self.dataframe_backend}):
-            kwargs = (
-                {"dtype_backend": self.dtype_backend}
-                if self.dtype_backend is not None
-                else {}
-            )
-            if self.kwargs is not None:
-                kwargs.update(self.kwargs)
-
-            columns = _convert_to_list(self.operand("columns"))
-            if columns is None:
-                pass
-            elif "include_path_column" in self.kwargs:
-                flag = self.kwargs["include_path_column"]
-                if flag is True:
-                    column_to_remove = "path"
-                elif isinstance(flag, str):
-                    column_to_remove = flag
-                else:
-                    column_to_remove = None
-
-                columns = [c for c in columns if c != column_to_remove]
-
-            if not columns:
-                meta = self.operation(
-                    self.filename,
-                    header=self.header,
-                    storage_options=self.storage_options,
-                    **kwargs,
-                )._meta
-                columns = [list(meta.columns)[0]]
-
-            usecols = kwargs.pop("usecols", None)
-            if usecols is not None and columns is not None:
-                columns = [col for col in columns if col in usecols]
-            elif usecols:
-                columns = usecols
-
-            return self.operation(
-                self.filename,
-                usecols=columns,
-                header=self.header,
-                storage_options=self.storage_options,
-                **kwargs,
-            )
-
-    @functools.cached_property
-    def _meta(self):
-        return self._ddf._meta
-
-    def _simplify_up(self, parent, dependents):
-        if isinstance(parent, Projection):
-            kwargs = self.kwargs
-            # int usecols are positional, so block projections
-            if kwargs.get("usecols", None) is not None and isinstance(
-                kwargs.get("usecols")[0], int
-            ):
-                return
-        return super()._simplify_up(parent, dependents)
-
-    @functools.cached_property
-    def columns(self):
-        columns_operand = self.operand("columns")
-        if columns_operand is None:
-            try:
-                return list(self._ddf._meta.columns)
-            except AttributeError:
-                return []
-        else:
-            return _convert_to_list(columns_operand)
-
-    def _divisions(self):
-        return self._ddf.divisions
-
-    @functools.cached_property
-    def _tasks(self):
-        from dask._task_spec import convert_legacy_graph
-
-        return list(convert_legacy_graph(self._ddf.dask.to_dict()).values())
-
-    def _filtered_task(self, name: Key, index: int) -> Task:
-        if self._series:
-            return Task(name, operator.getitem, self._tasks[index], self.columns[0])
-        t = self._tasks[index]
-        if t.key != name:
-            return Task(name, lambda x: x, t)
-        return t
-
-
-class ReadTable(ReadCSV):
-    @functools.cached_property
-    def operation(self):
-        from dask.dataframe.io import read_table
-
-        return read_table
-
-
-class ReadFwf(ReadCSV):
-    @functools.cached_property
-    def operation(self):
-        from dask.dataframe.io import read_fwf
-
-        return read_fwf
-
-
 def to_csv(
     df,
     filename,
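One detail of the deleted class worth flagging: its _simplify_up refused to absorb column projections when usecols held integers, because integer usecols are positional in pandas, so rewriting them by name after a projection would change which columns get parsed. A small standalone illustration of that pandas behavior:

    import io

    import pandas as pd

    csv = io.StringIO("a,b,c\n1,2,3\n")
    # Integer usecols select columns by position: this reads "a" and "c",
    # which is why the removed expression blocked projection pushdown here.
    print(pd.read_csv(csv, usecols=[0, 2]))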
4 changes: 2 additions & 2 deletions dask_expr/io/tests/test_io.py
@@ -23,7 +23,7 @@
     read_parquet,
 )
 from dask_expr._expr import Expr, Replace
-from dask_expr.io import FromArray, FromMap, ReadCSV, ReadParquet, parquet
+from dask_expr.io import FromArray, FromMap, ReadParquet, parquet
 from dask_expr.tests._util import _backend_library
 
 # Set DataFrame backend for this module
@@ -257,7 +257,7 @@ def test_to_dask_array(optimize):
 
 @pytest.mark.parametrize(
     "fmt,read_func,read_cls",
-    [("parquet", read_parquet, ReadParquet), ("csv", read_csv, ReadCSV)],
+    [("parquet", read_parquet, ReadParquet), ("csv", read_csv, FromMap)],
 )
 def test_combine_similar(tmpdir, fmt, read_func, read_cls):
     pdf = pd.DataFrame(
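The swapped expectation reflects where CSV reads now land in the expression tree: dask.dataframe's read_csv is built on from_map, so dask-expr sees a generic FromMap node instead of the removed ReadCSV class. A rough sketch of the same check outside the test suite; the glob is a made-up input, and find_operations is assumed to be available on the expression:

    from dask_expr import read_csv
    from dask_expr.io import FromMap

    df = read_csv("data/*.csv")  # hypothetical CSV files
    # The expression tree should contain a FromMap node rather than ReadCSV.
    assert len(list(df.expr.find_operations(FromMap))) > 0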
