From e434a4625c0cf53922998257f0c3af0000f892dd Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 15 Mar 2022 13:31:45 -0400 Subject: [PATCH] Cancel subscription for MPS.subscribeToPublisher The `Flux.takeWhile()` only works if there is data in the `Publisher` to consume. We still need to be able to cancel subscription and stop producing even if there is no data at the moment. * Change `takeWhile()` to the `doOnSubscribe()` and store `subscription` in the `volatile` property of the `MessageProducerSupport` * Cancel such a subscription in the `doStop()` impl * Propagate `doStop()` to super in the `ZeroMqMessageProducer` which is only one reactive channel adapter overriding `doStop()` * Verify in the `ReactiveMessageProducerTests` that subscription is cancelled for delayed data in the `Publisher` **Cherry-pick to `5.5.x`** --- .../endpoint/MessageProducerSupport.java | 10 ++++- .../ReactiveMessageProducerTests.java | 39 ++++++++++++++++--- .../zeromq/inbound/ZeroMqMessageProducer.java | 3 +- 3 files changed, 45 insertions(+), 7 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java index eb3502e24b9..cd937eed2ae 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java @@ -17,6 +17,7 @@ package org.springframework.integration.endpoint; import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.SmartInitializingSingleton; @@ -66,6 +67,8 @@ public abstract class MessageProducerSupport extends AbstractEndpoint implements private boolean shouldTrack = false; + private volatile Subscription subscription; + protected MessageProducerSupport() { this.setPhase(Integer.MAX_VALUE / 2); } @@ -206,6 +209,11 @@ protected void doStart() { */ @Override protected void doStop() { + Subscription subscriptionToCancel = this.subscription; + if (subscriptionToCancel != null) { + this.subscription = null; + subscriptionToCancel.cancel(); + } } protected void sendMessage(Message messageArg) { @@ -232,7 +240,7 @@ protected void subscribeToPublisher(Publisher> publisher) { .map(this::trackMessageIfAny) .doOnComplete(this::stop) .doOnCancel(this::stop) - .takeWhile((message) -> isActive()); + .doOnSubscribe((subscription) -> this.subscription = subscription); if (channelForSubscription instanceof ReactiveStreamsSubscribableChannel) { ((ReactiveStreamsSubscribableChannel) channelForSubscription).subscribeTo(messageFlux); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java index f9d99f28ab2..edb2b12deb1 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveMessageProducerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors. + * Copyright 2020-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,7 +16,11 @@ package org.springframework.integration.endpoint; +import static org.assertj.core.api.Assertions.assertThat; + import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -24,6 +28,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.channel.FluxMessageChannel; +import org.springframework.integration.channel.NullChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; @@ -49,12 +54,36 @@ public class ReactiveMessageProducerTests { public MessageProducerSupport producer; @Test - public void test() { + public void testEmptyPublisherUnsubscription() throws InterruptedException { + CountDownLatch cancelLatch = new CountDownLatch(1); + MessageProducerSupport producer = + new MessageProducerSupport() { + + @Override + protected void doStart() { + super.doStart(); + subscribeToPublisher( + Flux.just("test1") + .delayElements(Duration.ofSeconds(10)) + .map(GenericMessage::new) + .doOnCancel(cancelLatch::countDown)); + } + + }; + producer.setOutputChannel(new NullChannel()); + producer.start(); + producer.stop(); + + assertThat(cancelLatch.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @Test + public void testReactiveMessageProducerFromContext() { StepVerifier stepVerifier = StepVerifier.create( - Flux.from(this.fluxMessageChannel) - .map(Message::getPayload) - .cast(String.class)) + Flux.from(this.fluxMessageChannel) + .map(Message::getPayload) + .cast(String.class)) .expectNext("test1", "test2") .thenCancel() .verifyLater(); diff --git a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java index 0263496ce7c..ae2b9af006b 100644 --- a/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java +++ b/spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducer.java @@ -1,5 +1,5 @@ /* - * Copyright 2020 the original author or authors. + * Copyright 2020-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -292,6 +292,7 @@ private Mono> convertMessage(Mono msgMono) { @Override protected void doStop() { + super.doStop(); this.socketMono.doOnNext(ZMQ.Socket::close).subscribe(); }