Skip to content

Commit

Permalink
emit data into collected flow
Browse files Browse the repository at this point in the history
  • Loading branch information
jurajlivesport committed Apr 16, 2024
1 parent 63928aa commit c07640e
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 30 deletions.
1 change: 1 addition & 0 deletions store/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ kotlin {
implementation(kotlin("test"))
implementation(libs.junit)
implementation(libs.kotlinx.coroutines.test)
implementation(libs.turbine)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ package org.mobilenativefoundation.store.store5.impl

import co.touchlab.kermit.CommonWriter
import co.touchlab.kermit.Logger
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.mobilenativefoundation.store.store5.Bookkeeper
Expand All @@ -17,6 +23,7 @@ import org.mobilenativefoundation.store.core5.ExperimentalStoreApi
import org.mobilenativefoundation.store.store5.MutableStore
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse
import org.mobilenativefoundation.store.store5.StoreReadResponseOrigin
import org.mobilenativefoundation.store.store5.StoreWriteRequest
import org.mobilenativefoundation.store.store5.StoreWriteResponse
import org.mobilenativefoundation.store.store5.Updater
Expand All @@ -38,6 +45,8 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
private val keyToWriteRequestQueue = mutableMapOf<Key, WriteRequestQueue<Key, Output, *>>()
private val keyToThreadSafety = mutableMapOf<Key, ThreadSafety>()

private val writeRequestChannel = Channel<Pair<Key, Output>>()

override fun <Response : Any> stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<Output>> =
flow {
safeInitStore(request.key)
Expand All @@ -60,7 +69,14 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
}
}

delegate.stream(request).collect { storeReadResponse -> emit(storeReadResponse) }
emitAll(
merge(
delegate.stream(request),
writeRequestChannel.receiveAsFlow()
.filter { it.first == request.key }
.map { StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache) },
)
)
}

@ExperimentalStoreApi
Expand All @@ -74,6 +90,9 @@ internal class RealMutableStore<Key : Any, Network : Any, Output : Any, Local :
.collect { writeRequest ->
val storeWriteResponse = try {
delegate.write(writeRequest.key, writeRequest.value)
if (!delegate.hasSourceOfTruth()) {
writeRequestChannel.trySend(writeRequest.key to writeRequest.value)
}
when (val updaterResult = tryUpdateServer(writeRequest)) {
is UpdaterResult.Error.Exception -> StoreWriteResponse.Error.Exception(updaterResult.error)
is UpdaterResult.Error.Message -> StoreWriteResponse.Error.Message(updaterResult.message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ import co.touchlab.kermit.CommonWriter
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.flow.transform
import org.mobilenativefoundation.store.cache5.Cache
import org.mobilenativefoundation.store.store5.CacheType
Expand Down Expand Up @@ -73,6 +76,8 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
converter = converter
)

private val localOnlyChannel = Channel<Pair<Key, Output>>()

@Suppress("UNCHECKED_CAST")
override fun stream(request: StoreReadRequest<Key>): Flow<StoreReadResponse<Output>> =
flow {
Expand All @@ -96,7 +101,16 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
if (memCache == null) {
logger.w("Local-only request made with no cache or source of truth configured")
}
emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache))
if (cachedToEmit == null) {
emit(StoreReadResponse.NoNewData(origin = StoreReadResponseOrigin.Cache))
}
emitAll(
localOnlyChannel.receiveAsFlow()
.filter { it.first == request.key }
.map {
StoreReadResponse.Data(value = it.second, origin = StoreReadResponseOrigin.Cache)
}
)
return@flow
}

Expand Down Expand Up @@ -162,6 +176,9 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
memCache?.put(request.key, data)
}
}
if (sourceOfTruth == null && request.fetch && it is StoreReadResponse.Data) {
localOnlyChannel.trySend(request.key to it.value)
}
}

