Skip to content

Commit

Permalink
Added AccumulatorSubject
Browse files Browse the repository at this point in the history
  • Loading branch information
bumblender committed Oct 4, 2023
1 parent e73ab05 commit 2e17260
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 57 deletions.
52 changes: 52 additions & 0 deletions binder/src/main/java/com/badoo/binder/AccumulatorSubject.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.badoo.binder

import io.reactivex.ObservableSource
import io.reactivex.Observer
import io.reactivex.functions.Consumer
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.atomic.AtomicBoolean

interface Drainable {
fun drain()
}

class AccumulatorSubject<T : Any>(
private val initialState: T? = null
) : ObservableSource<T>, Consumer<T>, Drainable {

private val items: MutableList<T> = mutableListOf()
private val events: PublishSubject<T> = PublishSubject.create()

private var drained = AtomicBoolean(false)

init {
initialState?.also { items.add(it) }
}

override fun subscribe(observer: Observer<in T>) {
events.subscribe(observer)
if (!drained.get()) {
items.forEach { observer.onNext(it) }
} else {
initialState?.also { observer.onNext(it) }
}
}

override fun accept(value: T) {
if (!drained.get()) {
items.add(value)
}
events.onNext(value)
}

override fun drain() {
if (!drained.get()) {
drained.set(true)
items.clear()
}
}

companion object {
fun <T : Any> create() = AccumulatorSubject<T>()
}
}
32 changes: 20 additions & 12 deletions binder/src/main/java/com/badoo/binder/Binder.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable
import io.reactivex.functions.Consumer
import io.reactivex.rxkotlin.plusAssign
import io.reactivex.subjects.UnicastSubject

class Binder(
private val lifecycle: Lifecycle? = null,
) : Disposable {
private var drained: Boolean = false
private val bindings = mutableListOf<Binding>()
private val disposables = CompositeDisposable()
private val connectionDisposables = CompositeDisposable()
Expand Down Expand Up @@ -54,24 +54,25 @@ class Binder(
lifecycle != null -> {
with(Binding(connection, middleware)) {
bindings.add(this)
if (drained) {
this.drain()
}
accumulate()
if (isActive) {
accumulate()
subscribeWithLifecycle<Out, In>(this)
subscribeWithLifecycle<In>(this)
}
}
}
else -> subscribe(connection, middleware)
}
}

