Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit data into collected stream after write or fetch with no SourceOfTruth #634

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions store/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ kotlin {
implementation(kotlin("test"))
implementation(libs.junit)
implementation(libs.kotlinx.coroutines.test)
implementation(libs.turbine)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ import co.touchlab.kermit.CommonWriter
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.transform
import org.mobilenativefoundation.store.cache5.Cache
import org.mobilenativefoundation.store.store5.CacheType
Expand Down Expand Up @@ -73,6 +77,9 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
converter = converter
)

private val writeRequestChannel = Channel<Pair<Key, Output>>()
private val localOnlyRequestChannel = Channel<Pair<Key, Output>>()

@Suppress("UNCHECKED_CAST")
override fun stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<Output>> =
flow {
Expand All @@ -96,7 +103,16 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
if (memCache == null) {
logger.w("Local-only request made with no cache or source of truth configured")
}
emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache))
if (cachedToEmit == null) {
emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache))
}
emitAll(
localOnlyRequestChannel.receiveAsFlow()
.filter { it.first == request.key }
.map {
StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache)
}
)
return@flow
}

Expand All @@ -105,11 +121,18 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
val piggybackOnly = !request.refresh && cachedToEmit != null
@Suppress("UNCHECKED_CAST")

createNetworkFlow(
val networkFlow = createNetworkFlow(
request = request,
networkLock = null,
piggybackOnly = piggybackOnly
) as Flow<StoreReadResponse<Output>> // when no source of truth Input == Output

merge(
networkFlow,
writeRequestChannel.receiveAsFlow()
.filter { writeRequest -> writeRequest.first == request.key }
.map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) }
)
} else if (request.fetch) {
diskNetworkCombined(request, sourceOfTruth)
} else {
Expand Down Expand Up @@ -162,6 +185,9 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
memCache?.put(request.key, data)
}
}
if (sourceOfTruth == null && request.fetch && it is StoreReadResponse.Data) {
localOnlyRequestChannel.trySend(request.key to it.value)
}
}

override suspend fun clear(key: Key) {
Expand Down Expand Up @@ -322,7 +348,11 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(

internal suspend fun write(key: Key, value: Output): StoreDelegateWriteResult = try {
memCache?.put(key, value)
sourceOfTruth?.write(key, converter.fromOutputToLocal(value))
if (sourceOfTruth != null) {
sourceOfTruth.write(key, converter.fromOutputToLocal(value))
} else {
writeRequestChannel.trySend(key to value)
}
StoreDelegateWriteResult.Success
} catch (error: Throwable) {
StoreDelegateWriteResult.Error.Exception(error)
Expand All @@ -343,3 +373,4 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
}
}
}

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.mobilenativefoundation.store.store5

import app.cash.turbine.test
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.mobilenativefoundation.store.store5.impl.extensions.get
Expand All @@ -11,6 +11,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlin.time.Duration
import org.mobilenativefoundation.store.store5.impl.extensions.fresh

class LocalOnlyTests {
private val testScope = TestScope()
Expand All @@ -25,8 +26,9 @@ class LocalOnlyTests {
.build()
)
.build()
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), response)
store.stream(StoreReadRequest.localOnly(0)).test {
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), awaitItem())
}
}

@Test
Expand All @@ -48,9 +50,10 @@ class LocalOnlyTests {
val a = store.get(0)
assertEquals("result", a)
assertEquals(1, fetcherHitCounter.value)
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertEquals("result", response.requireData())
assertEquals(1, fetcherHitCounter.value)
store.stream(StoreReadRequest.localOnly(0)).test {
assertEquals("result", awaitItem().requireData())
assertEquals(1, fetcherHitCounter.value)
}
}

@Test
Expand All @@ -73,9 +76,11 @@ class LocalOnlyTests {
val a = store.get(0)
assertEquals("result", a)
assertEquals(1, fetcherHitCounter.value)
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), response)
assertEquals(1, fetcherHitCounter.value)
store.stream(StoreReadRequest.localOnly(0)).test {
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), awaitItem())
assertEquals(1, fetcherHitCounter.value)
}

}

@Test
Expand All @@ -88,8 +93,9 @@ class LocalOnlyTests {
)
.disableCache()
.build()
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), response)
store.stream(StoreReadRequest.localOnly(0)).test {
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), awaitItem())
}
}

@Test
Expand All @@ -109,10 +115,12 @@ class LocalOnlyTests {
val a = store.get(0)
assertEquals("result", a)
assertEquals(1, fetcherHitCounter.value)
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertEquals("result", response.requireData())
assertEquals(StoreReadResponseOrigin.SourceOfTruth, response.origin)
assertEquals(1, fetcherHitCounter.value)
store.stream(StoreReadRequest.localOnly(0)).test {
val response = awaitItem()
assertEquals("result", response.requireData())
assertEquals(StoreReadResponseOrigin.SourceOfTruth, response.origin)
assertEquals(1, fetcherHitCounter.value)
}
}

@Test
Expand All @@ -134,9 +142,10 @@ class LocalOnlyTests {
val a = store.get(0)
assertEquals("result", a)
assertEquals(1, fetcherHitCounter.value)
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), response)
assertEquals(1, fetcherHitCounter.value)
store.stream(StoreReadRequest.localOnly(0)).test {
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), awaitItem())
assertEquals(1, fetcherHitCounter.value)
}
}

@Test
Expand All @@ -145,8 +154,41 @@ class LocalOnlyTests {
.from(Fetcher.of { _: Int -> throw RuntimeException("Fetcher shouldn't be hit") })
.disableCache()
.build()
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertTrue(response is StoreReadResponse.NoNewData)
assertEquals(StoreReadResponseOrigin.Cache, response.origin)
store.stream(StoreReadRequest.localOnly(0)).test {
val response = awaitItem()
assertTrue(response is StoreReadResponse.NoNewData)
assertEquals(StoreReadResponseOrigin.Cache, response.origin)
}
}

