Skip to content

Commit

Permalink
refactor and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jurajlivesport committed Apr 17, 2024
1 parent c07640e commit b55f0ba
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,11 @@ package org.mobilenativefoundation.store.store5.impl

import co.touchlab.kermit.CommonWriter
import co.touchlab.kermit.Logger
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.mobilenativefoundation.store.store5.Bookkeeper
Expand All @@ -23,7 +17,6 @@ import org.mobilenativefoundation.store.core5.ExperimentalStoreApi
import org.mobilenativefoundation.store.store5.MutableStore
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin
import org.mobilenativefoundation.store.store5.StoreWriteRequest
import org.mobilenativefoundation.store.store5.StoreWriteResponse
import org.mobilenativefoundation.store.store5.Updater
Expand All @@ -45,8 +38,6 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
private val keyToWriteRequestQueue = mutableMapOf<Key, WriteRequestQueue<Key, Output, *>>()
private val keyToThreadSafety = mutableMapOf<Key, ThreadSafety>()

private val writeRequestChannel = Channel<Pair<Key, Output>>()

override fun <Response : Any> stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<Output>> =
flow {
safeInitStore(request.key)
Expand All @@ -69,14 +60,7 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
}
}

emitAll(
merge(
delegate.stream(request),
writeRequestChannel.receiveAsFlow()
.filter { it.first == request.key }
.map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) },
)
)
delegate.stream(request).collect { storeReadResponse -> emit(storeReadResponse) }
}

@ExperimentalStoreApi
Expand All @@ -90,9 +74,6 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
.collect { writeRequest ->
val storeWriteResponse = try {
delegate.write(writeRequest.key, writeRequest.value)
if (!delegate.hasSourceOfTruth()) {
writeRequestChannel.trySend(writeRequest.key to writeRequest.value)
}
when (val updaterResult = tryUpdateServer(writeRequest)) {
is UpdaterResult.Error.Exception -> StoreWriteResponse.Error.Exception(updaterResult.error)
is UpdaterResult.Error.Message -> StoreWriteResponse.Error.Message(updaterResult.message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.receiveAsFlow
Expand Down Expand Up @@ -76,7 +77,8 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
converter = converter
)

private val localOnlyChannel = Channel<Pair<Key, Output>>()
private val writeRequestChannel = Channel<Pair<Key, Output>>()
private val localOnlyRequestChannel = Channel<Pair<Key, Output>>()

@Suppress("UNCHECKED_CAST")
override fun stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<Output>> =
Expand Down Expand Up @@ -105,7 +107,7 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache))
}
emitAll(
localOnlyChannel.receiveAsFlow()
localOnlyRequestChannel.receiveAsFlow()
.filter { it.first == request.key }
.map {
StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache)
Expand All @@ -119,11 +121,18 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
val piggybackOnly = !request.refresh && cachedToEmit != null
@Suppress("UNCHECKED_CAST")

createNetworkFlow(
val networkFlow = createNetworkFlow(
request = request,
networkLock = null,
piggybackOnly = piggybackOnly
) as Flow<StoreReadResponse<Output>> // when no source of truth Input == Output

merge(
networkFlow,
writeRequestChannel.receiveAsFlow()
.filter { writeRequest -> writeRequest.first == request.key }
.map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) }
)
} else if (request.fetch) {
diskNetworkCombined(request, sourceOfTruth)
} else {
Expand Down Expand Up @@ -177,7 +186,7 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
}
}
if (sourceOfTruth == null && request.fetch && it is StoreReadResponse.Data) {
localOnlyChannel.trySend(request.key to it.value)
localOnlyRequestChannel.trySend(request.key to it.value)
}
}

