diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt index 8b50fad60..2abd69f98 100644 --- a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt @@ -4,17 +4,11 @@ package org.mobilenativefoundation.store.store5.impl import co.touchlab.kermit.CommonWriter import co.touchlab.kermit.Logger -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.emitAll -import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import org.mobilenativefoundation.store.store5.Bookkeeper @@ -23,7 +17,6 @@ import org.mobilenativefoundation.store.core5.ExperimentalStoreApi import org.mobilenativefoundation.store.store5.MutableStore import org.mobilenativefoundation.store.store5.StoreReadRequest import org.mobilenativefoundation.store.store5.StoreReadResponse -import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin import org.mobilenativefoundation.store.store5.StoreWriteRequest import org.mobilenativefoundation.store.store5.StoreWriteResponse import org.mobilenativefoundation.store.store5.Updater @@ -45,8 +38,6 @@ internal class RealMutableStore>() private val keyToThreadSafety = mutableMapOf() - private val writeRequestChannel = Channel>() - override fun stream(request: StoreReadRequest): Flow> = flow { safeInitStore(request.key) @@ -69,14 +60,7 @@ internal class RealMutableStore emit(storeReadResponse) } } @ExperimentalStoreApi @@ -90,9 +74,6 @@ internal class RealMutableStore val storeWriteResponse = try { delegate.write(writeRequest.key, writeRequest.value) - if (!delegate.hasSourceOfTruth()) { - writeRequestChannel.trySend(writeRequest.key to writeRequest.value) - } when (val updaterResult = tryUpdateServer(writeRequest)) { is UpdaterResult.Error.Exception -> StoreWriteResponse.Error.Exception(updaterResult.error) is UpdaterResult.Error.Message -> StoreWriteResponse.Error.Message(updaterResult.message) diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt index 6c32a3763..a72ad1297 100644 --- a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.receiveAsFlow @@ -76,7 +77,8 @@ internal class RealStore( converter = converter ) - private val localOnlyChannel = Channel>() + private val writeRequestChannel = Channel>() + private val localOnlyRequestChannel = Channel>() @Suppress("UNCHECKED_CAST") override fun stream(request: StoreReadRequest): Flow> = @@ -105,7 +107,7 @@ internal class RealStore( emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache)) } emitAll( - localOnlyChannel.receiveAsFlow() + localOnlyRequestChannel.receiveAsFlow() .filter { it.first == request.key } .map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) @@ -119,11 +121,18 @@ internal class RealStore( val piggybackOnly = !request.refresh && cachedToEmit != null @Suppress("UNCHECKED_CAST") - createNetworkFlow( + val networkFlow = createNetworkFlow( request = request, networkLock = null, piggybackOnly = piggybackOnly ) as Flow> // when no source of truth Input == Output + + merge( + networkFlow, + writeRequestChannel.receiveAsFlow() + .filter { writeRequest -> writeRequest.first == request.key } + .map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) } + ) } else if (request.fetch) { diskNetworkCombined(request, sourceOfTruth) } else { @@ -177,7 +186,7 @@ internal class RealStore( } } if (sourceOfTruth == null && request.fetch && it is StoreReadResponse.Data) { - localOnlyChannel.trySend(request.key to it.value) + localOnlyRequestChannel.trySend(request.key to it.value) } } @@ -339,7 +348,11 @@ internal class RealStore( internal suspend fun write(key: Key, value: Output): StoreDelegateWriteResult = try { memCache?.put(key, value) - sourceOfTruth?.write(key, converter.fromOutputToLocal(value)) + if (sourceOfTruth != null) { + sourceOfTruth.write(key, converter.fromOutputToLocal(value)) + } else { + writeRequestChannel.trySend(key to value) + } StoreDelegateWriteResult.Success } catch (error: Throwable) { StoreDelegateWriteResult.Error.Exception(error) @@ -348,8 +361,6 @@ internal class RealStore( internal suspend fun latestOrNull(key: Key): Output? = fromMemCache(key) ?: fromSourceOfTruth(key) - internal fun hasSourceOfTruth() = sourceOfTruth != null - private suspend fun fromSourceOfTruth(key: Key) = sourceOfTruth?.reader(key, CompletableDeferred(Unit))?.map { it.dataOrNull() }?.first() @@ -362,3 +373,4 @@ internal class RealStore( } } } + diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt index 4fd1efad2..48d0f0fd8 100644 --- a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt @@ -6,6 +6,7 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertIs import kotlin.test.assertNotNull +import kotlin.test.assertTrue import kotlin.time.Duration.Companion.minutes import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow @@ -270,59 +271,90 @@ class UpdaterTests { } @Test - fun collectResponseAfterWriting() = testScope.runTest { + fun collectResponseAfterWritingWithSourceOfTruth() { val ttl = inHours(1) - val store = StoreBuilder.from( - fetcher = Fetcher.of { key -> api.get(key, ttl = ttl) }, + val converter = NotesConverterProvider().provide() + val validator = NotesValidator() + val updater = NotesUpdaterProvider(api).provide() + + val store = MutableStoreBuilder.from( + fetcher = Fetcher.ofFlow { key -> + val network = api.get(key, ttl = ttl) + flow { emit(network) } + }, + sourceOfTruth = SourceOfTruth.of( + nonFlowReader = { key -> notes.get(key) }, + writer = { key, sot -> notes.put(key, sot) }, + delete = { key -> notes.clear(key) }, + deleteAll = { notes.clear() } + ), + converter ) - .cachePolicy(MemoryPolicy.builder().setExpireAfterWrite(10.minutes).build()) - .build().asMutableStore( + .validator(validator) + .build( + updater = updater, + bookkeeper = null + ) + + testCollectResponseAfterWriting(store, ttl) + } + + @Test + fun collectResponseAfterWritingWithoutSourceOfTruth() { + val ttl = inHours(1) + + val store = StoreBuilder.from( + fetcher = Fetcher.of { key -> OutputNote(api.get(key, ttl = ttl).data, ttl = ttl) }, + ) + .cachePolicy(MemoryPolicy.builder().setExpireAfterWrite(10.minutes).build()) + .build().asMutableStore( Updater.by( { _, v -> UpdaterResult.Success.Typed(v) }, ), null, ) + testCollectResponseAfterWriting(store, ttl) + } + + private fun testCollectResponseAfterWriting( + store: MutableStore, + ttl: Long, + ) = testScope.runTest { val readRequest = StoreReadRequest.fresh(NotesKey.Single(Notes.One.id)) store.stream(readRequest).test { assertEquals(StoreReadResponse.Loading(origin = StoreReadResponseOrigin.Fetcher()), awaitItem()) assertEquals( StoreReadResponse.Data( - NetworkNote(NoteData.Single(Notes.One), ttl = ttl), + OutputNote(NoteData.Single(Notes.One), ttl = ttl), StoreReadResponseOrigin.Fetcher() ), awaitItem() ) val newNote = Notes.One.copy(title = "New Title-1") - val writeRequest = StoreWriteRequest.of( + val writeRequest = StoreWriteRequest.of( key = NotesKey.Single(Notes.One.id), - value = NetworkNote(NoteData.Single(newNote), 0) + value = OutputNote(NoteData.Single(newNote), 0) ) val storeWriteResponse = store.write(writeRequest) - // Write is success - assertEquals( - StoreWriteResponse.Success.Typed( - NetworkNote(NoteData.Single(newNote), 0) - ), - storeWriteResponse - ) + assertTrue(storeWriteResponse is StoreWriteResponse.Success) // New data added by 'write' is collected assertEquals( - NetworkNote(NoteData.Single(newNote), 0), + OutputNote(NoteData.Single(newNote), 0), awaitItem().requireData() ) // different key, not collected - store.write(StoreWriteRequest.of( + store.write(StoreWriteRequest.of( key = NotesKey.Single(Notes.Five.id), - value = NetworkNote(NoteData.Single(newNote), 0) + value = OutputNote(NoteData.Single(newNote), 0) )) } @@ -331,7 +363,7 @@ class UpdaterTests { val cachedStream = store.stream(cachedReadRequest) assertEquals( - NetworkNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0), + OutputNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0), cachedStream.first().requireData() ) }