private fun <Out, In> subscribeWithLifecycle(binding: Binding) {
(binding.source as? UnicastSubject<Out>)?.let { source ->
connectionDisposables += wrap(source)
.subscribeWithMiddleware(
binding.connection as Connection<Out, In>,
binding.middleware as? Middleware<Out, In>
)
}
private fun <In> subscribeWithLifecycle(binding: Binding) {
connectionDisposables += wrap(binding.source)
.subscribeWithMiddleware(
binding.connection as Connection<Any?, In>,
binding.middleware as? Middleware<Any?, In>,
)
}

private fun <Out, In> subscribe(
Expand Down Expand Up @@ -127,7 +128,7 @@ class Binder(
private fun bindConnections() {
isActive = true
bindings.forEach { it.accumulate() }
bindings.forEach { subscribeWithLifecycle<Any, Any>(it) }
bindings.forEach { subscribeWithLifecycle<Any>(it) }
}

private fun unbindConnections() {
Expand Down Expand Up @@ -162,6 +163,13 @@ class Binder(
this
}

fun drain() {
if (!drained) {
bindings.forEach { it.drain() }
drained = true
}
}

class BinderObserveOnScope(
private val binder: Binder,
private val observeScheduler: Scheduler
Expand Down
9 changes: 7 additions & 2 deletions binder/src/main/java/com/badoo/binder/Binding.kt
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package com.badoo.binder

import com.badoo.binder.middleware.base.Middleware
import io.reactivex.ObservableSource
import io.reactivex.subjects.UnicastSubject

internal class Binding(
val connection: Connection<*, *>,
val middleware: Middleware<*, *>?
) {

var source: UnicastSubject<*>? = null
var source: ObservableSource<*>? = null
private set

fun accumulate() {
Expand All @@ -17,4 +18,8 @@ internal class Binding(
.also { observer -> source.subscribe(observer) }
}
}
}

fun drain() {
(connection.from as? Drainable)?.drain()
}
}
116 changes: 116 additions & 0 deletions binder/src/test/java/com/badoo/binder/AccumulatorSubjectTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.badoo.binder

import com.badoo.binder.lifecycle.Lifecycle
import com.badoo.binder.lifecycle.ManualLifecycle
import io.reactivex.functions.Consumer
import io.reactivex.subjects.PublishSubject
import org.junit.jupiter.api.Test

class AccumulatorSubjectTest {

@Test
fun `GIVEN the producer is an accumulator AND the consumer subscribes after the lifecycle started AND before the drain THEN consumes all the events produced`() {
var score = 0
val lifecycle: ManualLifecycle = Lifecycle.manual()
val producer = AccumulatorSubject.create<Int>()
val consumer = PublishSubject.create<Int>()
val testObserver = consumer.test()
val binder = Binder(lifecycle)

producer.accept(score++)
producer.accept(score++)
lifecycle.begin()
binder.bind(producer to Consumer { consumer.onNext(it) })
binder.drain()

testObserver.onComplete()
testObserver.assertValues(0, 1)
}

@Test
fun `GIVEN the producer is an accumulator AND the consumer subscribes after lifecycle started AND before the drain AND producer produces an event after the drain THEN consumes all the produced events`() {
var score = 0
val lifecycle: ManualLifecycle = Lifecycle.manual()
val producer = AccumulatorSubject.create<Int>()
val consumer = PublishSubject.create<Int>()
val testObserver = consumer.test()
val binder = Binder(lifecycle)

producer.accept(score++)
producer.accept(score++)
lifecycle.begin()
binder.bind(producer to Consumer { consumer.onNext(it) })
binder.drain()
producer.accept(score++)

testObserver.onComplete()
testObserver.assertValues(0, 1, 2)
}

@Test
fun `GIVEN the producer is an accumulator AND the consumer subscribes after the lifecycle started AND after the drain THEN the consumer consumes only the events produced after the drain`() {
var score = 0
val lifecycle: ManualLifecycle = Lifecycle.manual()
val producer = AccumulatorSubject.create<Int>()
val consumer = PublishSubject.create<Int>()
val testObserver = consumer.test()
val binder = Binder(lifecycle)

producer.accept(score++)
producer.accept(score++)
lifecycle.begin()
binder.drain()
binder.bind(producer to Consumer { consumer.onNext(it) })
producer.accept(score++)

testObserver.onComplete()
testObserver.assertValues(2)
}

@Test
fun `GIVEN the producer is an accumulator WHEN the consumer subscribes after the lifecycle started AND before the drain THEN should receive events produced before the drain and published when the lifecycle is active`() {
var score = 0
val lifecycle: ManualLifecycle = Lifecycle.manual()
val producer = AccumulatorSubject.create<Int>()
val consumer = PublishSubject.create<Int>()
val testObserver = consumer.test()
val binder = Binder(lifecycle)

producer.accept(score++)
producer.accept(score++)
lifecycle.begin()
binder.bind(producer to Consumer { consumer.onNext(it) })
binder.drain()
lifecycle.end()
producer.accept(score++)
lifecycle.begin()
producer.accept(score++)

testObserver.onComplete()
testObserver.assertValues(0, 1, 3)
}

@Test
fun `GIVEN the producer is an accumulator WHEN the consumer subscribes after the lifecycle restarted THEN should receive only the events after the restart when the lifecycle is active`() {
var score = 0
val lifecycle: ManualLifecycle = Lifecycle.manual()
val producer = AccumulatorSubject.create<Int>()
val consumer = PublishSubject.create<Int>()
val testObserver = consumer.test()
val binder = Binder(lifecycle)

producer.accept(score++)
producer.accept(score++)
lifecycle.begin()
binder.drain()
lifecycle.end()
binder.bind(producer to Consumer { consumer.onNext(it) })
producer.accept(score++)
lifecycle.begin()
producer.accept(score++)

testObserver.onComplete()
testObserver.assertValues(3)
}

}
Loading

0 comments on commit 2e17260

Please sign in to comment.