Skip to content

Commit

Permalink
Fix Redis Pub/Sub subscribeAsMessages method
Browse files Browse the repository at this point in the history
The method was discarding the received items, instead of emitting them downstream.
  • Loading branch information
cescoffier authored and holly-cummins committed Jul 31, 2024
1 parent 90344cf commit ca73167
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,14 @@ public Multi<RedisPubSubMessage<V>> subscribeAsMessages(String... channels) {

List<String> list = List.of(channels);
return Multi.createFrom().emitter(emitter -> {
subscribe(list, (channel, value) -> new DefaultRedisPubSubMessage<>(value, channel), emitter::complete,
emitter::fail)
.subscribe().with(subscriber -> emitter
.onTermination(() -> subscriber.unsubscribe(channels).subscribe().asCompletionStage()));
subscribe(list,
(channel, value) -> emitter.emit(new DefaultRedisPubSubMessage<>(value, channel)),
emitter::complete, emitter::fail)
.subscribe().with(x -> {
emitter.onTermination(() -> {
x.unsubscribe(channels).subscribe().asCompletionStage();
});
}, emitter::fail);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void initialize() {
ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(5));
pubsub = ds.pubsub(Person.class);

ReactiveRedisDataSourceImpl reactiveDS = new ReactiveRedisDataSourceImpl(vertx, redis, api);
var reactiveDS = new ReactiveRedisDataSourceImpl(vertx, redis, api);
reactive = reactiveDS.pubsub(Person.class);
}

Expand Down Expand Up @@ -371,6 +371,35 @@ void subscribeToSingleWithMultiAsMessages() {

}

@Test
void testSubscribeAsMessages() {
List<RedisPubSubMessage<Person>> people = new CopyOnWriteArrayList<>();
Multi<RedisPubSubMessage<Person>> multi = reactive.subscribeAsMessages(channel);

Cancellable cancellable = multi.subscribe().with(people::add);

pubsub.publish("foo", new Person("luke", "skywalker"));
pubsub.publish(channel, new Person("luke", "skywalker"));

Awaitility.await().until(() -> people.size() == 1);

pubsub.publish(channel, new Person("leia", "skywalker"));
pubsub.publish(channel, new Person("leia", "skywalker"));
pubsub.publish(channel, new Person("leia", "skywalker"));

Awaitility.await().until(() -> people.size() == 4);

assertThat(people).allSatisfy(m -> {
assertThat(m.getChannel()).isNotBlank();
assertThat(m.getPayload()).isNotNull();
});

cancellable.cancel();

awaitNoMoreActiveChannels();

}

@Test
void unsubscribe() {

Expand Down

0 comments on commit ca73167

Please sign in to comment.