Expand Down Expand Up @@ -339,7 +348,11 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(

internal suspend fun write(key: Key, value: Output): StoreDelegateWriteResult = try {
memCache?.put(key, value)
sourceOfTruth?.write(key, converter.fromOutputToLocal(value))
if (sourceOfTruth != null) {
sourceOfTruth.write(key, converter.fromOutputToLocal(value))
} else {
writeRequestChannel.trySend(key to value)
}
StoreDelegateWriteResult.Success
} catch (error: Throwable) {
StoreDelegateWriteResult.Error.Exception(error)
Expand All @@ -348,8 +361,6 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
internal suspend fun latestOrNull(key: Key): Output? =
fromMemCache(key) ?: fromSourceOfTruth(key)

internal fun hasSourceOfTruth() = sourceOfTruth != null

private suspend fun fromSourceOfTruth(key: Key) =
sourceOfTruth?.reader(key, CompletableDeferred(Unit))?.map { it.dataOrNull() }?.first()

Expand All @@ -362,3 +373,4 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.minutes
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
Expand Down Expand Up @@ -270,59 +271,90 @@ class UpdaterTests {
}

@Test
fun collectResponseAfterWriting() = testScope.runTest {
fun collectResponseAfterWritingWithSourceOfTruth() {
val ttl = inHours(1)

val store = StoreBuilder.from<NotesKey, NetworkNote>(
fetcher = Fetcher.of { key -> api.get(key, ttl = ttl) },
val converter = NotesConverterProvider().provide()
val validator = NotesValidator()
val updater = NotesUpdaterProvider(api).provide()

val store = MutableStoreBuilder.from<NotesKey, NetworkNote, InputNote, OutputNote>(
fetcher = Fetcher.ofFlow { key ->
val network = api.get(key, ttl = ttl)
flow { emit(network) }
},
sourceOfTruth = SourceOfTruth.of(
nonFlowReader = { key -> notes.get(key) },
writer = { key, sot -> notes.put(key, sot) },
delete = { key -> notes.clear(key) },
deleteAll = { notes.clear() }
),
converter
)
.cachePolicy(MemoryPolicy.builder<NotesKey, NetworkNote>().setExpireAfterWrite(10.minutes).build())
.build().asMutableStore<NotesKey, NetworkNote, NetworkNote, NetworkNote, NetworkNote>(
.validator(validator)
.build(
updater = updater,
bookkeeper = null
)

testCollectResponseAfterWriting(store, ttl)
}

@Test
fun collectResponseAfterWritingWithoutSourceOfTruth() {
val ttl = inHours(1)

val store = StoreBuilder.from<NotesKey, OutputNote>(
fetcher = Fetcher.of { key -> OutputNote(api.get(key, ttl = ttl).data, ttl = ttl) },
)
.cachePolicy(MemoryPolicy.builder<NotesKey, OutputNote>().setExpireAfterWrite(10.minutes).build())
.build().asMutableStore<NotesKey, OutputNote, OutputNote, OutputNote, OutputNote>(
Updater.by(
{ _, v -> UpdaterResult.Success.Typed(v) },
),
null,
)

testCollectResponseAfterWriting(store, ttl)
}

private fun testCollectResponseAfterWriting(
store: MutableStore<NotesKey, OutputNote>,
ttl: Long,
) = testScope.runTest {
val readRequest = StoreReadRequest.fresh(NotesKey.Single(Notes.One.id))

store.stream<NotesWriteResponse>(readRequest).test {
assertEquals(StoreReadResponse.Loading(origin = StoreReadResponseOrigin.Fetcher()), awaitItem())
assertEquals(
StoreReadResponse.Data(
NetworkNote(NoteData.Single(Notes.One), ttl = ttl),
OutputNote(NoteData.Single(Notes.One), ttl = ttl),
StoreReadResponseOrigin.Fetcher()
),
awaitItem()
)

val newNote = Notes.One.copy(title = "New Title-1")
val writeRequest = StoreWriteRequest.of<NotesKey, NetworkNote, NotesWriteResponse>(
val writeRequest = StoreWriteRequest.of<NotesKey, OutputNote, NotesWriteResponse>(
key = NotesKey.Single(Notes.One.id),
value = NetworkNote(NoteData.Single(newNote), 0)
value = OutputNote(NoteData.Single(newNote), 0)
)

val storeWriteResponse = store.write(writeRequest)

// Write is success
assertEquals(
StoreWriteResponse.Success.Typed(
NetworkNote(NoteData.Single(newNote), 0)
),
storeWriteResponse
)
assertTrue(storeWriteResponse is StoreWriteResponse.Success)

// New data added by 'write' is collected

assertEquals(
NetworkNote(NoteData.Single(newNote), 0),
OutputNote(NoteData.Single(newNote), 0),
awaitItem().requireData()
)

// different key, not collected
store.write(StoreWriteRequest.of<NotesKey, NetworkNote, NotesWriteResponse>(
store.write(StoreWriteRequest.of<NotesKey, OutputNote, NotesWriteResponse>(
key = NotesKey.Single(Notes.Five.id),
value = NetworkNote(NoteData.Single(newNote), 0)
value = OutputNote(NoteData.Single(newNote), 0)
))
}

Expand All @@ -331,7 +363,7 @@ class UpdaterTests {
val cachedStream = store.stream<NotesWriteResponse>(cachedReadRequest)

assertEquals(
NetworkNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0),
OutputNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0),
cachedStream.first().requireData()
)
}
Expand Down

0 comments on commit b55f0ba

Please sign in to comment.