From 0f1f29e46f8421aaa491d5ba88d320d43eec5b36 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Tue, 23 Jul 2024 20:52:35 +0200 Subject: [PATCH] Fix Redis Pub/Sub subscribeAsMessages method The method was discarding the received items, instead of emitting them downstream. (cherry picked from commit 3743f32b90a2ebdac04d22ff8e2393a323cff45f) --- .../ReactivePubSubCommandsImpl.java | 12 ++++--- .../redis/datasource/PubSubCommandsTest.java | 31 ++++++++++++++++++- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.java index 10a92a867921d..f4125fe4c85c0 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.java @@ -260,10 +260,14 @@ public Multi> subscribeAsMessages(String... channels) { List list = List.of(channels); return Multi.createFrom().emitter(emitter -> { - subscribe(list, (channel, value) -> new DefaultRedisPubSubMessage<>(value, channel), emitter::complete, - emitter::fail) - .subscribe().with(subscriber -> emitter - .onTermination(() -> subscriber.unsubscribe(channels).subscribe().asCompletionStage())); + subscribe(list, + (channel, value) -> emitter.emit(new DefaultRedisPubSubMessage<>(value, channel)), + emitter::complete, emitter::fail) + .subscribe().with(x -> { + emitter.onTermination(() -> { + x.unsubscribe(channels).subscribe().asCompletionStage(); + }); + }, emitter::fail); }); } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubCommandsTest.java index 38de5c949ef73..e568395493858 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubCommandsTest.java @@ -40,7 +40,7 @@ void initialize() { ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(5)); pubsub = ds.pubsub(Person.class); - ReactiveRedisDataSourceImpl reactiveDS = new ReactiveRedisDataSourceImpl(vertx, redis, api); + var reactiveDS = new ReactiveRedisDataSourceImpl(vertx, redis, api); reactive = reactiveDS.pubsub(Person.class); } @@ -371,6 +371,35 @@ void subscribeToSingleWithMultiAsMessages() { } + @Test + void testSubscribeAsMessages() { + List> people = new CopyOnWriteArrayList<>(); + Multi> multi = reactive.subscribeAsMessages(channel); + + Cancellable cancellable = multi.subscribe().with(people::add); + + pubsub.publish("foo", new Person("luke", "skywalker")); + pubsub.publish(channel, new Person("luke", "skywalker")); + + Awaitility.await().until(() -> people.size() == 1); + + pubsub.publish(channel, new Person("leia", "skywalker")); + pubsub.publish(channel, new Person("leia", "skywalker")); + pubsub.publish(channel, new Person("leia", "skywalker")); + + Awaitility.await().until(() -> people.size() == 4); + + assertThat(people).allSatisfy(m -> { + assertThat(m.getChannel()).isNotBlank(); + assertThat(m.getPayload()).isNotNull(); + }); + + cancellable.cancel(); + + awaitNoMoreActiveChannels(); + + } + @Test void unsubscribe() {