Skip to content

Commit

Permalink
Increase the deprecation levels for BroadcastChannel APIs (#4197)
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb authored Aug 12, 2024
1 parent d010110 commit 5c8e650
Show file tree
Hide file tree
Showing 21 changed files with 47 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import java.util.concurrent.*
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
@Suppress("DEPRECATION_ERROR")
open class StatefulAsyncBenchmark : ParametrizedDispatcherBase() {

private val stateSize = 2048
Expand Down
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -987,7 +987,6 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun asFlow (Lkotlin/ranges/IntRange;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lkotlin/ranges/LongRange;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lkotlin/sequences/Sequence;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow (Lkotlinx/coroutines/channels/BroadcastChannel;)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([I)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([J)Lkotlinx/coroutines/flow/Flow;
public static final fun asFlow ([Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,6 @@ final fun <#A: kotlin/Any?> (kotlin.coroutines/SuspendFunction0<#A>).kotlinx.cor
final fun <#A: kotlin/Any?> (kotlin.sequences/Sequence<#A>).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/asFlow|asFlow@kotlin.sequences.Sequence<0:0>(){0§<kotlin.Any?>}[0]
final fun <#A: kotlin/Any?> (kotlin/Array<#A>).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/asFlow|asFlow@kotlin.Array<0:0>(){0§<kotlin.Any?>}[0]
final fun <#A: kotlin/Any?> (kotlin/Function0<#A>).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/asFlow|asFlow@kotlin.Function0<0:0>(){0§<kotlin.Any?>}[0]
final fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/BroadcastChannel<#A>).kotlinx.coroutines.flow/asFlow(): kotlinx.coroutines.flow/Flow<#A> // kotlinx.coroutines.flow/asFlow|asFlow@kotlinx.coroutines.channels.BroadcastChannel<0:0>(){0§<kotlin.Any?>}[0]
final fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/broadcast(kotlin/Int = ..., kotlinx.coroutines/CoroutineStart = ...): kotlinx.coroutines.channels/BroadcastChannel<#A> // kotlinx.coroutines.channels/broadcast|broadcast@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.Int;kotlinx.coroutines.CoroutineStart){0§<kotlin.Any?>}[0]
final fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/distinct(): kotlinx.coroutines.channels/ReceiveChannel<#A> // kotlinx.coroutines.channels/distinct|distinct@kotlinx.coroutines.channels.ReceiveChannel<0:0>(){0§<kotlin.Any?>}[0]
final fun <#A: kotlin/Any?> (kotlinx.coroutines.channels/ReceiveChannel<#A>).kotlinx.coroutines.channels/drop(kotlin/Int, kotlin.coroutines/CoroutineContext = ...): kotlinx.coroutines.channels/ReceiveChannel<#A> // kotlinx.coroutines.channels/drop|drop@kotlinx.coroutines.channels.ReceiveChannel<0:0>(kotlin.Int;kotlin.coroutines.CoroutineContext){0§<kotlin.Any?>}[0]
Expand Down
86 changes: 5 additions & 81 deletions kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@file:Suppress("DEPRECATION")
@file:Suppress("DEPRECATION", "DEPRECATION_ERROR")

package kotlinx.coroutines.channels

Expand All @@ -10,36 +10,10 @@ import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

/**
* Broadcasts all elements of the channel.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*
* The kind of the resulting channel depends on the specified [capacity] parameter:
* when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses [BroadcastChannel] with a buffer of given capacity,
* when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
* Note that resulting channel behaves like [ConflatedBroadcastChannel] but is not an instance of [ConflatedBroadcastChannel].
* otherwise -- throws [IllegalArgumentException].
*
* ### Cancelling broadcast
*
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
*
* Do not use [close][BroadcastChannel.close] on the resulting channel.
* It causes eventual failure of the broadcast coroutine and cancellation of the underlying channel, too,
* but it is not as prompt.
*
* ### Future replacement
*
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
* the broadcasting coroutine in hard-to-specify ways.
*
* **Note: This API is obsolete since 1.5.0.** It is deprecated with warning in 1.7.0.
* It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator.
*
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
* @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public fun <E> ReceiveChannel<E>.broadcast(
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY
Expand All @@ -56,60 +30,10 @@ public fun <E> ReceiveChannel<E>.broadcast(
}

/**
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
* and returns a reference to the coroutine as a [BroadcastChannel]. The resulting
* object can be used to [subscribe][BroadcastChannel.openSubscription] to elements produced by this coroutine.
*
* The scope of the coroutine contains [ProducerScope] interface, which implements
* both [CoroutineScope] and [SendChannel], so that coroutine can invoke
* [send][SendChannel.send] directly. The channel is [closed][SendChannel.close]
* when the coroutine completes.
*
* Coroutine context is inherited from a [CoroutineScope], additional context elements can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* The parent job is inherited from a [CoroutineScope] as well, but it can also be overridden
* with corresponding [context] element.
*
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
* the resulting channel becomes _failed_, so that any attempt to receive from such a channel throws exception.
*
* The kind of the resulting channel depends on the specified [capacity] parameter:
* - when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses [BroadcastChannel] with a buffer of given capacity,
* - when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
* Note that resulting channel behaves like [ConflatedBroadcastChannel] but is not an instance of [ConflatedBroadcastChannel].
* - otherwise -- throws [IllegalArgumentException].
*
* **Note:** By default, the coroutine does not start until the first subscriber appears via [BroadcastChannel.openSubscription]
* as [start] parameter has a value of [CoroutineStart.LAZY] by default.
* This ensures that the first subscriber does not miss any sent elements.
* However, later subscribers may miss elements.
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
* ### Cancelling broadcast
*
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
*
* Do not use [close][BroadcastChannel.close] on the resulting channel.
* It causes failure of the `send` operation in broadcast coroutine and would not cancel it if the
* coroutine is doing something else.
*
* ### Future replacement
*
* This API is obsolete since 1.5.0 and deprecated with warning since 1.7.0.
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
* the broadcasting coroutine in hard-to-specify ways.
* It is replaced with [Flow.shareIn][kotlinx.coroutines.flow.shareIn] operator.
*
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
* @param capacity capacity of the channel's buffer (1 by default).
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
* @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public fun <E> CoroutineScope.broadcast(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 1,
Expand Down
76 changes: 14 additions & 62 deletions kotlinx-coroutines-core/common/src/channels/BroadcastChannel.kt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
@file:Suppress("FunctionName", "DEPRECATION")
@file:Suppress("FunctionName", "DEPRECATION", "DEPRECATION_ERROR")

package kotlinx.coroutines.channels

Expand All @@ -10,62 +10,35 @@ import kotlinx.coroutines.channels.Channel.Factory.CONFLATED
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlin.native.concurrent.*

/**
* Broadcast channel is a non-blocking primitive for communication between the sender and multiple receivers
* that subscribe for the elements using [openSubscription] function and unsubscribe using [ReceiveChannel.cancel]
* function.
*
* See `BroadcastChannel()` factory function for the description of available
* broadcast channel implementations.
*
* **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
* It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow].
* @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public interface BroadcastChannel<E> : SendChannel<E> {
/**
* Subscribes to this [BroadcastChannel] and returns a channel to receive elements from it.
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this
* broadcast channel.
* @suppress
*/
public fun openSubscription(): ReceiveChannel<E>

/**
* Cancels reception of remaining elements from this channel with an optional cause.
* This function closes the channel with
* the specified cause (unless it was already closed), removes all buffered sent elements from it,
* and [cancels][ReceiveChannel.cancel] all open subscriptions.
* A cause can be used to specify an error message or to provide other details on
* a cancellation reason for debugging purposes.
* @suppress
*/
public fun cancel(cause: CancellationException? = null)

/**
* @suppress This method has bad semantics when cause is not a [CancellationException]. Use [cancel].
* @suppress
*/
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility only")
public fun cancel(cause: Throwable? = null): Boolean
}

/**
* Creates a broadcast channel with the specified buffer capacity.
*
* The resulting channel type depends on the specified [capacity] parameter:
*
* - when `capacity` positive, but less than [UNLIMITED] -- creates `ArrayBroadcastChannel` with a buffer of given capacity.
* **Note:** this channel looses all items that have been sent to it until the first subscriber appears;
* - when `capacity` is [CONFLATED] -- creates [ConflatedBroadcastChannel] that conflates back-to-back sends;
* - when `capacity` is [BUFFERED] -- creates `ArrayBroadcastChannel` with a default capacity.
* - otherwise -- throws [IllegalArgumentException].
*
* **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
* It is replaced with [SharedFlow][kotlinx.coroutines.flow.SharedFlow] and [StateFlow][kotlinx.coroutines.flow.StateFlow].
* @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "BroadcastChannel is deprecated in the favour of SharedFlow and StateFlow, and is no longer supported")
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and StateFlow, and is no longer supported")
public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
when (capacity) {
0 -> throw IllegalArgumentException("Unsupported 0 capacity for BroadcastChannel")
Expand All @@ -76,49 +49,28 @@ public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E> =
}

/**
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
*
* Back-to-send sent elements are _conflated_ -- only the most recently sent value is received,
* while previously sent elements **are lost**.
* Every subscriber immediately receives the most recently sent element.
* Sender to this broadcast channel never suspends and [trySend] always succeeds.
*
* A secondary constructor can be used to create an instance of this class that already holds a value.
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
*
* In this implementation, [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription
* takes linear time in the number of subscribers.
*
* **Note: This API is obsolete since 1.5.0 and deprecated for removal since 1.7.0**
* It is replaced with [SharedFlow][kotlinx.coroutines.flow.StateFlow].
* @suppress obsolete since 1.5.0, WARNING since 1.7.0, ERROR since 1.9.0
*/
@ObsoleteCoroutinesApi
@Deprecated(level = DeprecationLevel.WARNING, message = "ConflatedBroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
@Deprecated(level = DeprecationLevel.ERROR, message = "ConflatedBroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public class ConflatedBroadcastChannel<E> private constructor(
private val broadcast: BroadcastChannelImpl<E>
) : BroadcastChannel<E> by broadcast {
public constructor(): this(BroadcastChannelImpl<E>(capacity = CONFLATED))
/**
* Creates an instance of this class that already holds a value.
*
* It is as a shortcut to creating an instance with a default constructor and
* immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
* @suppress
*/
public constructor(value: E) : this() {
trySend(value)
}

/**
* The most recently sent element to this channel.
*
* Access to this property throws [IllegalStateException] when this class is constructed without
* initial value and no value was sent yet or if it was [closed][close] without a cause.
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
* @suppress
*/
public val value: E get() = broadcast.value

/**
* The most recently sent element to this channel or `null` when this class is constructed without
* initial value and no value was sent yet or if it was [closed][close].
* @suppress
*/
public val valueOrNull: E? get() = broadcast.valueOrNull
}
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/channels/Deprecated.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import kotlin.jvm.*
* Safe to remove in 1.9.0 as was inline before.
*/
@ObsoleteCoroutinesApi
@Suppress("DEPRECATION")
@Suppress("DEPRECATION_ERROR")
@Deprecated(level = DeprecationLevel.ERROR, message = "BroadcastChannel is deprecated in the favour of SharedFlow and is no longer supported")
public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
val channel = openSubscription()
Expand Down
19 changes: 0 additions & 19 deletions kotlinx-coroutines-core/common/src/flow/Channels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -131,25 +131,6 @@ private class ChannelAsFlow<T>(
override fun additionalToStringProps(): String = "channel=$channel"
}

/**
* Represents the given broadcast channel as a hot flow.
* Every flow collector will trigger a new broadcast channel subscription.
*
* ### Cancellation semantics
* 1) Flow consumer is cancelled when the original channel is cancelled.
* 2) Flow consumer completes normally when the original channel completes (~is closed) normally.
* 3) If the flow consumer fails with an exception, subscription is cancelled.
*/
@Suppress("DEPRECATION")
@Deprecated(
level = DeprecationLevel.ERROR,
message = "'BroadcastChannel' is obsolete and all corresponding operators are deprecated " +
"in the favour of StateFlow and SharedFlow"
) // Since 1.5.0, ERROR since 1.7.0, was @FlowPreview, safe to remove in 1.8.0
public fun <T> BroadcastChannel<T>.asFlow(): Flow<T> = flow {
emitAll(openSubscription())
}

/**
* Creates a [produce] coroutine that collects the given flow.
*
Expand Down
15 changes: 8 additions & 7 deletions kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class CancelledParentAttachTest : TestBase() {

@Test
fun testAsync() = runTest {
CoroutineStart.values().forEach { testAsyncCancelledParent(it) }
CoroutineStart.entries.forEach { testAsyncCancelledParent(it) }
}

private suspend fun testAsyncCancelledParent(start: CoroutineStart) {
Expand All @@ -25,14 +25,14 @@ class CancelledParentAttachTest : TestBase() {
}
}
expectUnreached()
} catch (e: CancellationException) {
} catch (_: CancellationException) {
// Expected
}
}

@Test
fun testLaunch() = runTest {
CoroutineStart.values().forEach { testLaunchCancelledParent(it) }
CoroutineStart.entries.forEach { testLaunchCancelledParent(it) }
}

private suspend fun testLaunchCancelledParent(start: CoroutineStart) {
Expand All @@ -48,7 +48,7 @@ class CancelledParentAttachTest : TestBase() {
}
}
expectUnreached()
} catch (e: CancellationException) {
} catch (_: CancellationException) {
// Expected
}
}
Expand All @@ -67,9 +67,10 @@ class CancelledParentAttachTest : TestBase() {

@Test
fun testBroadcast() = runTest {
CoroutineStart.values().forEach { testBroadcastCancelledParent(it) }
CoroutineStart.entries.forEach { testBroadcastCancelledParent(it) }
}

@Suppress("DEPRECATION_ERROR")
private suspend fun testBroadcastCancelledParent(start: CoroutineStart) {
try {
withContext(Job()) {
Expand All @@ -83,7 +84,7 @@ class CancelledParentAttachTest : TestBase() {
}
}
expectUnreached()
} catch (e: CancellationException) {
} catch (_: CancellationException) {
// Expected
}
}
Expand All @@ -105,7 +106,7 @@ class CancelledParentAttachTest : TestBase() {
block()
}
expectUnreached()
} catch (e: CancellationException) {
} catch (_: CancellationException) {
// Expected
}
}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/test/ParentCancellationTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package kotlinx.coroutines

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.*
import kotlin.test.*

/**
Expand Down Expand Up @@ -57,6 +56,7 @@ class ParentCancellationTest : TestBase() {
}

@Test
@Suppress("DEPRECATION_ERROR")
fun testBroadcastChild() = runTest {
testParentCancellation(runsInScopeContext = true) { fail ->
broadcast<Unit> { fail() }.openSubscription()
Expand Down Expand Up @@ -165,4 +165,4 @@ class ParentCancellationTest : TestBase() {
}
finish(3)
}
}
}
Loading

0 comments on commit 5c8e650

Please sign in to comment.