From 62a223ce0629d18be193270ddce1bee6aa4201c2 Mon Sep 17 00:00:00 2001 From: Juraj Filan Date: Tue, 16 Apr 2024 16:42:27 +0200 Subject: [PATCH 1/2] emit data into collected flow Signed-off-by: Juraj Filan --- store/build.gradle.kts | 1 + .../store/store5/impl/RealMutableStore.kt | 21 ++++- .../store/store5/impl/RealStore.kt | 21 ++++- .../store/store5/LocalOnlyTests.kt | 84 ++++++++++++++----- .../store/store5/UpdaterTests.kt | 83 ++++++++++++++++-- 5 files changed, 180 insertions(+), 30 deletions(-) diff --git a/store/build.gradle.kts b/store/build.gradle.kts index 11a3bd4ae..7fc1e2578 100644 --- a/store/build.gradle.kts +++ b/store/build.gradle.kts @@ -65,6 +65,7 @@ kotlin { implementation(kotlin("test")) implementation(libs.junit) implementation(libs.kotlinx.coroutines.test) + implementation(libs.turbine) } } diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt index 2abd69f98..8b50fad60 100644 --- a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt @@ -4,11 +4,17 @@ package org.mobilenativefoundation.store.store5.impl import co.touchlab.kermit.CommonWriter import co.touchlab.kermit.Logger +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import org.mobilenativefoundation.store.store5.Bookkeeper @@ -17,6 +23,7 @@ import org.mobilenativefoundation.store.core5.ExperimentalStoreApi import org.mobilenativefoundation.store.store5.MutableStore import org.mobilenativefoundation.store.store5.StoreReadRequest import org.mobilenativefoundation.store.store5.StoreReadResponse +import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin import org.mobilenativefoundation.store.store5.StoreWriteRequest import org.mobilenativefoundation.store.store5.StoreWriteResponse import org.mobilenativefoundation.store.store5.Updater @@ -38,6 +45,8 @@ internal class RealMutableStore>() private val keyToThreadSafety = mutableMapOf() + private val writeRequestChannel = Channel>() + override fun stream(request: StoreReadRequest): Flow> = flow { safeInitStore(request.key) @@ -60,7 +69,14 @@ internal class RealMutableStore emit(storeReadResponse) } + emitAll( + merge( + delegate.stream(request), + writeRequestChannel.receiveAsFlow() + .filter { it.first == request.key } + .map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) }, + ) + ) } @ExperimentalStoreApi @@ -74,6 +90,9 @@ internal class RealMutableStore val storeWriteResponse = try { delegate.write(writeRequest.key, writeRequest.value) + if (!delegate.hasSourceOfTruth()) { + writeRequestChannel.trySend(writeRequest.key to writeRequest.value) + } when (val updaterResult = tryUpdateServer(writeRequest)) { is UpdaterResult.Error.Exception -> StoreWriteResponse.Error.Exception(updaterResult.error) is UpdaterResult.Error.Message -> StoreWriteResponse.Error.Message(updaterResult.message) diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt index c335b9155..6c32a3763 100644 --- a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt @@ -19,13 +19,16 @@ import co.touchlab.kermit.CommonWriter import co.touchlab.kermit.Logger import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.emitAll +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.transform import org.mobilenativefoundation.store.cache5.Cache import org.mobilenativefoundation.store.store5.CacheType @@ -73,6 +76,8 @@ internal class RealStore( converter = converter ) + private val localOnlyChannel = Channel>() + @Suppress("UNCHECKED_CAST") override fun stream(request: StoreReadRequest): Flow> = flow { @@ -96,7 +101,16 @@ internal class RealStore( if (memCache == null) { logger.w("Local-only request made with no cache or source of truth configured") } - emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache)) + if (cachedToEmit == null) { + emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache)) + } + emitAll( + localOnlyChannel.receiveAsFlow() + .filter { it.first == request.key } + .map { + StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) + } + ) return@flow } @@ -162,6 +176,9 @@ internal class RealStore( memCache?.put(request.key, data) } } + if (sourceOfTruth == null && request.fetch && it is StoreReadResponse.Data) { + localOnlyChannel.trySend(request.key to it.value) + } } override suspend fun clear(key: Key) { @@ -331,6 +348,8 @@ internal class RealStore( internal suspend fun latestOrNull(key: Key): Output? = fromMemCache(key) ?: fromSourceOfTruth(key) + internal fun hasSourceOfTruth() = sourceOfTruth != null + private suspend fun fromSourceOfTruth(key: Key) = sourceOfTruth?.reader(key, CompletableDeferred(Unit))?.map { it.dataOrNull() }?.first() diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt index ad73b5f17..55b810079 100644 --- a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/LocalOnlyTests.kt @@ -1,7 +1,7 @@ package org.mobilenativefoundation.store.store5 +import app.cash.turbine.test import kotlinx.atomicfu.atomic -import kotlinx.coroutines.flow.first import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import org.mobilenativefoundation.store.store5.impl.extensions.get @@ -11,6 +11,7 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertTrue import kotlin.time.Duration +import org.mobilenativefoundation.store.store5.impl.extensions.fresh class LocalOnlyTests { private val testScope = TestScope() @@ -25,8 +26,9 @@ class LocalOnlyTests { .build() ) .build() - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), response) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), awaitItem()) + } } @Test @@ -48,9 +50,10 @@ class LocalOnlyTests { val a = store.get(0) assertEquals("result", a) assertEquals(1, fetcherHitCounter.value) - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals("result", response.requireData()) - assertEquals(1, fetcherHitCounter.value) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals("result", awaitItem().requireData()) + assertEquals(1, fetcherHitCounter.value) + } } @Test @@ -73,9 +76,11 @@ class LocalOnlyTests { val a = store.get(0) assertEquals("result", a) assertEquals(1, fetcherHitCounter.value) - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), response) - assertEquals(1, fetcherHitCounter.value) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), awaitItem()) + assertEquals(1, fetcherHitCounter.value) + } + } @Test @@ -88,8 +93,9 @@ class LocalOnlyTests { ) .disableCache() .build() - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), response) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), awaitItem()) + } } @Test @@ -109,10 +115,12 @@ class LocalOnlyTests { val a = store.get(0) assertEquals("result", a) assertEquals(1, fetcherHitCounter.value) - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals("result", response.requireData()) - assertEquals(StoreReadResponseOrigin.SourceOfTruth, response.origin) - assertEquals(1, fetcherHitCounter.value) + store.stream(StoreReadRequest.localOnly(0)).test { + val response = awaitItem() + assertEquals("result", response.requireData()) + assertEquals(StoreReadResponseOrigin.SourceOfTruth, response.origin) + assertEquals(1, fetcherHitCounter.value) + } } @Test @@ -134,9 +142,10 @@ class LocalOnlyTests { val a = store.get(0) assertEquals("result", a) assertEquals(1, fetcherHitCounter.value) - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), response) - assertEquals(1, fetcherHitCounter.value) + store.stream(StoreReadRequest.localOnly(0)).test { + assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), awaitItem()) + assertEquals(1, fetcherHitCounter.value) + } } @Test @@ -145,8 +154,41 @@ class LocalOnlyTests { .from(Fetcher.of { _: Int -> throw RuntimeException("Fetcher shouldn't be hit") }) .disableCache() .build() - val response = store.stream(StoreReadRequest.localOnly(0)).first() - assertTrue(response is StoreReadResponse.NoNewData) - assertEquals(StoreReadResponseOrigin.Cache, response.origin) + store.stream(StoreReadRequest.localOnly(0)).test { + val response = awaitItem() + assertTrue(response is StoreReadResponse.NoNewData) + assertEquals(StoreReadResponseOrigin.Cache, response.origin) + } + } + + @Test + fun collectNewDataFromFetcher() = testScope.runTest { + val fetcherHitCounter = atomic(0) + val store = StoreBuilder + .from( + Fetcher.of { _: Int -> + fetcherHitCounter += 1 + "result $fetcherHitCounter" + } + ) + .cachePolicy( + MemoryPolicy + .builder() + .build() + ) + .build() + + store.stream(StoreReadRequest.localOnly(0)).test { + assertTrue(awaitItem() is StoreReadResponse.NoNewData) + + assertEquals("result 1", store.fresh(0)) + assertEquals("result 1", awaitItem().requireData()) + + assertEquals("result 2", store.fresh(0)) + assertEquals("result 2", awaitItem().requireData()) + + // different key, not collected + assertEquals("result 3", store.fresh(1)) + } } } diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt index fa5052fb9..4fd1efad2 100644 --- a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt @@ -1,6 +1,12 @@ package org.mobilenativefoundation.store.store5 -import kotlinx.coroutines.ExperimentalCoroutinesApi +import app.cash.turbine.test +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.test.assertNotNull +import kotlin.time.Duration.Companion.minutes import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.last @@ -8,6 +14,7 @@ import kotlinx.coroutines.flow.take import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import org.mobilenativefoundation.store.core5.ExperimentalStoreApi +import org.mobilenativefoundation.store.store5.impl.extensions.asMutableStore import org.mobilenativefoundation.store.store5.impl.extensions.inHours import org.mobilenativefoundation.store.store5.util.assertEmitsExactly import org.mobilenativefoundation.store.store5.util.fake.Notes @@ -23,13 +30,8 @@ import org.mobilenativefoundation.store.store5.util.model.NetworkNote import org.mobilenativefoundation.store.store5.util.model.NoteData import org.mobilenativefoundation.store.store5.util.model.NotesWriteResponse import org.mobilenativefoundation.store.store5.util.model.OutputNote -import kotlin.test.BeforeTest -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertIs -import kotlin.test.assertNotNull -@OptIn(ExperimentalCoroutinesApi::class, ExperimentalStoreApi::class) +@OptIn(ExperimentalStoreApi::class) class UpdaterTests { private val testScope = TestScope() private lateinit var api: NotesApi @@ -266,4 +268,71 @@ class UpdaterTests { ) assertEquals(NetworkNote(NoteData.Single(newNote)), api.db[NotesKey.Single(Notes.One.id)]) } + + @Test + fun collectResponseAfterWriting() = testScope.runTest { + val ttl = inHours(1) + + val store = StoreBuilder.from( + fetcher = Fetcher.of { key -> api.get(key, ttl = ttl) }, + ) + .cachePolicy(MemoryPolicy.builder().setExpireAfterWrite(10.minutes).build()) + .build().asMutableStore( + Updater.by( + { _, v -> UpdaterResult.Success.Typed(v) }, + ), + null, + ) + + val readRequest = StoreReadRequest.fresh(NotesKey.Single(Notes.One.id)) + + store.stream(readRequest).test { + assertEquals(StoreReadResponse.Loading(origin = StoreReadResponseOrigin.Fetcher()), awaitItem()) + assertEquals( + StoreReadResponse.Data( + NetworkNote(NoteData.Single(Notes.One), ttl = ttl), + StoreReadResponseOrigin.Fetcher() + ), + awaitItem() + ) + + val newNote = Notes.One.copy(title = "New Title-1") + val writeRequest = StoreWriteRequest.of( + key = NotesKey.Single(Notes.One.id), + value = NetworkNote(NoteData.Single(newNote), 0) + ) + + val storeWriteResponse = store.write(writeRequest) + + // Write is success + assertEquals( + StoreWriteResponse.Success.Typed( + NetworkNote(NoteData.Single(newNote), 0) + ), + storeWriteResponse + ) + + // New data added by 'write' is collected + + assertEquals( + NetworkNote(NoteData.Single(newNote), 0), + awaitItem().requireData() + ) + + // different key, not collected + store.write(StoreWriteRequest.of( + key = NotesKey.Single(Notes.Five.id), + value = NetworkNote(NoteData.Single(newNote), 0) + )) + } + + val cachedReadRequest = + StoreReadRequest.cached(NotesKey.Single(Notes.One.id), refresh = false) + val cachedStream = store.stream(cachedReadRequest) + + assertEquals( + NetworkNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0), + cachedStream.first().requireData() + ) + } } From 0518b22a3a2ed09c9f14a1c9b00663f82873a70d Mon Sep 17 00:00:00 2001 From: Juraj Filan Date: Wed, 17 Apr 2024 11:06:23 +0200 Subject: [PATCH 2/2] refactor and add tests Signed-off-by: Juraj Filan --- .../store/store5/impl/RealMutableStore.kt | 21 +----- .../store/store5/impl/RealStore.kt | 26 +++++-- .../store/store5/UpdaterTests.kt | 70 ++++++++++++++----- 3 files changed, 71 insertions(+), 46 deletions(-) diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt index 8b50fad60..2abd69f98 100644 --- a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt @@ -4,17 +4,11 @@ package org.mobilenativefoundation.store.store5.impl import co.touchlab.kermit.CommonWriter import co.touchlab.kermit.Logger -import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.emitAll -import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import org.mobilenativefoundation.store.store5.Bookkeeper @@ -23,7 +17,6 @@ import org.mobilenativefoundation.store.core5.ExperimentalStoreApi import org.mobilenativefoundation.store.store5.MutableStore import org.mobilenativefoundation.store.store5.StoreReadRequest import org.mobilenativefoundation.store.store5.StoreReadResponse -import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin import org.mobilenativefoundation.store.store5.StoreWriteRequest import org.mobilenativefoundation.store.store5.StoreWriteResponse import org.mobilenativefoundation.store.store5.Updater @@ -45,8 +38,6 @@ internal class RealMutableStore>() private val keyToThreadSafety = mutableMapOf() - private val writeRequestChannel = Channel>() - override fun stream(request: StoreReadRequest): Flow> = flow { safeInitStore(request.key) @@ -69,14 +60,7 @@ internal class RealMutableStore emit(storeReadResponse) } } @ExperimentalStoreApi @@ -90,9 +74,6 @@ internal class RealMutableStore val storeWriteResponse = try { delegate.write(writeRequest.key, writeRequest.value) - if (!delegate.hasSourceOfTruth()) { - writeRequestChannel.trySend(writeRequest.key to writeRequest.value) - } when (val updaterResult = tryUpdateServer(writeRequest)) { is UpdaterResult.Error.Exception -> StoreWriteResponse.Error.Exception(updaterResult.error) is UpdaterResult.Error.Message -> StoreWriteResponse.Error.Message(updaterResult.message) diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt index 6c32a3763..a72ad1297 100644 --- a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealStore.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.receiveAsFlow @@ -76,7 +77,8 @@ internal class RealStore( converter = converter ) - private val localOnlyChannel = Channel>() + private val writeRequestChannel = Channel>() + private val localOnlyRequestChannel = Channel>() @Suppress("UNCHECKED_CAST") override fun stream(request: StoreReadRequest): Flow> = @@ -105,7 +107,7 @@ internal class RealStore( emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache)) } emitAll( - localOnlyChannel.receiveAsFlow() + localOnlyRequestChannel.receiveAsFlow() .filter { it.first == request.key } .map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) @@ -119,11 +121,18 @@ internal class RealStore( val piggybackOnly = !request.refresh && cachedToEmit != null @Suppress("UNCHECKED_CAST") - createNetworkFlow( + val networkFlow = createNetworkFlow( request = request, networkLock = null, piggybackOnly = piggybackOnly ) as Flow> // when no source of truth Input == Output + + merge( + networkFlow, + writeRequestChannel.receiveAsFlow() + .filter { writeRequest -> writeRequest.first == request.key } + .map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) } + ) } else if (request.fetch) { diskNetworkCombined(request, sourceOfTruth) } else { @@ -177,7 +186,7 @@ internal class RealStore( } } if (sourceOfTruth == null && request.fetch && it is StoreReadResponse.Data) { - localOnlyChannel.trySend(request.key to it.value) + localOnlyRequestChannel.trySend(request.key to it.value) } } @@ -339,7 +348,11 @@ internal class RealStore( internal suspend fun write(key: Key, value: Output): StoreDelegateWriteResult = try { memCache?.put(key, value) - sourceOfTruth?.write(key, converter.fromOutputToLocal(value)) + if (sourceOfTruth != null) { + sourceOfTruth.write(key, converter.fromOutputToLocal(value)) + } else { + writeRequestChannel.trySend(key to value) + } StoreDelegateWriteResult.Success } catch (error: Throwable) { StoreDelegateWriteResult.Error.Exception(error) @@ -348,8 +361,6 @@ internal class RealStore( internal suspend fun latestOrNull(key: Key): Output? = fromMemCache(key) ?: fromSourceOfTruth(key) - internal fun hasSourceOfTruth() = sourceOfTruth != null - private suspend fun fromSourceOfTruth(key: Key) = sourceOfTruth?.reader(key, CompletableDeferred(Unit))?.map { it.dataOrNull() }?.first() @@ -362,3 +373,4 @@ internal class RealStore( } } } + diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt index 4fd1efad2..48d0f0fd8 100644 --- a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/UpdaterTests.kt @@ -6,6 +6,7 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertIs import kotlin.test.assertNotNull +import kotlin.test.assertTrue import kotlin.time.Duration.Companion.minutes import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow @@ -270,59 +271,90 @@ class UpdaterTests { } @Test - fun collectResponseAfterWriting() = testScope.runTest { + fun collectResponseAfterWritingWithSourceOfTruth() { val ttl = inHours(1) - val store = StoreBuilder.from( - fetcher = Fetcher.of { key -> api.get(key, ttl = ttl) }, + val converter = NotesConverterProvider().provide() + val validator = NotesValidator() + val updater = NotesUpdaterProvider(api).provide() + + val store = MutableStoreBuilder.from( + fetcher = Fetcher.ofFlow { key -> + val network = api.get(key, ttl = ttl) + flow { emit(network) } + }, + sourceOfTruth = SourceOfTruth.of( + nonFlowReader = { key -> notes.get(key) }, + writer = { key, sot -> notes.put(key, sot) }, + delete = { key -> notes.clear(key) }, + deleteAll = { notes.clear() } + ), + converter ) - .cachePolicy(MemoryPolicy.builder().setExpireAfterWrite(10.minutes).build()) - .build().asMutableStore( + .validator(validator) + .build( + updater = updater, + bookkeeper = null + ) + + testCollectResponseAfterWriting(store, ttl) + } + + @Test + fun collectResponseAfterWritingWithoutSourceOfTruth() { + val ttl = inHours(1) + + val store = StoreBuilder.from( + fetcher = Fetcher.of { key -> OutputNote(api.get(key, ttl = ttl).data, ttl = ttl) }, + ) + .cachePolicy(MemoryPolicy.builder().setExpireAfterWrite(10.minutes).build()) + .build().asMutableStore( Updater.by( { _, v -> UpdaterResult.Success.Typed(v) }, ), null, ) + testCollectResponseAfterWriting(store, ttl) + } + + private fun testCollectResponseAfterWriting( + store: MutableStore, + ttl: Long, + ) = testScope.runTest { val readRequest = StoreReadRequest.fresh(NotesKey.Single(Notes.One.id)) store.stream(readRequest).test { assertEquals(StoreReadResponse.Loading(origin = StoreReadResponseOrigin.Fetcher()), awaitItem()) assertEquals( StoreReadResponse.Data( - NetworkNote(NoteData.Single(Notes.One), ttl = ttl), + OutputNote(NoteData.Single(Notes.One), ttl = ttl), StoreReadResponseOrigin.Fetcher() ), awaitItem() ) val newNote = Notes.One.copy(title = "New Title-1") - val writeRequest = StoreWriteRequest.of( + val writeRequest = StoreWriteRequest.of( key = NotesKey.Single(Notes.One.id), - value = NetworkNote(NoteData.Single(newNote), 0) + value = OutputNote(NoteData.Single(newNote), 0) ) val storeWriteResponse = store.write(writeRequest) - // Write is success - assertEquals( - StoreWriteResponse.Success.Typed( - NetworkNote(NoteData.Single(newNote), 0) - ), - storeWriteResponse - ) + assertTrue(storeWriteResponse is StoreWriteResponse.Success) // New data added by 'write' is collected assertEquals( - NetworkNote(NoteData.Single(newNote), 0), + OutputNote(NoteData.Single(newNote), 0), awaitItem().requireData() ) // different key, not collected - store.write(StoreWriteRequest.of( + store.write(StoreWriteRequest.of( key = NotesKey.Single(Notes.Five.id), - value = NetworkNote(NoteData.Single(newNote), 0) + value = OutputNote(NoteData.Single(newNote), 0) )) } @@ -331,7 +363,7 @@ class UpdaterTests { val cachedStream = store.stream(cachedReadRequest) assertEquals( - NetworkNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0), + OutputNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0), cachedStream.first().requireData() ) }