From 1a6af72e8bc6a97aa74c332d6c0921ee2bc18901 Mon Sep 17 00:00:00 2001 From: sokomishalov Date: Fri, 18 Dec 2020 16:33:24 +0300 Subject: [PATCH] bump reactor-core to 3.4.x version and replace deprecated API usage --- gradle.properties | 2 +- reactive/kotlinx-coroutines-reactor/build.gradle.kts | 4 ++++ .../kotlinx-coroutines-reactor/src/ReactorContext.kt | 4 ++-- .../src/ReactorContextInjector.kt | 4 ++-- .../kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt | 8 ++++---- reactive/kotlinx-coroutines-reactor/test/MonoTest.kt | 2 +- .../test/ReactorContextTest.kt | 10 +++++----- 7 files changed, 19 insertions(+), 15 deletions(-) diff --git a/gradle.properties b/gradle.properties index 9163cf5af1..5d84f59544 100644 --- a/gradle.properties +++ b/gradle.properties @@ -15,7 +15,7 @@ html_version=0.6.8 lincheck_version=2.7.1 dokka_version=0.9.16-rdev-2-mpp-hacks byte_buddy_version=1.10.9 -reactor_version=3.2.5.RELEASE +reactor_version=3.4.1 reactive_streams_version=1.0.2 rxjava2_version=2.2.8 rxjava3_version=3.0.2 diff --git a/reactive/kotlinx-coroutines-reactor/build.gradle.kts b/reactive/kotlinx-coroutines-reactor/build.gradle.kts index d5fd208a27..1725af9068 100644 --- a/reactive/kotlinx-coroutines-reactor/build.gradle.kts +++ b/reactive/kotlinx-coroutines-reactor/build.gradle.kts @@ -9,6 +9,10 @@ dependencies { compile(project(":kotlinx-coroutines-reactive")) } +java { + targetCompatibility = JavaVersion.VERSION_1_8 + sourceCompatibility = JavaVersion.VERSION_1_8 +} tasks { compileKotlin { diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt index 69467ad8b1..c21e217034 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -24,7 +24,7 @@ import kotlinx.coroutines.reactive.* * #### Propagating ReactorContext to Reactor's Context * ``` * val flux = myDatabaseService.getUsers() - * .subscriberContext() { ctx -> println(ctx); ctx } + * .contextWrite { ctx -> println(ctx); ctx } * flux.await() // Will print "null" * * // Now add ReactorContext @@ -43,7 +43,7 @@ import kotlinx.coroutines.reactive.* * .subscribe() // Will print 'Reactor context in Flow: null' * // Add subscriber's context * flow.asFlux() - * .subscriberContext { ctx -> ctx.put("answer", 42) } + * .contextWrite { ctx -> ctx.put("answer", 42) } * .subscribe() // Will print "Reactor context in Flow: Context{'answer'=42}" * ``` */ diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt index a9d140a9fd..44292b88ec 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContextInjector.kt @@ -18,8 +18,8 @@ internal class ReactorContextInjector : ContextInjector { override fun injectCoroutineContext(publisher: Publisher, coroutineContext: CoroutineContext): Publisher { val reactorContext = coroutineContext[ReactorContext]?.context ?: return publisher return when(publisher) { - is Mono -> publisher.subscriberContext(reactorContext) - is Flux -> publisher.subscriberContext(reactorContext) + is Mono -> publisher.contextWrite(reactorContext) + is Flux -> publisher.contextWrite(reactorContext) else -> publisher } } diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt index cecc89592e..dbe97b17d8 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt @@ -18,8 +18,8 @@ class FlowAsFluxTest : TestBase() { (1..4).forEach { i -> emit(createMono(i).awaitFirst()) } } .asFlux() - .subscriberContext(Context.of(1, "1")) - .subscriberContext(Context.of(2, "2", 3, "3", 4, "4")) + .contextWrite(Context.of(1, "1")) + .contextWrite(Context.of(2, "2", 3, "3", 4, "4")) val list = flux.collectList().block()!! assertEquals(listOf("1", "2", "3", "4"), list) } @@ -36,7 +36,7 @@ class FlowAsFluxTest : TestBase() { it.next("OK") it.complete() } - .subscriberContext { ctx -> + .contextWrite { ctx -> expect(2) assertEquals("CTX", ctx.get(1)) ctx @@ -58,7 +58,7 @@ class FlowAsFluxTest : TestBase() { it.next("OK") it.complete() } - .subscriberContext { ctx -> + .contextWrite { ctx -> expect(2) assertEquals("CTX", ctx.get(1)) ctx diff --git a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt index 551988b814..0271483fc1 100644 --- a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt @@ -235,7 +235,7 @@ class MonoTest : TestBase() { } finally { throw TestException() // would not be able to handle it since mono is disposed } - }.subscriberContext { Context.of("reactor.onOperatorError.local", handler) } + }.contextWrite { Context.of("reactor.onOperatorError.local", handler) } mono.subscribe(object : Subscriber { override fun onSubscribe(s: Subscription) { expect(2) diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt index 3681261b0c..aff29241c9 100644 --- a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt @@ -18,8 +18,8 @@ class ReactorContextTest : TestBase() { buildString { (1..7).forEach { append(ctx.getOrDefault(it, "noValue")) } } - } .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5")) - .subscriberContext { ctx -> ctx.put(6, "6") } + } .contextWrite(Context.of(2, "2", 3, "3", 4, "4", 5, "5")) + .contextWrite { ctx -> ctx.put(6, "6") } assertEquals(mono.awaitFirst(), "1234567") } @@ -29,8 +29,8 @@ class ReactorContextTest : TestBase() { val ctx = reactorContext() (1..7).forEach { send(ctx.getOrDefault(it, "noValue")) } } - .subscriberContext(Context.of(2, "2", 3, "3", 4, "4", 5, "5")) - .subscriberContext { ctx -> ctx.put(6, "6") } + .contextWrite(Context.of(2, "2", 3, "3", 4, "4", 5, "5")) + .contextWrite { ctx -> ctx.put(6, "6") } val list = flux.collectList().block()!! assertEquals((1..7).map { it.toString() }, list) } @@ -42,7 +42,7 @@ class ReactorContextTest : TestBase() { buildString { (1..3).forEach { append(ctx.getOrDefault(it, "noValue")) } } - } .subscriberContext(Context.of(2, "2")) + } .contextWrite(Context.of(2, "2")) .awaitFirst() assertEquals(result, "123") }