From f8c0304a9c8dfcccbc29d0beedc370667b48b4d7 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 19 Dec 2024 12:20:14 +0100 Subject: [PATCH] =?UTF-8?q?Properly=20cleanup=20thread=20locals=20for=20no?= =?UTF-8?q?n-CoroutineDispatcher-intercepte=E2=80=A6=20(#4303)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Properly cleanup thread locals for non-CoroutineDispatcher-intercepted continuations There was one codepath not covered by undispatched thread local cleanup procedure: when a custom ContinuationInterceptor is used and the scoped coroutine (i.e. withContext) is completed in-place without suspensions. Fixed with the introduction of the corresponding machinery for ScopeCoroutine Fixes #4296 --- .../common/src/AbstractCoroutine.kt | 10 ++ .../common/src/internal/Scopes.kt | 7 ++ .../common/src/intrinsics/Undispatched.kt | 1 + .../jvm/src/CoroutineContext.kt | 21 ++-- .../jvm/test/ThreadLocalsLeaksTest.kt | 100 ++++++++++++++++++ 5 files changed, 133 insertions(+), 6 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt diff --git a/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt b/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt index d2f79c1381..2d6273acc9 100644 --- a/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines import kotlinx.coroutines.CoroutineStart.* import kotlinx.coroutines.intrinsics.* import kotlin.coroutines.* +import kotlinx.coroutines.internal.ScopeCoroutine /** * Abstract base class for implementation of coroutines in coroutine builders. @@ -100,6 +101,15 @@ public abstract class AbstractCoroutine( afterResume(state) } + /** + * Invoked when the corresponding `AbstractCoroutine` was **conceptually** resumed, but not mechanically. + * Currently, this function only invokes `resume` on the underlying continuation for [ScopeCoroutine] + * or does nothing otherwise. + * + * Examples of resumes: + * - `afterCompletion` calls when the corresponding `Job` changed its state (i.e. got cancelled) + * - [AbstractCoroutine.resumeWith] was invoked + */ protected open fun afterResume(state: Any?): Unit = afterCompletion(state) internal final override fun handleOnCompletionException(exception: Throwable) { diff --git a/kotlinx-coroutines-core/common/src/internal/Scopes.kt b/kotlinx-coroutines-core/common/src/internal/Scopes.kt index 7e561c83dc..9b830bd5c9 100644 --- a/kotlinx-coroutines-core/common/src/internal/Scopes.kt +++ b/kotlinx-coroutines-core/common/src/internal/Scopes.kt @@ -23,6 +23,13 @@ internal open class ScopeCoroutine( uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont)) } + /** + * Invoked when a scoped coorutine was completed in an undispatched manner directly + * at the place of its start because it never suspended. + */ + open fun afterCompletionUndispatched() { + } + override fun afterResume(state: Any?) { // Resume direct because scope is already in the correct context uCont.resumeWith(recoverResult(state, uCont)) diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt index 254182b387..511199701a 100644 --- a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt +++ b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt @@ -79,6 +79,7 @@ private inline fun ScopeCoroutine.undispatchedResult( if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // (1) val state = makeCompletingOnce(result) if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED // (2) + afterCompletionUndispatched() return if (state is CompletedExceptionally) { // (3) when { shouldThrow(state.cause) -> throw recoverStackTrace(state.cause, uCont) diff --git a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt index ae8275f86f..7628d6ac85 100644 --- a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt @@ -185,7 +185,8 @@ internal actual class UndispatchedCoroutineactual constructor ( * `withContext` for the sake of logging, MDC, tracing etc., meaning that there exists thousands of * undispatched coroutines. * Each access to Java's [ThreadLocal] leaves a footprint in the corresponding Thread's `ThreadLocalMap` - * that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected. + * that is cleared automatically as soon as the associated thread-local (-> UndispatchedCoroutine) is garbage collected + * when either the corresponding thread is GC'ed or it cleans up its stale entries on other TL accesses. * When such coroutines are promoted to old generation, `ThreadLocalMap`s become bloated and an arbitrary accesses to thread locals * start to consume significant amount of CPU because these maps are open-addressed and cleaned up incrementally on each access. * (You can read more about this effect as "GC nepotism"). @@ -253,18 +254,26 @@ internal actual class UndispatchedCoroutineactual constructor ( } } + override fun afterCompletionUndispatched() { + clearThreadLocal() + } + override fun afterResume(state: Any?) { + clearThreadLocal() + // resume undispatched -- update context but stay on the same dispatcher + val result = recoverResult(state, uCont) + withContinuationContext(uCont, null) { + uCont.resumeWith(result) + } + } + + private fun clearThreadLocal() { if (threadLocalIsSet) { threadStateToRecover.get()?.let { (ctx, value) -> restoreThreadContext(ctx, value) } threadStateToRecover.remove() } - // resume undispatched -- update context but stay on the same dispatcher - val result = recoverResult(state, uCont) - withContinuationContext(uCont, null) { - uCont.resumeWith(result) - } } } diff --git a/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt b/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt new file mode 100644 index 0000000000..19d42fe01c --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ThreadLocalsLeaksTest.kt @@ -0,0 +1,100 @@ +package kotlinx.coroutines + +import kotlinx.coroutines.testing.TestBase +import java.lang.ref.WeakReference +import kotlin.coroutines.AbstractCoroutineContextElement +import kotlin.coroutines.Continuation +import kotlin.coroutines.ContinuationInterceptor +import kotlin.coroutines.CoroutineContext +import kotlin.test.Test + +/* + * This is an adapted verion of test from #4296. + * + * qwwdfsad: the test relies on System.gc() actually collecting the garbage. + * If these tests flake on CI, first check that JDK/GC setup in not an issue. + */ +class ThreadLocalCustomContinuationInterceptorTest : TestBase() { + + private class CustomContinuationInterceptor(private val delegate: ContinuationInterceptor) : + AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { + + override fun interceptContinuation(continuation: Continuation): Continuation { + return delegate.interceptContinuation(continuation) + } + } + + private class CustomNeverEqualContinuationInterceptor(private val delegate: ContinuationInterceptor) : + AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { + + override fun interceptContinuation(continuation: Continuation): Continuation { + return delegate.interceptContinuation(continuation) + } + + override fun equals(other: Any?) = false + } + + @Test(timeout = 20_000L) + fun testDefaultDispatcherNoSuspension() = ensureCoroutineContextGCed(Dispatchers.Default, suspend = false) + + @Test(timeout = 20_000L) + fun testDefaultDispatcher() = ensureCoroutineContextGCed(Dispatchers.Default, suspend = true) + + + @Test(timeout = 20_000L) + fun testNonCoroutineDispatcher() = ensureCoroutineContextGCed( + CustomContinuationInterceptor(Dispatchers.Default), + suspend = true + ) + + @Test(timeout = 20_000L) + fun testNonCoroutineDispatcherSuspension() = ensureCoroutineContextGCed( + CustomContinuationInterceptor(Dispatchers.Default), + suspend = false + ) + + // Note asymmetric equals codepath never goes through the undispatched withContext, thus the separate test case + + @Test(timeout = 20_000L) + fun testNonCoroutineDispatcherAsymmetricEquals() = + ensureCoroutineContextGCed( + CustomNeverEqualContinuationInterceptor(Dispatchers.Default), + suspend = true + ) + + @Test(timeout = 20_000L) + fun testNonCoroutineDispatcherAsymmetricEqualsSuspension() = + ensureCoroutineContextGCed( + CustomNeverEqualContinuationInterceptor(Dispatchers.Default), + suspend = false + ) + + + @Volatile + private var letThatSinkIn: Any = "What is my purpose? To frag the garbage collctor" + + private fun ensureCoroutineContextGCed(coroutineContext: CoroutineContext, suspend: Boolean) { + fun forceGcUntilRefIsCleaned(ref: WeakReference) { + while (ref.get() != null) { + System.gc() + letThatSinkIn = LongArray(1024 * 1024) + } + } + + runTest { + lateinit var ref: WeakReference + val job = GlobalScope.launch(coroutineContext) { + val coroutineName = CoroutineName("Yo") + ref = WeakReference(coroutineName) + withContext(coroutineName) { + if (suspend) { + delay(1) + } + } + } + job.join() + + forceGcUntilRefIsCleaned(ref) + } + } +}