override suspend fun clear(key: Key) {
Expand Down Expand Up @@ -331,6 +348,8 @@ internal class RealStore<Key : Any, Network : Any, Output : Any, Local : Any>(
internal suspend fun latestOrNull(key: Key): Output? =
fromMemCache(key) ?: fromSourceOfTruth(key)

internal fun hasSourceOfTruth() = sourceOfTruth != null

private suspend fun fromSourceOfTruth(key: Key) =
sourceOfTruth?.reader(key, CompletableDeferred(Unit))?.map { it.dataOrNull() }?.first()

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.mobilenativefoundation.store.store5

import app.cash.turbine.test
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.mobilenativefoundation.store.store5.impl.extensions.get
Expand All @@ -11,6 +11,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
import kotlin.time.Duration
import org.mobilenativefoundation.store.store5.impl.extensions.fresh

class LocalOnlyTests {
private val testScope = TestScope()
Expand All @@ -25,8 +26,9 @@ class LocalOnlyTests {
.build()
)
.build()
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), response)
store.stream(StoreReadRequest.localOnly(0)).test {
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), awaitItem())
}
}

@Test
Expand All @@ -48,9 +50,10 @@ class LocalOnlyTests {
val a = store.get(0)
assertEquals("result", a)
assertEquals(1, fetcherHitCounter.value)
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertEquals("result", response.requireData())
assertEquals(1, fetcherHitCounter.value)
store.stream(StoreReadRequest.localOnly(0)).test {
assertEquals("result", awaitItem().requireData())
assertEquals(1, fetcherHitCounter.value)
}
}

@Test
Expand All @@ -73,9 +76,11 @@ class LocalOnlyTests {
val a = store.get(0)
assertEquals("result", a)
assertEquals(1, fetcherHitCounter.value)
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), response)
assertEquals(1, fetcherHitCounter.value)
store.stream(StoreReadRequest.localOnly(0)).test {
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.Cache), awaitItem())
assertEquals(1, fetcherHitCounter.value)
}

}

@Test
Expand All @@ -88,8 +93,9 @@ class LocalOnlyTests {
)
.disableCache()
.build()
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), response)
store.stream(StoreReadRequest.localOnly(0)).test {
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), awaitItem())
}
}

@Test
Expand All @@ -109,10 +115,12 @@ class LocalOnlyTests {
val a = store.get(0)
assertEquals("result", a)
assertEquals(1, fetcherHitCounter.value)
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertEquals("result", response.requireData())
assertEquals(StoreReadResponseOrigin.SourceOfTruth, response.origin)
assertEquals(1, fetcherHitCounter.value)
store.stream(StoreReadRequest.localOnly(0)).test {
val response = awaitItem()
assertEquals("result", response.requireData())
assertEquals(StoreReadResponseOrigin.SourceOfTruth, response.origin)
assertEquals(1, fetcherHitCounter.value)
}
}

@Test
Expand All @@ -134,9 +142,10 @@ class LocalOnlyTests {
val a = store.get(0)
assertEquals("result", a)
assertEquals(1, fetcherHitCounter.value)
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), response)
assertEquals(1, fetcherHitCounter.value)
store.stream(StoreReadRequest.localOnly(0)).test {
assertEquals(StoreReadResponse.NoNewData(StoreReadResponseOrigin.SourceOfTruth), awaitItem())
assertEquals(1, fetcherHitCounter.value)
}
}

@Test
Expand All @@ -145,8 +154,41 @@ class LocalOnlyTests {
.from(Fetcher.of { _: Int -> throw RuntimeException("Fetcher shouldn't be hit") })
.disableCache()
.build()
val response = store.stream(StoreReadRequest.localOnly(0)).first()
assertTrue(response is StoreReadResponse.NoNewData)
assertEquals(StoreReadResponseOrigin.Cache, response.origin)
store.stream(StoreReadRequest.localOnly(0)).test {
val response = awaitItem()
assertTrue(response is StoreReadResponse.NoNewData)
assertEquals(StoreReadResponseOrigin.Cache, response.origin)
}
}

