diff --git a/store/build.gradle.kts b/store/build.gradle.kts index 11a3bd4ae..7fc1e2578 100644 --- a/store/build.gradle.kts +++ b/store/build.gradle.kts @@ -65,6 +65,7 @@ kotlin { implementation(kotlin("test")) implementation(libs.junit) implementation(libs.kotlinx.coroutines.test) + implementation(libs.turbine) } } diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt index c335b9155..a72ad1297 100644 --- a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt @@ -19,13 +19,17 @@ import co.touchlab.kermit.CommonWriter import co.touchlab.kermit.Logger import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.transform import org.mobilenativefoundation.store.cache5.Cache import org.mobilenativefoundation.store.store5.CacheType @@ -73,6 +77,9 @@ internal class RealStore( converter = converter ) + private val writeRequestChannel = Channel>() + private val localOnlyRequestChannel = Channel>() + @Suppress("UNCHECKED_CAST") override fun stream(request: StoreReadRequest): Flow> = flow { @@ -96,7 +103,16 @@ internal class RealStore( if (memCache == null) { logger.w("Local-only request made with no cache or source of truth configured") } - emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache)) + if (cachedToEmit == null) { + emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache)) + } + emitAll( + localOnlyRequestChannel.receiveAsFlow() + .filter { it.first == request.key } + .map { + StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) + } + ) return@flow } @@ -105,11 +121,18 @@ internal class RealStore( val piggybackOnly = !request.refresh && cachedToEmit != null @Suppress("UNCHECKED_CAST") - createNetworkFlow( + val networkFlow = createNetworkFlow( request = request, networkLock = null, piggybackOnly = piggybackOnly ) as Flow> // when no source of truth Input == Output + + merge( + networkFlow, + writeRequestChannel.receiveAsFlow() + .filter { writeRequest -> writeRequest.first == request.key } + .map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) } + ) } else if (request.fetch) { diskNetworkCombined(request, sourceOfTruth) } else { @@ -162,6 +185,9 @@ internal class RealStore( memCache?.put(request.key, data) } } + if (sourceOfTruth == null && request.fetch && it is StoreReadResponse.Data) { + localOnlyRequestChannel.trySend(request.key to it.value) + } } override suspend fun clear(key: Key) { @@ -322,7 +348,11 @@ internal class RealStore( internal suspend fun write(key: Key, value: Output): StoreDelegateWriteResult = try { memCache?.put(key, value) - sourceOfTruth?.write(key, converter.fromOutputToLocal(value)) + if (sourceOfTruth != null) { + sourceOfTruth.write(key, converter.fromOutputToLocal(value)) + } else { + writeRequestChannel.trySend(key to value) + } StoreDelegateWriteResult.Success } catch (error: Throwable) { StoreDelegateWriteResult.Error.Exception(error) @@ -343,3 +373,4 @@ internal class RealStore( } } } + diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt index ad73b5f17..55b810079 100644 --- a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt @@ -1,7 +1,7 @@ package org.mobilenativefoundation.store.store5 +import app.cash.turbine.test import kotlinx.atomicfu.atomic -import kotlinx.coroutines.flow.first import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import org.mobilenativefoundation.store.store5.impl.extensions.get @@ -11,6 +11,7 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertTrue import kotlin.time.Duration +import org.mobilenativefoundation.store.store5.impl.extensions.fresh class LocalOnlyTests { private val testScope = TestScope() @@ -25,8 +26,9 @@ class LocalOnlyTests { .build() ) .build() - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), response) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), awaitItem()) + } } @Test @@ -48,9 +50,10 @@ class LocalOnlyTests { val a = store.get(0) assertEquals("result", a) assertEquals(1, fetcherHitCounter.value) - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals("result", response.requireData()) - assertEquals(1, fetcherHitCounter.value) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals("result", awaitItem().requireData()) + assertEquals(1, fetcherHitCounter.value) + } } @Test @@ -73,9 +76,11 @@ class LocalOnlyTests { val a = store.get(0) assertEquals("result", a) assertEquals(1, fetcherHitCounter.value) - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), response) - assertEquals(1, fetcherHitCounter.value) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), awaitItem()) + assertEquals(1, fetcherHitCounter.value) + } + } @Test @@ -88,8 +93,9 @@ class LocalOnlyTests { ) .disableCache() .build() - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), response) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), awaitItem()) + } } @Test @@ -109,10 +115,12 @@ class LocalOnlyTests { val a = store.get(0) assertEquals("result", a) assertEquals(1, fetcherHitCounter.value) - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals("result", response.requireData()) - assertEquals(StoreReadResponseOrigin.SourceOfTruth, response.origin) - assertEquals(1, fetcherHitCounter.value) + store.stream(StoreReadRequest.localOnly(0)).test { + val response = awaitItem() + assertEquals("result", response.requireData()) + assertEquals(StoreReadResponseOrigin.SourceOfTruth, response.origin) + assertEquals(1, fetcherHitCounter.value) + } } @Test @@ -134,9 +142,10 @@ class LocalOnlyTests { val a = store.get(0) assertEquals("result", a) assertEquals(1, fetcherHitCounter.value) - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), response) - assertEquals(1, fetcherHitCounter.value) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), awaitItem()) + assertEquals(1, fetcherHitCounter.value) + } } @Test @@ -145,8 +154,41 @@ class LocalOnlyTests { .from(Fetcher.of { _: Int -> throw RuntimeException("Fetcher shouldn't be hit") }) .disableCache() .build() - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertTrue(response is StoreReadResponse.NoNewData) - assertEquals(StoreReadResponseOrigin.Cache, response.origin) + store.stream(StoreReadRequest.localOnly(0)).test { + val response = awaitItem() + assertTrue(response is StoreReadResponse.NoNewData) + assertEquals(StoreReadResponseOrigin.Cache, response.origin) + } + } + + @Test + fun collectNewDataFromFetcher() = testScope.runTest { + val fetcherHitCounter = atomic(0) + val store = StoreBuilder + .from( + Fetcher.of { _: Int -> + fetcherHitCounter += 1 + "result $fetcherHitCounter" + } + ) + .cachePolicy( + MemoryPolicy + .builder() + .build() + ) + .build() + + store.stream(StoreReadRequest.localOnly(0)).test { + assertTrue(awaitItem() is StoreReadResponse.NoNewData) + + assertEquals("result 1", store.fresh(0)) + assertEquals("result 1", awaitItem().requireData()) + + assertEquals("result 2", store.fresh(0)) + assertEquals("result 2", awaitItem().requireData()) + + // different key, not collected + assertEquals("result 3", store.fresh(1)) + } } } diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt index fa5052fb9..48d0f0fd8 100644 --- a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt @@ -1,6 +1,13 @@ package org.mobilenativefoundation.store.store5 -import kotlinx.coroutines.ExperimentalCoroutinesApi +import app.cash.turbine.test +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.test.assertNotNull +import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.minutes import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.last @@ -8,6 +15,7 @@ import kotlinx.coroutines.flow.take import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import org.mobilenativefoundation.store.core5.ExperimentalStoreApi +import org.mobilenativefoundation.store.store5.impl.extensions.asMutableStore import org.mobilenativefoundation.store.store5.impl.extensions.inHours import org.mobilenativefoundation.store.store5.util.assertEmitsExactly import org.mobilenativefoundation.store.store5.util.fake.Notes @@ -23,13 +31,8 @@ import org.mobilenativefoundation.store.store5.util.model.NetworkNote import org.mobilenativefoundation.store.store5.util.model.NoteData import org.mobilenativefoundation.store.store5.util.model.NotesWriteResponse import org.mobilenativefoundation.store.store5.util.model.OutputNote -import kotlin.test.BeforeTest -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertIs -import kotlin.test.assertNotNull -@OptIn(ExperimentalCoroutinesApi::class, ExperimentalStoreApi::class) +@OptIn(ExperimentalStoreApi::class) class UpdaterTests { private val testScope = TestScope() private lateinit var api: NotesApi @@ -266,4 +269,102 @@ class UpdaterTests { ) assertEquals(NetworkNote(NoteData.Single(newNote)), api.db[NotesKey.Single(Notes.One.id)]) } + + @Test + fun collectResponseAfterWritingWithSourceOfTruth() { + val ttl = inHours(1) + + val converter = NotesConverterProvider().provide() + val validator = NotesValidator() + val updater = NotesUpdaterProvider(api).provide() + + val store = MutableStoreBuilder.from( + fetcher = Fetcher.ofFlow { key -> + val network = api.get(key, ttl = ttl) + flow { emit(network) } + }, + sourceOfTruth = SourceOfTruth.of( + nonFlowReader = { key -> notes.get(key) }, + writer = { key, sot -> notes.put(key, sot) }, + delete = { key -> notes.clear(key) }, + deleteAll = { notes.clear() } + ), + converter + ) + .validator(validator) + .build( + updater = updater, + bookkeeper = null + ) + + testCollectResponseAfterWriting(store, ttl) + } + + @Test + fun collectResponseAfterWritingWithoutSourceOfTruth() { + val ttl = inHours(1) + + val store = StoreBuilder.from( + fetcher = Fetcher.of { key -> OutputNote(api.get(key, ttl = ttl).data, ttl = ttl) }, + ) + .cachePolicy(MemoryPolicy.builder().setExpireAfterWrite(10.minutes).build()) + .build().asMutableStore( + Updater.by( + { _, v -> UpdaterResult.Success.Typed(v) }, + ), + null, + ) + + testCollectResponseAfterWriting(store, ttl) + } + + private fun testCollectResponseAfterWriting( + store: MutableStore, + ttl: Long, + ) = testScope.runTest { + val readRequest = StoreReadRequest.fresh(NotesKey.Single(Notes.One.id)) + + store.stream(readRequest).test { + assertEquals(StoreReadResponse.Loading(origin = StoreReadResponseOrigin.Fetcher()), awaitItem()) + assertEquals( + StoreReadResponse.Data( + OutputNote(NoteData.Single(Notes.One), ttl = ttl), + StoreReadResponseOrigin.Fetcher() + ), + awaitItem() + ) + + val newNote = Notes.One.copy(title = "New Title-1") + val writeRequest = StoreWriteRequest.of( + key = NotesKey.Single(Notes.One.id), + value = OutputNote(NoteData.Single(newNote), 0) + ) + + val storeWriteResponse = store.write(writeRequest) + + assertTrue(storeWriteResponse is StoreWriteResponse.Success) + + // New data added by 'write' is collected + + assertEquals( + OutputNote(NoteData.Single(newNote), 0), + awaitItem().requireData() + ) + + // different key, not collected + store.write(StoreWriteRequest.of( + key = NotesKey.Single(Notes.Five.id), + value = OutputNote(NoteData.Single(newNote), 0) + )) + } + + val cachedReadRequest = + StoreReadRequest.cached(NotesKey.Single(Notes.One.id), refresh = false) + val cachedStream = store.stream(cachedReadRequest) + + assertEquals( + OutputNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0), + cachedStream.first().requireData() + ) + } }