Skip to content

Commit

Permalink
Make sliding windows tests specific for reduce aggregation function
Browse files Browse the repository at this point in the history
  • Loading branch information
gwaramadze committed Jan 1, 2025
1 parent a0ef33b commit b6c8404
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions tests/test_quixstreams/test_dataframe/test_windows/test_sliding.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@

A, B, C, D = "A", "B", "C", "D"

AGGREGATE_PARAMS = {
"reduce": {
"reducer": lambda agg, value: agg + [value],
"initializer": lambda value: [value],
},
}


@dataclass
class Message:
Expand Down Expand Up @@ -666,14 +673,12 @@ def factory(duration_ms: int, grace_ms: int) -> SlidingWindowDefinition:

@pytest.fixture
def window_factory(sliding_window_definition_factory):
def factory(duration_ms: int, grace_ms: int):
def factory(aggregation: str, duration_ms: int, grace_ms: int):
aggregate_params = AGGREGATE_PARAMS[aggregation]
window_definition = sliding_window_definition_factory(
duration_ms=duration_ms, grace_ms=grace_ms
)
window = window_definition.reduce(
reducer=lambda agg, value: agg + [value],
initializer=lambda value: [value],
)
window = getattr(window_definition, aggregation)(**aggregate_params)
window.register_store()
return window

Expand Down Expand Up @@ -742,10 +747,12 @@ def factory(window):
pytest.param(10, 3, EXPIRATION_WITH_GRACE, id="expiration-with-grace"),
],
)
def test_sliding_window(
def test_sliding_window_reduce(
window_factory, state_factory, duration_ms, grace_ms, messages, mock_message_context
):
window = window_factory(duration_ms=duration_ms, grace_ms=grace_ms)
window = window_factory(
aggregation="reduce", duration_ms=duration_ms, grace_ms=grace_ms
)
for message in messages:
with state_factory(window) as state:
updated, expired = window.process_window(
Expand Down

0 comments on commit b6c8404

Please sign in to comment.