Skip to content

Commit

Permalink
Fix merge_asof for single partition (#1145)
Browse files: browse the repository at this point in the history
  • Loading branch information
phofl authored Oct 10, 2024
1 parent 780fc4e commit 9b7d674
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
22 changes: 21 additions & 1 deletion dask_expr/_merge_asof.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,31 @@ def _lower(self):
.expr
)

if not left.known_divisions or not right.known_divisions:
if (
not left.known_divisions
and left.npartitions > 1
or not right.known_divisions
and right.npartitions > 1
):
raise ValueError("merge_asof input must be sorted!")

left_index, right_index = True, True

if left.npartitions == right.npartitions == 1:
return MapPartitions(
self.left,
pd.merge_asof,
self._meta,
True,
True,
False,
True,
None,
None,
self._kwargs,
self.right,
)

if all(map(pd.isnull, left.divisions)):
return FromPandas(
_BackendData(self._meta),
Expand Down
22 changes: 22 additions & 0 deletions dask_expr/tests/test_merge_asof.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,25 @@ def test_merge_asof_on_basic():
c = merge_asof(a, b, on="a")
# merge_asof does not preserve index
assert_eq(c, C, check_index=False)


def test_merge_asof_one_partition():
    """Index-based ``merge_asof`` of two single-partition frames matches pandas."""
    pdf_left = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
    pdf_right = pd.DataFrame({"a": [1, 2, 3], "c": [4, 5, 6]})

    # Each collection is built with exactly one partition and indexed on "a".
    ddf_left = from_pandas(pdf_left, npartitions=1).set_index("a", sort=True)
    ddf_right = from_pandas(pdf_right, npartitions=1).set_index("a", sort=True)

    # The same keyword arguments drive both the collection merge and the
    # pandas reference result, so the two calls cannot drift apart.
    asof_kwargs = {"left_index": True, "right_index": True, "direction": "nearest"}

    result = merge_asof(ddf_left, ddf_right, **asof_kwargs)
    expected = pd.merge_asof(
        pdf_left.set_index("a"),
        pdf_right.set_index("a"),
        **asof_kwargs,
    )
    assert_eq(result, expected)

0 comments on commit 9b7d674

Please sign in to comment.