Skip to content

Commit

Permalink
Cancel subscription for MPS.subscribeToPublisher
Browse files Browse the repository at this point in the history
The `Flux.takeWhile()` only works if there is data in the `Publisher`
to consume.
We still need to be able to cancel subscription and stop producing even if
there is no data at the moment.

* Change `takeWhile()` to the `doOnSubscribe()` and store `subscription`
in the `volatile` property of the `MessageProducerSupport`
* Cancel such a subscription in the `doStop()` impl
* Propagate `doStop()` to super in the `ZeroMqMessageProducer`
which is only one reactive channel adapter overriding `doStop()`
* Verify in the `ReactiveMessageProducerTests` that subscription is cancelled
for delayed data in the `Publisher`

**Cherry-pick to `5.5.x`**
  • Loading branch information
artembilan authored and garyrussell committed Mar 16, 2022
1 parent 0550380 commit e434a46
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.integration.endpoint;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.SmartInitializingSingleton;
Expand Down Expand Up @@ -66,6 +67,8 @@ public abstract class MessageProducerSupport extends AbstractEndpoint implements

private boolean shouldTrack = false;

private volatile Subscription subscription;

protected MessageProducerSupport() {
this.setPhase(Integer.MAX_VALUE / 2);
}
Expand Down Expand Up @@ -206,6 +209,11 @@ protected void doStart() {
*/
@Override
protected void doStop() {
Subscription subscriptionToCancel = this.subscription;
if (subscriptionToCancel != null) {
this.subscription = null;
subscriptionToCancel.cancel();
}
}

protected void sendMessage(Message<?> messageArg) {
Expand All @@ -232,7 +240,7 @@ protected void subscribeToPublisher(Publisher<? extends Message<?>> publisher) {
.map(this::trackMessageIfAny)
.doOnComplete(this::stop)
.doOnCancel(this::stop)
.takeWhile((message) -> isActive());
.doOnSubscribe((subscription) -> this.subscription = subscription);

if (channelForSubscription instanceof ReactiveStreamsSubscribableChannel) {
((ReactiveStreamsSubscribableChannel) channelForSubscription).subscribeTo(messageFlux);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,14 +16,19 @@

package org.springframework.integration.endpoint;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.FluxMessageChannel;
import org.springframework.integration.channel.NullChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
Expand All @@ -49,12 +54,36 @@ public class ReactiveMessageProducerTests {
public MessageProducerSupport producer;

@Test
public void test() {
public void testEmptyPublisherUnsubscription() throws InterruptedException {
CountDownLatch cancelLatch = new CountDownLatch(1);
MessageProducerSupport producer =
new MessageProducerSupport() {

@Override
protected void doStart() {
super.doStart();
subscribeToPublisher(
Flux.just("test1")
.delayElements(Duration.ofSeconds(10))
.map(GenericMessage::new)
.doOnCancel(cancelLatch::countDown));
}

};
producer.setOutputChannel(new NullChannel());
producer.start();
producer.stop();

assertThat(cancelLatch.await(10, TimeUnit.SECONDS)).isTrue();
}

@Test
public void testReactiveMessageProducerFromContext() {
StepVerifier stepVerifier =
StepVerifier.create(
Flux.from(this.fluxMessageChannel)
.map(Message::getPayload)
.cast(String.class))
Flux.from(this.fluxMessageChannel)
.map(Message::getPayload)
.cast(String.class))
.expectNext("test1", "test2")
.thenCancel()
.verifyLater();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -292,6 +292,7 @@ private Mono<Message<?>> convertMessage(Mono<ZMsg> msgMono) {

@Override
protected void doStop() {
super.doStop();
this.socketMono.doOnNext(ZMQ.Socket::close).subscribe();
}

Expand Down

0 comments on commit e434a46

Please sign in to comment.