Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove various obsolete code #4196

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -1259,23 +1259,6 @@ public final class kotlinx/coroutines/intrinsics/CancellableKt {
public static final fun startCoroutineCancellable (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)V
}

public class kotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher : kotlinx/coroutines/ExecutorCoroutineDispatcher {
public synthetic fun <init> (II)V
public synthetic fun <init> (IIILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (IIJLjava/lang/String;)V
public synthetic fun <init> (IIJLjava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (IILjava/lang/String;)V
public synthetic fun <init> (IILjava/lang/String;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public final fun blocking (I)Lkotlinx/coroutines/CoroutineDispatcher;
public static synthetic fun blocking$default (Lkotlinx/coroutines/scheduling/ExperimentalCoroutineDispatcher;IILjava/lang/Object;)Lkotlinx/coroutines/CoroutineDispatcher;
public fun close ()V
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
public fun dispatchYield (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
public fun getExecutor ()Ljava/util/concurrent/Executor;
public final fun limited (I)Lkotlinx/coroutines/CoroutineDispatcher;
public fun toString ()Ljava/lang/String;
}

public final class kotlinx/coroutines/selects/OnTimeoutKt {
public static final fun onTimeout (Lkotlinx/coroutines/selects/SelectBuilder;JLkotlin/jvm/functions/Function1;)V
public static final fun onTimeout-8Mi8wO0 (Lkotlinx/coroutines/selects/SelectBuilder;JLkotlin/jvm/functions/Function1;)V
Expand Down
21 changes: 13 additions & 8 deletions kotlinx-coroutines-core/common/src/SchedulerTask.common.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package kotlinx.coroutines

/**
* A [Runnable] that's especially optimized for running in [Dispatchers.Default] on the JVM.
*
* Replacing a [SchedulerTask] with a [Runnable] should not lead to any change in observable behavior.
*
* An arbitrary [Runnable], once it is dispatched by [Dispatchers.Default], gets wrapped into a class that
* stores the submission time, the execution context, etc.
* For [Runnable] instances that we know are only going to be executed in dispatch procedures, we can avoid the
* overhead of separately allocating a wrapper, and instead have the [Runnable] contain the required fields
* on construction.
*
* When running outside the standard dispatchers, these new fields are just dead weight.
*/
internal expect abstract class SchedulerTask internal constructor() : Runnable
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really appreciate how you drop off improvements and additional commentary when doing the stewardship. Thanks!


internal expect interface SchedulerTaskContext

@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
internal expect val SchedulerTask.taskContext: SchedulerTaskContext

@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
internal expect inline fun SchedulerTaskContext.afterTask()
16 changes: 4 additions & 12 deletions kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ internal abstract class DispatchedTask<in T> internal constructor(

final override fun run() {
assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching
val taskContext = this.taskContext
var fatalException: Throwable? = null
try {
val delegate = delegate as DispatchedContinuation<T>
Expand Down Expand Up @@ -107,8 +106,7 @@ internal abstract class DispatchedTask<in T> internal constructor(
// This instead of runCatching to have nicer stacktrace and debug experience
fatalException = e
} finally {
val result = runCatching { taskContext.afterTask() }
handleFatalException(fatalException, result.exceptionOrNull())
fatalException?.let { handleFatalException(it) }
}
}

Expand All @@ -130,15 +128,9 @@ internal abstract class DispatchedTask<in T> internal constructor(
* Fatal exception handling can be intercepted with [CoroutineExceptionHandler] element in the context of
* a failed coroutine, but such exceptions should be reported anyway.
*/
internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
if (exception === null && finallyException === null) return
if (exception !== null && finallyException !== null) {
exception.addSuppressed(finallyException)
}

val cause = exception ?: finallyException
internal fun handleFatalException(exception: Throwable) {
val reason = CoroutinesInternalError("Fatal exception in coroutines machinery for $this. " +
"Please read KDoc to 'handleFatalException' method and report this incident to maintainers", cause!!)
"Please read KDoc to 'handleFatalException' method and report this incident to maintainers", exception)
handleCoroutineException(this.delegate.context, reason)
}
}
Expand Down Expand Up @@ -203,7 +195,7 @@ internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
* This exception doesn't happen normally, only if we have a bug in implementation.
* Report it as a fatal exception.
*/
handleFatalException(e, null)
handleFatalException(e)
} finally {
eventLoop.decrementUseCount(unconfined = true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ package kotlinx.coroutines.internal
* where atomicfu doesn't support its tranformations.
*
* Have `Local` prefix to avoid AFU clashes during star-imports
*
* TODO: remove after https://youtrack.jetbrains.com/issue/KT-62423/
*/
internal expect class LocalAtomicInt(value: Int) {
fun get(): Int
fun set(value: Int)
fun decrementAndGet(): Int
}

internal inline var LocalAtomicInt.value
get() = get()
set(value) = set(value)
10 changes: 0 additions & 10 deletions kotlinx-coroutines-core/jsAndWasmShared/src/SchedulerTask.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,3 @@
package kotlinx.coroutines

internal actual abstract class SchedulerTask : Runnable

internal actual interface SchedulerTaskContext { }

private object TaskContext: SchedulerTaskContext { }

internal actual val SchedulerTask.taskContext: SchedulerTaskContext get() = TaskContext

@Suppress("NOTHING_TO_INLINE")
internal actual inline fun SchedulerTaskContext.afterTask() {}

11 changes: 5 additions & 6 deletions kotlinx-coroutines-core/jvm/src/Executors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,12 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher)

internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {

/*
* Attempts to reflectively (to be Java 6 compatible) invoke
* ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup
* internal scheduler queue on cancellation.
*/
init {
removeFutureOnCancel(executor)
/* Attempt to invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to clean up
* the internal scheduler queue on cancellation. */
if (executor is ScheduledThreadPoolExecutor) {
executor.removeOnCancelPolicy = true
}
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down
9 changes: 0 additions & 9 deletions kotlinx-coroutines-core/jvm/src/SchedulerTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,3 @@ package kotlinx.coroutines
import kotlinx.coroutines.scheduling.*

internal actual typealias SchedulerTask = Task

internal actual typealias SchedulerTaskContext = TaskContext

@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
internal actual val SchedulerTask.taskContext: SchedulerTaskContext get() = taskContext

@Suppress("NOTHING_TO_INLINE", "EXTENSION_SHADOWED_BY_MEMBER")
internal actual inline fun SchedulerTaskContext.afterTask() =
afterTask()
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

internal actual class AbortFlowException actual constructor(
@JvmField @Transient actual val owner: Any
Expand Down
19 changes: 0 additions & 19 deletions kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package kotlinx.coroutines.internal

import java.lang.reflect.*
import java.util.*
import java.util.concurrent.*
import kotlin.concurrent.withLock as withLockJvm

@Suppress("ACTUAL_WITHOUT_EXPECT")
Expand All @@ -22,20 +20,3 @@ internal actual annotation class BenignDataRace()
@Suppress("NOTHING_TO_INLINE") // So that R8 can completely remove ConcurrentKt class
internal actual inline fun <E> identitySet(expectedSize: Int): MutableSet<E> =
Collections.newSetFromMap(IdentityHashMap(expectedSize))

private val REMOVE_FUTURE_ON_CANCEL: Method? = try {
ScheduledThreadPoolExecutor::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.java)
} catch (e: Throwable) {
null
}

@Suppress("NAME_SHADOWING")
internal fun removeFutureOnCancel(executor: Executor): Boolean {
try {
val executor = executor as? ScheduledThreadPoolExecutor ?: return false
(REMOVE_FUTURE_ON_CANCEL ?: return false).invoke(executor, true)
return true
} catch (e: Throwable) {
return false // failed to setRemoveOnCancelPolicy, assume it does not removes future on cancel
}
}
8 changes: 4 additions & 4 deletions kotlinx-coroutines-core/jvm/src/internal/FastServiceLoader.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package kotlinx.coroutines.internal

import kotlinx.coroutines.CoroutineExceptionHandler
import java.io.*
import java.net.*
import java.util.*
import java.util.jar.*
import java.util.zip.*
import kotlin.collections.ArrayList

/**
* Don't use JvmField here to enable R8 optimizations via "assumenosideeffects"
Expand Down Expand Up @@ -68,7 +68,7 @@ internal object FastServiceLoader {
// Also search for test-module factory
createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply { result.add(this) }
result
} catch (e: Throwable) {
} catch (_: Throwable) {
// Fallback to the regular SL in case of any unexpected exception
load(clz, clz.classLoader)
}
Expand All @@ -85,15 +85,15 @@ internal object FastServiceLoader {
return try {
val clz = Class.forName(serviceClass, true, baseClass.classLoader)
baseClass.cast(clz.getDeclaredConstructor().newInstance())
} catch (e: ClassNotFoundException) { // Do not fail if TestMainDispatcherFactory is not found
} catch (_: ClassNotFoundException) { // Do not fail if TestMainDispatcherFactory is not found
null
}
}

private fun <S> load(service: Class<S>, loader: ClassLoader): List<S> {
return try {
loadProviders(service, loader)
} catch (e: Throwable) {
} catch (_: Throwable) {
// Fallback to default service loader
ServiceLoader.load(service, loader).toList()
}
Expand Down
60 changes: 24 additions & 36 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ import kotlin.math.*
*
* ### Support for blocking tasks
*
* The scheduler also supports the notion of [blocking][TASK_PROBABLY_BLOCKING] tasks.
* The scheduler also supports the notion of [blocking][Task.isBlocking] tasks.
* When executing or enqueuing blocking tasks, the scheduler notifies or creates an additional worker in
* addition to the core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
* available to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
Expand Down Expand Up @@ -425,7 +425,7 @@ internal class CoroutineScheduler(
block.taskContext = taskContext
return block
}
return TaskImpl(block, nanoTime, taskContext)
return block.asTask(nanoTime, taskContext)
}

// NB: should only be called from 'dispatch' method due to blocking tasks increment
Expand Down Expand Up @@ -514,7 +514,7 @@ internal class CoroutineScheduler(
*/
if (state === WorkerState.TERMINATED) return task
// Do not add CPU tasks in local queue if we are not able to execute it
if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
if (!task.isBlocking && state === WorkerState.BLOCKING) {
return task
}
mayHaveLocalTasks = true
Expand Down Expand Up @@ -810,29 +810,26 @@ internal class CoroutineScheduler(
private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK

private fun executeTask(task: Task) {
val taskMode = task.mode
idleReset(taskMode)
beforeTask(taskMode)
runSafely(task)
afterTask(taskMode)
}

private fun beforeTask(taskMode: Int) {
if (taskMode == TASK_NON_BLOCKING) return
// Always notify about new work when releasing CPU-permit to execute some blocking task
if (tryReleaseCpu(WorkerState.BLOCKING)) {
signalCpuWork()
terminationDeadline = 0L // reset deadline for termination
if (state == WorkerState.PARKING) {
assert { task.isBlocking }
state = WorkerState.BLOCKING
}
}

private fun afterTask(taskMode: Int) {
if (taskMode == TASK_NON_BLOCKING) return
decrementBlockingTasks()
val currentState = state
// Shutdown sequence of blocking dispatcher
if (currentState !== WorkerState.TERMINATED) {
assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
state = WorkerState.DORMANT
if (task.isBlocking) {
// Always notify about new work when releasing CPU-permit to execute some blocking task
if (tryReleaseCpu(WorkerState.BLOCKING)) {
signalCpuWork()
}
runSafely(task)
decrementBlockingTasks()
val currentState = state
// Shutdown sequence of blocking dispatcher
if (currentState !== WorkerState.TERMINATED) {
assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
state = WorkerState.DORMANT
}
} else {
runSafely(task)
}
}

Expand Down Expand Up @@ -923,15 +920,6 @@ internal class CoroutineScheduler(
state = WorkerState.TERMINATED
}

// It is invoked by this worker when it finds a task
private fun idleReset(mode: Int) {
terminationDeadline = 0L // reset deadline for termination
if (state == WorkerState.PARKING) {
assert { mode == TASK_PROBABLY_BLOCKING }
state = WorkerState.BLOCKING
}
}

fun findTask(mayHaveLocalTasks: Boolean): Task? {
if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
/*
Expand Down Expand Up @@ -1013,12 +1001,12 @@ internal class CoroutineScheduler(

enum class WorkerState {
/**
* Has CPU token and either executes [TASK_NON_BLOCKING] task or tries to find one.
* Has CPU token and either executes a [Task.isBlocking]` == false` task or tries to find one.
*/
CPU_ACQUIRED,

/**
* Executing task with [TASK_PROBABLY_BLOCKING].
* Executing task with [Task.isBlocking].
*/
BLOCKING,

Expand Down
Loading