Skip to content

Commit

Permalink
Update StoreChannelManagerTests.kt
Browse files Browse the repository at this point in the history
Minor changes for readability, consistency, and scope management
  • Loading branch information
matt-ramotar authored Jun 1, 2024
1 parent 891100c commit 4a784fe
Showing 1 changed file with 33 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package org.mobilenativefoundation.store.multicast5


import app.cash.turbine.test
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
Expand All @@ -22,52 +19,57 @@ import kotlin.test.assertEquals
class StoreChannelManagerTests {

@Test
fun cancelledDownstreamChannelShouldNotCancelOtherChannels() {
val messages = listOf(1, 2, 3)
val scope = CoroutineScope(Dispatchers.Default)
fun cancelledDownstreamChannelShouldNotCancelOtherChannels() = runTest {
val coroutineScope = CoroutineScope(Dispatchers.Default)
val lockUpstream = Mutex(true)
val testMessages = listOf(1, 2, 3)
val numChannels = 20
val upstreamFlow = flow {
lockUpstream.withLock {
messages.onEach { emit(it) }
testMessages.onEach { emit(it) }
}
}
val channelManager = StoreChannelManager(
scope = scope,
scope = coroutineScope,
bufferSize = 0,
upstream = upstreamFlow,
piggybackingDownstream = false,
keepUpstreamAlive = false,
onEach = { }
)
val channels =
(1..20).map { Channel<ChannelManager.Message.Dispatch<Int>>(Channel.UNLIMITED) }

val cancelledChannel =
Channel<ChannelManager.Message.Dispatch<Int>>(Channel.UNLIMITED).also {
scope.launch {
it.consumeAsFlow().first()
val channels = createChannels(numChannels)
val channelToBeCancelled = Channel<ChannelManager.Message.Dispatch<Int>>(Channel.UNLIMITED)
.also { channel ->
coroutineScope.launch {
channel.consumeAsFlow().test {
cancelAndIgnoreRemainingEvents()
}
}
}

scope.launch {
coroutineScope.launch {
channels.forEach { channelManager.addDownstream(it) }
lockUpstream.unlock()
}
scope.launch { channelManager.addDownstream(cancelledChannel) }
coroutineScope.launch {
channelManager.addDownstream(channelToBeCancelled)
}

runTest {
channels.forEach { channel ->
val messagesFlow = channel.consumeAsFlow()
.filterIsInstance<ChannelManager.Message.Dispatch.Value<Int>>()
.onEach { it.delivered.complete(Unit) }
.map { it.value }
channels.forEach { channel ->
val messagesFlow = channel.consumeAsFlow()
.filterIsInstance<ChannelManager.Message.Dispatch.Value<Int>>()
.onEach { it.delivered.complete(Unit) }

assertEquals(
messages,
messagesFlow.take(3).toList(),
)
messagesFlow.test {
for (message in testMessages) {
val dispatchValue = awaitItem()
assertEquals(message, dispatchValue.value)
}
awaitComplete()
}
}
scope.cancel()
}

private fun createChannels(count: Int): List<Channel<ChannelManager.Message.Dispatch<Int>>> {
return (1..count).map { Channel(Channel.UNLIMITED) }
}
}

0 comments on commit 4a784fe

Please sign in to comment.