diff --git a/store/build.gradle.kts b/store/build.gradle.kts index 11a3bd4ae..7fc1e2578 100644 --- a/store/build.gradle.kts +++ b/store/build.gradle.kts @@ -65,6 +65,7 @@ kotlin { implementation(kotlin("test")) implementation(libs.junit) implementation(libs.kotlinx.coroutines.test) + implementation(libs.turbine) } } diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt index 2abd69f98..8b50fad60 100644 --- a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt @@ -4,11 +4,17 @@ package org.mobilenativefoundation.store.store5.impl import co.touchlab.kermit.CommonWriter import co.touchlab.kermit.Logger +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import org.mobilenativefoundation.store.store5.Bookkeeper @@ -17,6 +23,7 @@ import org.mobilenativefoundation.store.core5.ExperimentalStoreApi import org.mobilenativefoundation.store.store5.MutableStore import org.mobilenativefoundation.store.store5.StoreReadRequest import org.mobilenativefoundation.store.store5.StoreReadResponse +import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin import org.mobilenativefoundation.store.store5.StoreWriteRequest import org.mobilenativefoundation.store.store5.StoreWriteResponse import org.mobilenativefoundation.store.store5.Updater @@ -38,6 +45,8 @@ internal class RealMutableStore>() private val keyToThreadSafety = mutableMapOf() + private val writeRequestChannel = Channel>() + override fun stream(request: StoreReadRequest): Flow> = flow { safeInitStore(request.key) @@ -60,7 +69,14 @@ internal class RealMutableStore emit(storeReadResponse) } + emitAll( + merge( + delegate.stream(request), + writeRequestChannel.receiveAsFlow() + .filter { it.first == request.key } + .map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) }, + ) + ) } @ExperimentalStoreApi @@ -74,6 +90,9 @@ internal class RealMutableStore val storeWriteResponse = try { delegate.write(writeRequest.key, writeRequest.value) + if (!delegate.hasSourceOfTruth()) { + writeRequestChannel.trySend(writeRequest.key to writeRequest.value) + } when (val updaterResult = tryUpdateServer(writeRequest)) { is UpdaterResult.Error.Exception -> StoreWriteResponse.Error.Exception(updaterResult.error) is UpdaterResult.Error.Message -> StoreWriteResponse.Error.Message(updaterResult.message) diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt index c335b9155..6c32a3763 100644 --- a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt @@ -19,13 +19,16 @@ import co.touchlab.kermit.CommonWriter import co.touchlab.kermit.Logger import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.transform import org.mobilenativefoundation.store.cache5.Cache import org.mobilenativefoundation.store.store5.CacheType @@ -73,6 +76,8 @@ internal class RealStore( converter = converter ) + private val localOnlyChannel = Channel>() + @Suppress("UNCHECKED_CAST") override fun stream(request: StoreReadRequest): Flow> = flow { @@ -96,7 +101,16 @@ internal class RealStore( if (memCache == null) { logger.w("Local-only request made with no cache or source of truth configured") } - emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache)) + if (cachedToEmit == null) { + emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache)) + } + emitAll( + localOnlyChannel.receiveAsFlow() + .filter { it.first == request.key } + .map { + StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) + } + ) return@flow } @@ -162,6 +176,9 @@ internal class RealStore( memCache?.put(request.key, data) } } + if (sourceOfTruth == null && request.fetch && it is StoreReadResponse.Data) { + localOnlyChannel.trySend(request.key to it.value) + } } override suspend fun clear(key: Key) { @@ -331,6 +348,8 @@ internal class RealStore( internal suspend fun latestOrNull(key: Key): Output? = fromMemCache(key) ?: fromSourceOfTruth(key) + internal fun hasSourceOfTruth() = sourceOfTruth != null + private suspend fun fromSourceOfTruth(key: Key) = sourceOfTruth?.reader(key, CompletableDeferred(Unit))?.map { it.dataOrNull() }?.first() diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt index ad73b5f17..55b810079 100644 --- a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt @@ -1,7 +1,7 @@ package org.mobilenativefoundation.store.store5 +import app.cash.turbine.test import kotlinx.atomicfu.atomic -import kotlinx.coroutines.flow.first import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import org.mobilenativefoundation.store.store5.impl.extensions.get @@ -11,6 +11,7 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertTrue import kotlin.time.Duration +import org.mobilenativefoundation.store.store5.impl.extensions.fresh class LocalOnlyTests { private val testScope = TestScope() @@ -25,8 +26,9 @@ class LocalOnlyTests { .build() ) .build() - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), response) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), awaitItem()) + } } @Test @@ -48,9 +50,10 @@ class LocalOnlyTests { val a = store.get(0) assertEquals("result", a) assertEquals(1, fetcherHitCounter.value) - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals("result", response.requireData()) - assertEquals(1, fetcherHitCounter.value) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals("result", awaitItem().requireData()) + assertEquals(1, fetcherHitCounter.value) + } } @Test @@ -73,9 +76,11 @@ class LocalOnlyTests { val a = store.get(0) assertEquals("result", a) assertEquals(1, fetcherHitCounter.value) - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), response) - assertEquals(1, fetcherHitCounter.value) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), awaitItem()) + assertEquals(1, fetcherHitCounter.value) + } + } @Test @@ -88,8 +93,9 @@ class LocalOnlyTests { ) .disableCache() .build() - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), response) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), awaitItem()) + } } @Test @@ -109,10 +115,12 @@ class LocalOnlyTests { val a = store.get(0) assertEquals("result", a) assertEquals(1, fetcherHitCounter.value) - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals("result", response.requireData()) - assertEquals(StoreReadResponseOrigin.SourceOfTruth, response.origin) - assertEquals(1, fetcherHitCounter.value) + store.stream(StoreReadRequest.localOnly(0)).test { + val response = awaitItem() + assertEquals("result", response.requireData()) + assertEquals(StoreReadResponseOrigin.SourceOfTruth, response.origin) + assertEquals(1, fetcherHitCounter.value) + } } @Test @@ -134,9 +142,10 @@ class LocalOnlyTests { val a = store.get(0) assertEquals("result", a) assertEquals(1, fetcherHitCounter.value) - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), response) - assertEquals(1, fetcherHitCounter.value) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), awaitItem()) + assertEquals(1, fetcherHitCounter.value) + } } @Test @@ -145,8 +154,41 @@ class LocalOnlyTests { .from(Fetcher.of { _: Int -> throw RuntimeException("Fetcher shouldn't be hit") }) .disableCache() .build() - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertTrue(response is StoreReadResponse.NoNewData) - assertEquals(StoreReadResponseOrigin.Cache, response.origin) + store.stream(StoreReadRequest.localOnly(0)).test { + val response = awaitItem() + assertTrue(response is StoreReadResponse.NoNewData) + assertEquals(StoreReadResponseOrigin.Cache, response.origin) + } + } + + @Test + fun collectNewDataFromFetcher() = testScope.runTest { + val fetcherHitCounter = atomic(0) + val store = StoreBuilder + .from( + Fetcher.of { _: Int -> + fetcherHitCounter += 1 + "result $fetcherHitCounter" + } + ) + .cachePolicy( + MemoryPolicy + .builder() + .build() + ) + .build() + + store.stream(StoreReadRequest.localOnly(0)).test { + assertTrue(awaitItem() is StoreReadResponse.NoNewData) + + assertEquals("result 1", store.fresh(0)) + assertEquals("result 1", awaitItem().requireData()) + + assertEquals("result 2", store.fresh(0)) + assertEquals("result 2", awaitItem().requireData()) + + // different key, not collected + assertEquals("result 3", store.fresh(1)) + } } } diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt index fa5052fb9..4fd1efad2 100644 --- a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt @@ -1,6 +1,12 @@ package org.mobilenativefoundation.store.store5 -import kotlinx.coroutines.ExperimentalCoroutinesApi +import app.cash.turbine.test +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.test.assertNotNull +import kotlin.time.Duration.Companion.minutes import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.last @@ -8,6 +14,7 @@ import kotlinx.coroutines.flow.take import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import org.mobilenativefoundation.store.core5.ExperimentalStoreApi +import org.mobilenativefoundation.store.store5.impl.extensions.asMutableStore import org.mobilenativefoundation.store.store5.impl.extensions.inHours import org.mobilenativefoundation.store.store5.util.assertEmitsExactly import org.mobilenativefoundation.store.store5.util.fake.Notes @@ -23,13 +30,8 @@ import org.mobilenativefoundation.store.store5.util.model.NetworkNote import org.mobilenativefoundation.store.store5.util.model.NoteData import org.mobilenativefoundation.store.store5.util.model.NotesWriteResponse import org.mobilenativefoundation.store.store5.util.model.OutputNote -import kotlin.test.BeforeTest -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertIs -import kotlin.test.assertNotNull -@OptIn(ExperimentalCoroutinesApi::class, ExperimentalStoreApi::class) +@OptIn(ExperimentalStoreApi::class) class UpdaterTests { private val testScope = TestScope() private lateinit var api: NotesApi @@ -266,4 +268,71 @@ class UpdaterTests { ) assertEquals(NetworkNote(NoteData.Single(newNote)), api.db[NotesKey.Single(Notes.One.id)]) } + + @Test + fun collectResponseAfterWriting() = testScope.runTest { + val ttl = inHours(1) + + val store = StoreBuilder.from( + fetcher = Fetcher.of { key -> api.get(key, ttl = ttl) }, + ) + .cachePolicy(MemoryPolicy.builder().setExpireAfterWrite(10.minutes).build()) + .build().asMutableStore( + Updater.by( + { _, v -> UpdaterResult.Success.Typed(v) }, + ), + null, + ) + + val readRequest = StoreReadRequest.fresh(NotesKey.Single(Notes.One.id)) + + store.stream(readRequest).test { + assertEquals(StoreReadResponse.Loading(origin = StoreReadResponseOrigin.Fetcher()), awaitItem()) + assertEquals( + StoreReadResponse.Data( + NetworkNote(NoteData.Single(Notes.One), ttl = ttl), + StoreReadResponseOrigin.Fetcher() + ), + awaitItem() + ) + + val newNote = Notes.One.copy(title = "New Title-1") + val writeRequest = StoreWriteRequest.of( + key = NotesKey.Single(Notes.One.id), + value = NetworkNote(NoteData.Single(newNote), 0) + ) + + val storeWriteResponse = store.write(writeRequest) + + // Write is success + assertEquals( + StoreWriteResponse.Success.Typed( + NetworkNote(NoteData.Single(newNote), 0) + ), + storeWriteResponse + ) + + // New data added by 'write' is collected + + assertEquals( + NetworkNote(NoteData.Single(newNote), 0), + awaitItem().requireData() + ) + + // different key, not collected + store.write(StoreWriteRequest.of( + key = NotesKey.Single(Notes.Five.id), + value = NetworkNote(NoteData.Single(newNote), 0) + )) + } + + val cachedReadRequest = + StoreReadRequest.cached(NotesKey.Single(Notes.One.id), refresh = false) + val cachedStream = store.stream(cachedReadRequest) + + assertEquals( + NetworkNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0), + cachedStream.first().requireData() + ) + } }