Skip to content

Commit

Permalink
Migrate Blockwise to use taskspec (#1159)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Nov 8, 2024
1 parent 42b725c commit e7dcf47
Show file tree
Hide file tree
Showing 12 changed files with 226 additions and 175 deletions.
16 changes: 13 additions & 3 deletions dask_expr/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import dask
import pandas as pd
import toolz
from dask._task_spec import Task
from dask.dataframe.core import is_dataframe_like, is_index_like, is_series_like
from dask.typing import Key
from dask.utils import funcname, import_required, is_arraylike

from dask_expr._util import _BackendData, _tokenize_deterministic
Expand Down Expand Up @@ -179,7 +181,7 @@ def dependencies(self):
# Dependencies are `Expr` operands only
return [operand for operand in self.operands if isinstance(operand, Expr)]

def _task(self, index: int):
def _task(self, key: Key, index: int) -> Task:
"""The task for the i'th partition
Parameters
Expand All @@ -191,7 +193,12 @@ def _task(self, index: int):
--------
>>> class Add(Expr):
... def _task(self, i):
... return (operator.add, (self.left._name, i), (self.right._name, i))
... return Task(
... name,
... operator.add,
... TaskRef((self.left._name, i)),
... TaskRef((self.right._name, i))
... )
Returns
-------
Expand Down Expand Up @@ -230,7 +237,10 @@ def _layer(self) -> dict:
Expr.__dask_graph__
"""

return {(self._name, i): self._task(i) for i in range(self.npartitions)}
return {
(self._name, i): self._task((self._name, i), i)
for i in range(self.npartitions)
}

def rewrite(self, kind: str):
"""Rewrite an expression
Expand Down
Loading

0 comments on commit e7dcf47

Please sign in to comment.