Skip to content

Commit

Permalink
feat(zipWithNext, pairwise): add zipWithNext, add `pairwise(transfo…
Browse files Browse the repository at this point in the history
…rm)` (#182)

related issue: Kotlin/kotlinx.coroutines#1767
  • Loading branch information
hoc081098 authored Oct 12, 2023
1 parent 10ceb41 commit c6910c8
Show file tree
Hide file tree
Showing 13 changed files with 339 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@
### Added

- Add `Flow.chunked` operator, it is an alias to `Flow.bufferCount` operator.
- Add `Flow.pairwise(transform)` operator - a variant of `Flow.pairwise()` operator,
which allows the transformation of the pair of values via the `transform` lambda parameter.

- Add `Flow.zipWithNext()` operator, it is an alias to `Flow.pairwise()` operator.
- Add `Flow.zipWithNext(transform)` operator, it is an alias to `Flow.pairwise(transform)` operator.

## [0.7.2] - Oct 7, 2023

Expand Down
23 changes: 19 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
> Kotlinx Coroutines Flow Extensions. Extensions to the Kotlin Flow library. Kotlin Flow extensions.
> Multiplatform Kotlinx Coroutines Flow Extensions. Multiplatform Extensions to the Kotlin Flow
> library. Multiplatform Kotlin Flow extensions. RxJS Kotlin Coroutines Flow. RxSwift Kotlin
> Coroutines Flow. RxJava Kotlin Coroutines Flow. RxJS Kotlin Flow. RxSwift Kotlin Flow. RxJava Kotlin
> Flow. RxJS Coroutines Flow. RxSwift Coroutines Flow. RxJava Coroutines Flow.
> Coroutines Flow. RxJava Kotlin Coroutines Flow. RxJS Kotlin Flow. RxSwift Kotlin Flow. RxJava
> Kotlin
> Flow. RxJS Coroutines Flow. RxSwift Coroutines Flow. RxJava Coroutines Flow. Kotlin Flow
> operators.
> Coroutines Flow operators.
## Author: [Petrus Nguyễn Thái Học](https://github.com/hoc081098)

Expand Down Expand Up @@ -145,7 +148,7 @@ dependencies {
- [`dematerialize`](#dematerialize)
- [`raceWith`](#racewith--ambwith)
- [`ambWith`](#racewith--ambwith)
- [`pairwise`](#pairwise)
- [`pairwise`](#pairwise--zipWithNext)
- [`repeat`](#repeat)
- [`retryWhenWithDelayStrategy`](#retrywhenwithdelaystrategy)
- [`retryWhenWithExponentialBackoff`](#retrywhenwithexponentialbackoff)
Expand All @@ -157,6 +160,7 @@ dependencies {
- [`takeUntil`](#takeuntil)
- [`throttleTime`](#throttletime)
- [`withLatestFrom`](#withlatestfrom)
- [`zipWithNext`](#pairwise--zipWithNext)

#### bufferCount / chunked

Expand Down Expand Up @@ -871,18 +875,25 @@ raceWith: 3

----

#### pairwise
#### pairwise / zipWithNext

- Similar to [RxJS pairwise](https://rxjs.dev/api/operators/pairwise)

Groups pairs of consecutive emissions together and emits them as a pair.
Emits the `(n)th` and `(n-1)th` events as a pair.
The first value won't be emitted until the second one arrives.
Note, `zipWithNext` is an alias to `pairwise`.

```kotlin
range(0, 4)
.pairwise()
.collect { println("pairwise: $it") }

println("---")

range(0, 4)
.zipWithNext { a, b -> "$a -> $b" }
.collect { println("zipWithNext: $it") }
```

Output:
Expand All @@ -891,6 +902,10 @@ Output:
pairwise: (0, 1)
pairwise: (1, 2)
pairwise: (2, 3)
---
zipWithNext: 0 -> 1
zipWithNext: 1 -> 2
zipWithNext: 2 -> 3
```

----
Expand Down
6 changes: 6 additions & 0 deletions api/FlowExt.api
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public final class com/hoc081098/flowext/DelayStrategy$NoDelayStrategy : com/hoc
public fun nextDelay-3nIYWDw (Ljava/lang/Throwable;J)J
}

public abstract interface annotation class com/hoc081098/flowext/DelicateFlowExtApi : java/lang/annotation/Annotation {
}

public final class com/hoc081098/flowext/EagerKt {
public static final fun flatMapConcatEager (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun flatMapConcatEager$default (Lkotlinx/coroutines/flow/Flow;IILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -183,6 +186,9 @@ public final class com/hoc081098/flowext/NeverFlowKt {

public final class com/hoc081098/flowext/PairwiseKt {
public static final fun pairwise (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun pairwise (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun zipWithNext (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun zipWithNext (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
}

public final class com/hoc081098/flowext/RaceKt {
Expand Down
8 changes: 8 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ repositories {
kotlin {
explicitApi()

sourceSets {
all {
languageSettings {
optIn("com.hoc081098.flowext.DelicateFlowExtApi")
}
}
}

jvmToolchain {
languageVersion.set(JavaLanguageVersion.of(17))
vendor.set(JvmVendorSpec.AZUL)
Expand Down
41 changes: 41 additions & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/DelicateFlowExtApi.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* MIT License
*
* Copyright (c) 2021-2023 Petrus Nguyễn Thái Học
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.hoc081098.flowext

/**
* Marks declarations in the `FlowExt` that are **delicate** —
* they have limited use-case and shall be used with care in general code.
* Any use of a delicate declaration has to be carefully reviewed to make sure it is
* properly used and does not create problems like memory and resource leaks.
* Carefully read documentation of any declaration marked as `DelicateFlowExtApi`.
*/
@MustBeDocumented
@Retention(value = AnnotationRetention.BINARY)
@RequiresOptIn(
level = RequiresOptIn.Level.WARNING,
message = "This is a delicate API and its use requires care." +
" Make sure you fully read and understand documentation of the declaration that is marked as a delicate API.",
)
public annotation class DelicateFlowExtApi
66 changes: 60 additions & 6 deletions src/commonMain/kotlin/com/hoc081098/flowext/pairwise.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@

package com.hoc081098.flowext

import com.hoc081098.flowext.utils.NULL_VALUE
import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// ------------------------------------------- PAIRWISE -------------------------------------------

/**
* Groups pairs of consecutive emissions together and emits them as a pair.
*
* Emits the `(n)th` and `(n-1)th` events as a pair.
* The first value won't be emitted until the second one arrives.
* The resulting [Flow] is empty if this [Flow] emits less than two elements.
*
* This operator is more optimizer than [bufferCount] version:
* ```kotlin
Expand All @@ -46,13 +49,64 @@ import kotlinx.coroutines.flow.flow
* }
* ```
*/
public fun <T> Flow<T>.pairwise(): Flow<Pair<T, T>> = flow {
var last: Any? = null
public fun <T> Flow<T>.pairwise(): Flow<Pair<T, T>> = pairwiseInternal(::Pair)

/**
* Groups pairs of consecutive emissions together and emits the result of applying [transform]
* function to each pair.
*
* The first value won't be emitted until the second one arrives.
* The resulting [Flow] is empty if this [Flow] emits less than two elements.
*
* This operator is more optimizer than [bufferCount] version:
* ```kotlin
* val flow: Flow<T>
*
* val result: Flow<R> = flow
* .bufferCount(bufferSize = 2, startBufferEvery = 1)
* .mapNotNull {
* if (it.size < 2) null
* else transform(it[0], it[1])
* }
* ```
*
* @param transform A function to apply to each pair of consecutive emissions.
*/
public fun <T, R> Flow<T>.pairwise(transform: suspend (a: T, b: T) -> R): Flow<R> =
pairwiseInternal(transform)

// ----------------------------------------- ZIP WITH NEXT -----------------------------------------

/**
* This function is an alias to [pairwise] operator.
*
* Groups pairs of consecutive emissions together and emits them as a pair.
*
* @see pairwise
*/
public fun <T> Flow<T>.zipWithNext(): Flow<Pair<T, T>> = pairwise()

/**
* This function is an alias to [pairwise] operator.
*
* Groups pairs of consecutive emissions together and emits the result of applying [transform]
* function to each pair.
*
* @see pairwise
*/
public fun <T, R> Flow<T>.zipWithNext(transform: suspend (a: T, b: T) -> R): Flow<R> =
pairwise(transform)

// ------------------------------------------- INTERNAL -------------------------------------------

private fun <T, R> Flow<T>.pairwiseInternal(transform: suspend (a: T, b: T) -> R): Flow<R> = flow {
var last: Any? = INTERNAL_NULL_VALUE

collect {
if (last !== null) {
emit(Pair(NULL_VALUE.unbox(last), it))
if (last !== INTERNAL_NULL_VALUE) {
@Suppress("UNCHECKED_CAST")
emit(transform(last as T, it))
}
last = it ?: NULL_VALUE
last = it
}
}
10 changes: 5 additions & 5 deletions src/commonMain/kotlin/com/hoc081098/flowext/selectors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

package com.hoc081098.flowext

import com.hoc081098.flowext.utils.NULL_VALUE
import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
Expand All @@ -36,12 +36,12 @@ private typealias SubStateT = Any?
internal fun <State, Result> Flow<State>.select1Internal(
selector: suspend (State) -> Result,
): Flow<Result> = flow {
var latestState: Any? = NULL_VALUE // Result | NULL_VALUE
var latestState: Any? = INTERNAL_NULL_VALUE // Result | NULL_VALUE

distinctUntilChanged().collect { state ->
val currentState = selector(state)

if (latestState === NULL_VALUE || (latestState as Result) != currentState) {
if (latestState === INTERNAL_NULL_VALUE || (latestState as Result) != currentState) {
latestState = currentState
emit(currentState)
}
Expand All @@ -56,7 +56,7 @@ private fun <State, Result> Flow<State>.selectInternal(

return flow {
var latestSubStates: Array<SubStateT>? = null
var latestState: Any? = NULL_VALUE // Result | NULL_VALUE
var latestState: Any? = INTERNAL_NULL_VALUE // Result | NULL_VALUE
var reusableSubStates: Array<SubStateT>? = null

distinctUntilChanged().collect { state ->
Expand All @@ -74,7 +74,7 @@ private fun <State, Result> Flow<State>.selectInternal(
.also { latestSubStates = it },
)

if (latestState === NULL_VALUE || (latestState as Result) != currentState) {
if (latestState === INTERNAL_NULL_VALUE || (latestState as Result) != currentState) {
latestState = currentState
emit(currentState)
}
Expand Down
8 changes: 4 additions & 4 deletions src/commonMain/kotlin/com/hoc081098/flowext/throttle.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.hoc081098.flowext.ThrottleConfiguration.LEADING
import com.hoc081098.flowext.ThrottleConfiguration.LEADING_AND_TRAILING
import com.hoc081098.flowext.ThrottleConfiguration.TRAILING
import com.hoc081098.flowext.internal.DONE_VALUE
import com.hoc081098.flowext.utils.NULL_VALUE
import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.ExperimentalCoroutinesApi
Expand Down Expand Up @@ -244,7 +244,7 @@ public fun <T> Flow<T>.throttleTime(

// Produce the values using the default (rendezvous) channel
val values = produce {
collect { value -> send(value ?: NULL_VALUE) }
collect { value -> send(value ?: INTERNAL_NULL_VALUE) }
}

var lastValue: Any? = null
Expand All @@ -258,7 +258,7 @@ public fun <T> Flow<T>.throttleTime(
// before we emit, otherwise reentrant code can cause
// issues here.
lastValue = null // Consume the value
return@let downstream.emit(NULL_VALUE.unbox(consumed))
return@let downstream.emit(INTERNAL_NULL_VALUE.unbox(consumed))
}
}

Expand Down Expand Up @@ -290,7 +290,7 @@ public fun <T> Flow<T>.throttleTime(
trySend()
}

when (val duration = durationSelector(NULL_VALUE.unbox(value))) {
when (val duration = durationSelector(INTERNAL_NULL_VALUE.unbox(value))) {
Duration.ZERO -> onWindowClosed()
else -> throttled = scope.launch { delay(duration) }
}
Expand Down
12 changes: 12 additions & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/utils/NULL_VALUE.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,24 @@

package com.hoc081098.flowext.utils

import com.hoc081098.flowext.DelicateFlowExtApi
import kotlin.jvm.JvmField

/**
* This is a work-around for having nested nulls in generic code.
* This allows for writing faster generic code instead of using `Option`.
* This is only used as an optimisation technique in low-level code.
*/
@DelicateFlowExtApi
@JvmField
public val NULL_VALUE: Symbol = Symbol("NULL_VALUE")

/**
* This is a work-around for having nested nulls in generic code.
* This allows for writing faster generic code instead of using `Option`.
* This is only used as an optimisation technique in low-level code.
*
* This is internal and should not be used outside of the library.
*/
@JvmField
internal val INTERNAL_NULL_VALUE: Symbol = Symbol("INTERNAL_NULL_VALUE")
2 changes: 2 additions & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/utils/Symbol.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@

package com.hoc081098.flowext.utils

import com.hoc081098.flowext.DelicateFlowExtApi
import kotlin.jvm.JvmField

/**
* A symbol class that is used to define unique constants that are self-explanatory in debugger.
*/
@DelicateFlowExtApi
public class Symbol(
@JvmField public val symbol: String,
) {
Expand Down
6 changes: 3 additions & 3 deletions src/commonMain/kotlin/com/hoc081098/flowext/withLatestFrom.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
package com.hoc081098.flowext

import com.hoc081098.flowext.internal.AtomicRef
import com.hoc081098.flowext.utils.NULL_VALUE
import com.hoc081098.flowext.utils.INTERNAL_NULL_VALUE
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
Expand All @@ -49,14 +49,14 @@ public fun <A, B, R> Flow<A>.withLatestFrom(
try {
coroutineScope {
launch(start = CoroutineStart.UNDISPATCHED) {
other.collect { otherRef.value = it ?: NULL_VALUE }
other.collect { otherRef.value = it ?: INTERNAL_NULL_VALUE }
}

collect { value ->
emit(
transform(
value,
NULL_VALUE.unbox(otherRef.value ?: return@collect),
INTERNAL_NULL_VALUE.unbox(otherRef.value ?: return@collect),
),
)
}
Expand Down
Loading

0 comments on commit c6910c8

Please sign in to comment.