Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zip and combine performance #2308

Merged
merged 15 commits into from
Oct 20, 2020
30 changes: 2 additions & 28 deletions benchmarks/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,38 +31,12 @@ tasks.named<KotlinCompile>("compileJmhKotlin") {
}
}

/*
* Due to a bug in the inliner it sometimes does not remove inlined symbols (that are later renamed) from unused code paths,
* and it breaks JMH that tries to post-process these symbols and fails because they are renamed.
*/
val removeRedundantFiles by tasks.registering(Delete::class) {
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$nBlanks\$1\$\$special\$\$inlined\$map\$1\$2\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$bonusForDoubleLetter\$1\$\$special\$\$inlined\$map\$1\$2\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$score2\$1\$\$special\$\$inlined\$map\$1\$2\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$1\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOptKt\$\$special\$\$inlined\$collect\$2\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/FlowPlaysScrabbleBase\$play\$histoOfLetters\$1\$\$special\$\$inlined\$fold\$1\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/scrabble/SaneFlowPlaysScrabble\$play\$buildHistoOnScore\$1\$\$special\$\$inlined\$filter\$1\$1.class")

// Primes
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$2\$1.class")
delete("$buildDir/classes/kotlin/jmh/benchmarks/flow/misc/Numbers\$\$special\$\$inlined\$filter\$1\$1.class")
}

tasks.named("jmhRunBytecodeGenerator") {
dependsOn(removeRedundantFiles)
}