@Test
fun collectNewDataFromFetcher() = testScope.runTest {
val fetcherHitCounter = atomic(0)
val store = StoreBuilder
.from(
Fetcher.of { _: Int ->
fetcherHitCounter += 1
"result $fetcherHitCounter"
}
)
.cachePolicy(
MemoryPolicy
.builder<Int, String>()
.build()
)
.build()

store.stream(StoreReadRequest.localOnly(0)).test {
assertTrue(awaitItem() is StoreReadResponse.NoNewData)

assertEquals("result 1", store.fresh(0))
assertEquals("result 1", awaitItem().requireData())

assertEquals("result 2", store.fresh(0))
assertEquals("result 2", awaitItem().requireData())

// different key, not collected
assertEquals("result 3", store.fresh(1))
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
package org.mobilenativefoundation.store.store5

import kotlinx.coroutines.ExperimentalCoroutinesApi
import app.cash.turbine.test
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.minutes
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.mobilenativefoundation.store.core5.ExperimentalStoreApi
import org.mobilenativefoundation.store.store5.impl.extensions.asMutableStore
import org.mobilenativefoundation.store.store5.impl.extensions.inHours
import org.mobilenativefoundation.store.store5.util.assertEmitsExactly
import org.mobilenativefoundation.store.store5.util.fake.Notes
Expand All @@ -23,13 +31,8 @@ import org.mobilenativefoundation.store.store5.util.model.NetworkNote
import org.mobilenativefoundation.store.store5.util.model.NoteData
import org.mobilenativefoundation.store.store5.util.model.NotesWriteResponse
import org.mobilenativefoundation.store.store5.util.model.OutputNote
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertNotNull

@OptIn(ExperimentalCoroutinesApi::class, ExperimentalStoreApi::class)
@OptIn(ExperimentalStoreApi::class)
class UpdaterTests {
private val testScope = TestScope()
private lateinit var api: NotesApi
Expand Down Expand Up @@ -266,4 +269,102 @@ class UpdaterTests {
)
assertEquals(NetworkNote(NoteData.Single(newNote)), api.db[NotesKey.Single(Notes.One.id)])
}

@Test
fun collectResponseAfterWritingWithSourceOfTruth() {
val ttl = inHours(1)

val converter = NotesConverterProvider().provide()
val validator = NotesValidator()
val updater = NotesUpdaterProvider(api).provide()

val store = MutableStoreBuilder.from<NotesKey, NetworkNote, InputNote, OutputNote>(
fetcher = Fetcher.ofFlow { key ->
val network = api.get(key, ttl = ttl)
flow { emit(network) }
},
sourceOfTruth = SourceOfTruth.of(
nonFlowReader = { key -> notes.get(key) },
writer = { key, sot -> notes.put(key, sot) },
delete = { key -> notes.clear(key) },
deleteAll = { notes.clear() }
),
converter
)
.validator(validator)
.build(
updater = updater,
bookkeeper = null
)

testCollectResponseAfterWriting(store, ttl)
}

@Test
fun collectResponseAfterWritingWithoutSourceOfTruth() {
val ttl = inHours(1)

val store = StoreBuilder.from<NotesKey, OutputNote>(
fetcher = Fetcher.of { key -> OutputNote(api.get(key, ttl = ttl).data, ttl = ttl) },
)
.cachePolicy(MemoryPolicy.builder<NotesKey, OutputNote>().setExpireAfterWrite(10.minutes).build())
.build().asMutableStore<NotesKey, OutputNote, OutputNote, OutputNote, OutputNote>(
Updater.by(
{ _, v -> UpdaterResult.Success.Typed(v) },
),
null,
)

testCollectResponseAfterWriting(store, ttl)
}

private fun testCollectResponseAfterWriting(
store: MutableStore<NotesKey, OutputNote>,
ttl: Long,
) = testScope.runTest {
val readRequest = StoreReadRequest.fresh(NotesKey.Single(Notes.One.id))

store.stream<NotesWriteResponse>(readRequest).test {
assertEquals(StoreReadResponse.Loading(origin = StoreReadResponseOrigin.Fetcher()), awaitItem())
assertEquals(
StoreReadResponse.Data(
OutputNote(NoteData.Single(Notes.One), ttl = ttl),
StoreReadResponseOrigin.Fetcher()
),
awaitItem()
)

val newNote = Notes.One.copy(title = "New Title-1")
val writeRequest = StoreWriteRequest.of<NotesKey, OutputNote, NotesWriteResponse>(
key = NotesKey.Single(Notes.One.id),
value = OutputNote(NoteData.Single(newNote), 0)
)

val storeWriteResponse = store.write(writeRequest)

assertTrue(storeWriteResponse is StoreWriteResponse.Success)

// New data added by 'write' is collected

assertEquals(
OutputNote(NoteData.Single(newNote), 0),
awaitItem().requireData()
)

// different key, not collected
store.write(StoreWriteRequest.of<NotesKey, OutputNote, NotesWriteResponse>(
key = NotesKey.Single(Notes.Five.id),
value = OutputNote(NoteData.Single(newNote), 0)
))
}

val cachedReadRequest =
StoreReadRequest.cached(NotesKey.Single(Notes.One.id), refresh = false)
val cachedStream = store.stream<NotesWriteResponse>(cachedReadRequest)

assertEquals(
OutputNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0),
cachedStream.first().requireData()
)
}
}