Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Paging Refactor #606

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix launchPagingStore
Signed-off-by: mramotar_dbx <mramotar@dropbox.com>
  • Loading branch information
matt-ramotar committed Feb 17, 2024
commit 1d78b440c5443f2db3ca94a76123220e6c2de865
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
package org.mobilenativefoundation.store.paging5

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import org.mobilenativefoundation.store.core5.ExperimentalStoreApi
import org.mobilenativefoundation.store.core5.StoreData
import org.mobilenativefoundation.store.core5.StoreKey
Expand All @@ -18,8 +19,6 @@ import org.mobilenativefoundation.store.store5.Store
import org.mobilenativefoundation.store.store5.StoreReadRequest
import org.mobilenativefoundation.store.store5.StoreReadResponse

private class StopProcessingException : Exception()

/**
* Initializes and returns a [StateFlow] that reflects the state of the Store, updating by a flow of provided keys.
* @param scope A [CoroutineScope].
Expand All @@ -33,38 +32,47 @@ private fun <Id : Any, Key : StoreKey<Id>, Output : StoreData<Id>> launchPagingS
keys: Flow<Key>,
stream: (key: Key) -> Flow<StoreReadResponse<Output>>,
): StateFlow<StoreReadResponse<Output>> {
val childScope = scope + Job()

val prevData = MutableStateFlow<StoreReadResponse.Data<Output>?>(null)
val stateFlow = MutableStateFlow<StoreReadResponse<Output>>(StoreReadResponse.Initial)
val activeStreams = mutableMapOf<Key, Job>()

scope.launch {
childScope.launch {
keys.collect { key ->
if (key !is StoreKey.Collection<*>) {
throw IllegalArgumentException("Invalid key type")
}

try {
val firstKey = keys.first()
if (firstKey !is StoreKey.Collection<*>) throw IllegalArgumentException("Invalid key type")
if (activeStreams[key]?.isActive != true) {
val job = this.launch {
stream(key).collect { response ->
when (response) {
is StoreReadResponse.Data<Output> -> {
val joinedDataResponse = joinData(key, prevData.value, response)
prevData.emit(joinedDataResponse)
stateFlow.emit(joinedDataResponse)
}

stream(firstKey).collect { response ->
if (response is StoreReadResponse.Data<Output>) {
val joinedDataResponse = joinData(firstKey, stateFlow.value, response)
stateFlow.emit(joinedDataResponse)
} else {
stateFlow.emit(response)
else -> {
stateFlow.emit(response)
}
}
}
}

if (response is StoreReadResponse.Data<Output> ||
response is StoreReadResponse.Error ||
response is StoreReadResponse.NoNewData
) {
throw StopProcessingException()
activeStreams[key] = job

job.invokeOnCompletion {
activeStreams[key]?.cancel()
activeStreams.remove(key)
}
}
} catch (_: StopProcessingException) {
}
}

keys.drop(1).collect { key ->
if (key !is StoreKey.Collection<*>) throw IllegalArgumentException("Invalid key type")
val firstDataResponse = stream(key).first { it.dataOrNull() != null } as StoreReadResponse.Data<Output>
val joinedDataResponse = joinData(key, stateFlow.value, firstDataResponse)
stateFlow.emit(joinedDataResponse)
}
scope.coroutineContext[Job]?.invokeOnCompletion {
childScope.cancel()
}

return stateFlow.asStateFlow()
Expand Down Expand Up @@ -101,16 +109,14 @@ fun <Id : Any, Key : StoreKey<Id>, Output : StoreData<Id>> MutableStore<Key, Out
@ExperimentalStoreApi
private fun <Id : Any, Key : StoreKey.Collection<Id>, Output : StoreData<Id>> joinData(
key: Key,
prevResponse: StoreReadResponse<Output>,
prevResponse: StoreReadResponse.Data<Output>?,
currentResponse: StoreReadResponse.Data<Output>
): StoreReadResponse.Data<Output> {
val lastOutput = when (prevResponse) {
is StoreReadResponse.Data<Output> -> prevResponse.value as? StoreData.Collection<Id, StoreData.Single<Id>>
else -> null
}

val currentData = currentResponse.value as StoreData.Collection<Id, StoreData.Single<Id>>

val joinedOutput = (lastOutput?.insertItems(key.insertionStrategy, currentData.items) ?: currentData) as Output
return StoreReadResponse.Data(joinedOutput, currentResponse.origin)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.mobilenativefoundation.store.paging5

import app.cash.turbine.test
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.TestScope
Expand Down Expand Up @@ -62,7 +63,7 @@ class LaunchPagingStoreTests {
fun multipleValidKeysEmittedInSuccession() = testScope.runTest {
val key1 = PostKey.Cursor("1", 10)
val key2 = PostKey.Cursor("11", 10)
val keys = flowOf(key1, key2)
val keys = MutableStateFlow(key1)
val stateFlow = store.launchPagingStore(this, keys)

stateFlow.test {
Expand All @@ -74,10 +75,15 @@ class LaunchPagingStoreTests {
assertIs<StoreReadResponse.Data<PostData.Feed>>(state3)
assertEquals("1", state3.value.posts[0].postId)

keys.emit(key2)

val loading2 = awaitItem()
assertIs<StoreReadResponse.Loading>(loading2)

val state4 = awaitItem()
assertIs<StoreReadResponse.Data<PostData.Feed>>(state4)
assertEquals("11", state4.value.posts[0].postId)
assertEquals("1", state4.value.posts[10].postId)
assertEquals("1", state4.value.posts[0].postId)
assertEquals("11", state4.value.posts[10].postId)
val data4 = state4.value
assertIs<PostData.Feed>(data4)
assertEquals(20, data4.items.size)
Expand Down Expand Up @@ -111,9 +117,10 @@ class LaunchPagingStoreTests {

val key1 = PostKey.Cursor("1", 10)
val key2 = PostKey.Cursor("11", 10)
val keys = flowOf(key1, key2)
val keys = MutableStateFlow(key1)

val stateFlow = store.launchPagingStore(this, keys)

stateFlow.test {
val initialState = awaitItem()
assertIs<StoreReadResponse.Initial>(initialState)
Expand All @@ -123,10 +130,18 @@ class LaunchPagingStoreTests {
assertIs<StoreReadResponse.Data<PostData.Feed>>(loadedState1)
val data1 = loadedState1.value
assertEquals(10, data1.posts.size)
assertEquals("1", data1.posts[0].postId)
expectNoEvents()

keys.emit(key2)

val loadingState2 = awaitItem()
assertIs<StoreReadResponse.Loading>(loadingState2)
val loadedState2 = awaitItem()
assertIs<StoreReadResponse.Data<PostData.Feed>>(loadedState2)
val data2 = loadedState2.value
assertEquals(20, data2.posts.size)
assertEquals("1", data2.posts[0].postId)
}

val cached = store.stream<PostPutRequestResult>(StoreReadRequest.cached(key1, refresh = false))
Expand Down Expand Up @@ -164,4 +179,44 @@ class LaunchPagingStoreTests {
assertIs<PostData.Feed>(data4)
assertEquals("2-modified", data4.posts[1].title)
}

@Test
fun multipleKeysWithReadsAndWritesUsingOneStream() = testScope.runTest {
val api = FakePostApi()
val db = FakePostDatabase(userId)
val factory = PostStoreFactory(api = api, db = db)
val mutableStore = factory.create()

val key1 = PostKey.Cursor("1", 10)
val key2 = PostKey.Cursor("11", 10)
val keys = flowOf(key1, key2)

val stateFlow = mutableStore.launchPagingStore(this, keys)
stateFlow.test {
val initialState = awaitItem()
assertIs<StoreReadResponse.Initial>(initialState)
val loadingState = awaitItem()
assertIs<StoreReadResponse.Loading>(loadingState)
val loadedState1 = awaitItem()
assertIs<StoreReadResponse.Data<PostData.Feed>>(loadedState1)
val data1 = loadedState1.value
assertEquals(10, data1.posts.size)
assertEquals("1", data1.posts[0].postId)
val loadedState2 = awaitItem()
assertIs<StoreReadResponse.Data<PostData.Feed>>(loadedState2)
val data2 = loadedState2.value
assertEquals(20, data2.posts.size)
assertEquals("1", data1.posts[0].postId)
}

mutableStore.write(StoreWriteRequest.of(PostKey.Single("2"), PostData.Post("2", "2-modified")))

stateFlow.test {
val loadedState3 = awaitItem()
assertIs<StoreReadResponse.Data<PostData.Feed>>(loadedState3)
val data3 = loadedState3.value
assertEquals(20, data3.posts.size)
assertEquals("2-modified", data3.posts[1].title) // Actual is "2"
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package org.mobilenativefoundation.store.paging5.util

import org.mobilenativefoundation.store.core5.ExperimentalStoreApi
import org.mobilenativefoundation.store.core5.InsertionStrategy
import org.mobilenativefoundation.store.core5.StoreData

@OptIn(ExperimentalStoreApi::class)
sealed class PostData : StoreData<String> {
data class Post(val postId: String, val title: String) : StoreData.Single<String>, PostData() {
override val id: String get() = postId
Expand All @@ -15,14 +17,14 @@ sealed class PostData : StoreData<String> {

return when (strategy) {
InsertionStrategy.APPEND -> {
val updatedItems = items.toMutableList()
updatedItems.addAll(posts)
val updatedItems = posts.toMutableList()
updatedItems.addAll(items)
copyWith(items = updatedItems)
}

InsertionStrategy.PREPEND -> {
val updatedItems = posts.toMutableList()
updatedItems.addAll(items)
val updatedItems = items.toMutableList()
updatedItems.addAll(posts)
copyWith(items = updatedItems)
}

Expand Down