// It is better to use the following to run benchmarks, otherwise you may get unexpected errors:
// ./gradlew --no-daemon cleanJmhJar jmh -Pjmh="MyBenchmark"
extensions.configure<JMHPluginExtension>("jmh") {
jmhVersion = "1.21"
jmhVersion = "1.26"
duplicateClassesStrategy = DuplicatesStrategy.INCLUDE
failOnError = true
resultFormat = "CSV"
Expand All @@ -80,7 +54,7 @@ tasks.named<Jar>("jmhJar") {
}

dependencies {
compile("org.openjdk.jmh:jmh-core:1.21")
compile("org.openjdk.jmh:jmh-core:1.26")
compile("io.projectreactor:reactor-core:${version("reactor")}")
compile("io.reactivex.rxjava2:rxjava:2.1.9")
compile("com.github.akarnokd:rxjava2-extensions:0.20.8")
Expand Down
34 changes: 34 additions & 0 deletions benchmarks/src/jmh/kotlin/benchmarks/flow/CombineFlowsBenchmark.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*

@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class CombineFlowsBenchmark {

@Param("10", "100", "1000")
private var size = 10

@Benchmark
fun combine() = runBlocking {
combine((1 until size).map { flowOf(it) }) { a -> a}.collect()
}

@Benchmark
fun combineTransform() = runBlocking {
val list = (1 until size).map { flowOf(it) }.toList()
combineTransform((1 until size).map { flowOf(it) }) { emit(it) }.collect()
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package benchmarks.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.internal.*
import org.openjdk.jmh.annotations.*
import java.util.concurrent.*

@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
open class CombineTwoFlowsBenchmark {

@Param("100", "100000", "1000000")
private var size = 100000

@Benchmark
fun combinePlain() = runBlocking {
val flow = (1 until size.toLong()).asFlow()
flow.combine(flow) { a, b -> a + b }.collect()
}

@Benchmark
fun combineTransform() = runBlocking {
val flow = (1 until size.toLong()).asFlow()
flow.combineTransform(flow) { a, b -> emit(a + b) }.collect()
}

@Benchmark
fun combineVararg() = runBlocking {
val flow = (1 until size.toLong()).asFlow()
combine(listOf(flow, flow)) { arr -> arr[0] + arr[1] }.collect()
}

@Benchmark
fun combineTransformVararg() = runBlocking {
val flow = (1 until size.toLong()).asFlow()
combineTransform(listOf(flow, flow)) { arr -> emit(arr[0] + arr[1]) }.collect()
}
}
6 changes: 3 additions & 3 deletions benchmarks/src/jmh/kotlin/benchmarks/flow/NumbersBenchmark.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ open class NumbersBenchmark {

@Benchmark
fun zipRx() {
val numbers = rxNumbers().take(natural.toLong())
val numbers = rxNumbers().take(natural)
val first = numbers
.filter { it % 2L != 0L }
.map { it * it }
val second = numbers
.filter { it % 2L == 0L }
.map { it * it }
first.zipWith(second, BiFunction<Long, Long, Long> { v1, v2 -> v1 + v2 }).filter { it % 3 == 0L }.count()
first.zipWith(second, { v1, v2 -> v1 + v2 }).filter { it % 3 == 0L }.count()
.blockingGet()
}

Expand All @@ -98,7 +98,7 @@ open class NumbersBenchmark {

@Benchmark
fun transformationsRx(): Long {
return rxNumbers().take(natural.toLong())
return rxNumbers().take(natural)
.filter { it % 2L != 0L }
.map { it * it }
.filter { (it + 1) % 3 == 0L }.count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,6 @@ internal abstract class AbstractSendChannel<E>(
return sendSuspend(element)
}

internal suspend fun sendFair(element: E) {
if (offerInternal(element) === OFFER_SUCCESS) {
yield() // Works only on fast path to properly work in sequential use-cases
return
}
return sendSuspend(element)
}

public final override fun offer(element: E): Boolean {
val result = offerInternal(element)
return when {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,4 @@ internal open class ChannelCoroutine<E>(
_channel.cancel(exception) // cancel the channel
cancelCoroutine(exception) // cancel the job
}

@Suppress("UNCHECKED_CAST")
suspend fun sendFair(element: E) {
(_channel as AbstractSendChannel<E>).sendFair(element)
}
}
14 changes: 7 additions & 7 deletions kotlinx-coroutines-core/common/src/flow/Migration.kt
Original file line number Diff line number Diff line change
Expand Up @@ -367,35 +367,35 @@ public fun <T1, T2, R> Flow<T1>.combineLatest(other: Flow<T2>, transform: suspen
message = "Flow analogue of 'combineLatest' is 'combine'",
replaceWith = ReplaceWith("combine(this, other, other2, transform)")
)
public inline fun <T1, T2, T3, R> Flow<T1>.combineLatest(
public fun <T1, T2, T3, R> Flow<T1>.combineLatest(
other: Flow<T2>,
other2: Flow<T3>,
crossinline transform: suspend (T1, T2, T3) -> R
transform: suspend (T1, T2, T3) -> R
) = combine(this, other, other2, transform)

@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'combineLatest' is 'combine'",
replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
)
public inline fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
public fun <T1, T2, T3, T4, R> Flow<T1>.combineLatest(
other: Flow<T2>,
other2: Flow<T3>,
other3: Flow<T4>,
crossinline transform: suspend (T1, T2, T3, T4) -> R
transform: suspend (T1, T2, T3, T4) -> R
) = combine(this, other, other2, other3, transform)

@Deprecated(
level = DeprecationLevel.ERROR,
message = "Flow analogue of 'combineLatest' is 'combine'",
replaceWith = ReplaceWith("combine(this, other, other2, other3, transform)")
)
public inline fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
public fun <T1, T2, T3, T4, T5, R> Flow<T1>.combineLatest(
other: Flow<T2>,
other2: Flow<T3>,
other3: Flow<T4>,
other4: Flow<T5>,
crossinline transform: suspend (T1, T2, T3, T4, T5) -> R
transform: suspend (T1, T2, T3, T4, T5) -> R
): Flow<R> = combine(this, other, other2, other3, other4, transform)

/**
Expand Down Expand Up @@ -482,4 +482,4 @@ public fun <T> Flow<T>.replay(bufferSize: Int): Flow<T> = noImpl()
message = "Flow analogue of 'cache()' is 'shareIn' with unlimited replay and 'started = SharingStared.Lazily' argument'",
replaceWith = ReplaceWith("this.shareIn(scope, Int.MAX_VALUE, started = SharingStared.Lazily)")
)
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
public fun <T> Flow<T>.cache(): Flow<T> = noImpl()
26 changes: 20 additions & 6 deletions kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,33 @@ private class UndispatchedContextCollector<T>(
private val emitRef: suspend (T) -> Unit = { downstream.emit(it) } // allocate suspend function ref once on creation

override suspend fun emit(value: T): Unit =
withContextUndispatched(emitContext, countOrElement, emitRef, value)
withContextUndispatched(emitContext, value, countOrElement, emitRef)
}

// Efficiently computes block(value) in the newContext
private suspend fun <T, V> withContextUndispatched(
internal suspend fun <T, V> withContextUndispatched(
newContext: CoroutineContext,
value: V,
countOrElement: Any = threadContextElements(newContext), // can be precomputed for speed
block: suspend (V) -> T, value: V
block: suspend (V) -> T
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
withCoroutineContext(newContext, countOrElement) {
block.startCoroutineUninterceptedOrReturn(value, Continuation(newContext) {
uCont.resumeWith(it)
})
block.startCoroutineUninterceptedOrReturn(value, StackFrameContinuation(uCont, newContext))
}
}

// Continuation that links the caller with uCont with walkable CoroutineStackFrame
private class StackFrameContinuation<T>(
private val uCont: Continuation<T>, override val context: CoroutineContext
) : Continuation<T>, CoroutineStackFrame {

override val callerFrame: CoroutineStackFrame?
get() = uCont as? CoroutineStackFrame

override fun resumeWith(result: Result<T>) {
uCont.resumeWith(result)
}

override fun getStackTraceElement(): StackTraceElement? = null
}
Loading