@Test
fun collectNewDataFromFetcher() = testScope.runTest {
val fetcherHitCounter = atomic(0)
val store = StoreBuilder
.from(
Fetcher.of { _: Int ->
fetcherHitCounter += 1
"result $fetcherHitCounter"
}
)
.cachePolicy(
MemoryPolicy
.builder<Int, String>()
.build()
)
.build()

store.stream(StoreReadRequest.localOnly(0)).test {
assertTrue(awaitItem() is StoreReadResponse.NoNewData)

assertEquals("result 1", store.fresh(0))
assertEquals("result 1", awaitItem().requireData())

assertEquals("result 2", store.fresh(0))
assertEquals("result 2", awaitItem().requireData())

// different key, not collected
assertEquals("result 3", store.fresh(1))
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package org.mobilenativefoundation.store.store5

import kotlinx.coroutines.ExperimentalCoroutinesApi
import app.cash.turbine.test
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertNotNull
import kotlin.time.Duration.Companion.minutes
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.last
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.mobilenativefoundation.store.core5.ExperimentalStoreApi
import org.mobilenativefoundation.store.store5.impl.extensions.asMutableStore
import org.mobilenativefoundation.store.store5.impl.extensions.inHours
import org.mobilenativefoundation.store.store5.util.assertEmitsExactly
import org.mobilenativefoundation.store.store5.util.fake.Notes
Expand All @@ -23,13 +30,8 @@ import org.mobilenativefoundation.store.store5.util.model.NetworkNote
import org.mobilenativefoundation.store.store5.util.model.NoteData
import org.mobilenativefoundation.store.store5.util.model.NotesWriteResponse
import org.mobilenativefoundation.store.store5.util.model.OutputNote
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertNotNull

@OptIn(ExperimentalCoroutinesApi::class, ExperimentalStoreApi::class)
@OptIn(ExperimentalStoreApi::class)
class UpdaterTests {
private val testScope = TestScope()
private lateinit var api: NotesApi
Expand Down Expand Up @@ -266,4 +268,71 @@ class UpdaterTests {
)
assertEquals(NetworkNote(NoteData.Single(newNote)), api.db[NotesKey.Single(Notes.One.id)])
}

@Test
fun collectResponseAfterWriting() = testScope.runTest {
val ttl = inHours(1)

val store = StoreBuilder.from<NotesKey, NetworkNote>(
fetcher = Fetcher.of { key -> api.get(key, ttl = ttl) },
)
.cachePolicy(MemoryPolicy.builder<NotesKey, NetworkNote>().setExpireAfterWrite(10.minutes).build())
.build().asMutableStore<NotesKey, NetworkNote, NetworkNote, NetworkNote, NetworkNote>(
Updater.by(
{ _, v -> UpdaterResult.Success.Typed(v) },
),
null,
)

val readRequest = StoreReadRequest.fresh(NotesKey.Single(Notes.One.id))

store.stream<NotesWriteResponse>(readRequest).test {
assertEquals(StoreReadResponse.Loading(origin = StoreReadResponseOrigin.Fetcher()), awaitItem())
assertEquals(
StoreReadResponse.Data(
NetworkNote(NoteData.Single(Notes.One), ttl = ttl),
StoreReadResponseOrigin.Fetcher()
),
awaitItem()
)

val newNote = Notes.One.copy(title = "New Title-1")
val writeRequest = StoreWriteRequest.of<NotesKey, NetworkNote, NotesWriteResponse>(
key = NotesKey.Single(Notes.One.id),
value = NetworkNote(NoteData.Single(newNote), 0)
)

val storeWriteResponse = store.write(writeRequest)

// Write is success
assertEquals(
StoreWriteResponse.Success.Typed(
NetworkNote(NoteData.Single(newNote), 0)
),
storeWriteResponse
)

// New data added by 'write' is collected

assertEquals(
NetworkNote(NoteData.Single(newNote), 0),
awaitItem().requireData()
)

// different key, not collected
store.write(StoreWriteRequest.of<NotesKey, NetworkNote, NotesWriteResponse>(
key = NotesKey.Single(Notes.Five.id),
value = NetworkNote(NoteData.Single(newNote), 0)
))
}

val cachedReadRequest =
StoreReadRequest.cached(NotesKey.Single(Notes.One.id), refresh = false)
val cachedStream = store.stream<NotesWriteResponse>(cachedReadRequest)

assertEquals(
NetworkNote(NoteData.Single(Notes.One.copy(title = "New Title-1")), 0),
cachedStream.first().requireData()
)
}
}

0 comments on commit c07640e

Please sign in to comment.