Message publishing via Sink and OutputDestination times out with Spring Integration 6.3.2 #9362

Closed
wrwksexahatvani opened this issue Jul 25, 2024 · 3 comments
Labels
status: declined There won't be a fix for some reason


@wrwksexahatvani

In what version(s) of Spring Integration are you seeing this issue?

6.3.2.RELEASE

Describe the bug

The DummyEventProducerIntegrationTest in the sample project https://github.com/andrashatvani/spring-demo/ times out once I upgrade Spring Boot to 3.3.2 in the POM. It passes with Spring Boot 3.3.1 and Spring Integration 6.3.1, as it always has so far. @wilkinsona pointed me toward Spring Integration in spring-projects/spring-boot#41602.

To Reproduce

Run DummyEventProducerIntegrationTest.

Expected behavior

The test should pass.

Sample

https://github.com/andrashatvani/spring-demo/

@wrwksexahatvani wrwksexahatvani added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels Jul 25, 2024
@artembilan
Member

Apparently the fix in Spring Integration 6.3.2 has just revealed a problem in Spring Cloud Stream.
There is this logic in the FunctionConfiguration:

	/*
	 * Creates a publishing trigger to ensure Supplier does not begin publishing until binding is created
	 */
	private Publisher<Object> setupBindingTrigger(GenericApplicationContext context) {
		AtomicReference<MonoSink<Object>> triggerRef = new AtomicReference<>();
		Publisher<Object> beginPublishingTrigger = Mono.create(triggerRef::set);
		context.addApplicationListener(event -> {
			if (event instanceof BindingCreatedEvent) {
				if (triggerRef.get() != null) {
					triggerRef.get().success();
				}
			}
		});
		return beginPublishingTrigger;
	}

What I see in the debug session is that when the BindingCreatedEvent arrives, triggerRef.get() is still null.
The reference can be populated only when we subscribe to that beginPublishingTrigger.
However, the FluxMessageChannel does not subscribe to the provided Publisher until it has its own subscribers:

	@Override
	public void subscribeTo(Publisher<? extends Message<?>> publisher) {
		Flux<Object> upstreamPublisher =
				Flux.from(publisher)
						.delaySubscription(
								Mono.fromCallable(this.sink::currentSubscriberCount)
										.filter((value) -> value > 0)
										.repeatWhenEmpty((repeat) ->
												this.active ? repeat.delayElements(Duration.ofMillis(100)) : repeat))
						.flatMap((message) ->
								Mono.just(message)
										.handle((messageToHandle, syncSink) -> sendReactiveMessage(messageToHandle))
										.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)))
						.contextCapture();

		addPublisherToSubscribe(upstreamPublisher);
	}

So there is a race condition in the application state: this FluxMessageChannel does not have consumers yet, but the BindingCreatedEvent has already been emitted.
It is totally OK to emit that event and to have consumers subscribe to the channel independently.
The only problem is that the trigger Mono is not populated until we subscribe to it.
As a result, the trigger signal is lost and the source Supplier<Flux> never starts emitting.
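
To make the ordering problem concrete, here is a minimal JDK-only sketch of the same pattern. It is an analogy, not the Spring Cloud Stream or Reactor code: `LazyTrigger`, `fire()` and `supplierStarted()` are hypothetical names, with `subscribe()` standing in for subscribing to the `Mono.create(triggerRef::set)` trigger and `fire()` standing in for the `BindingCreatedEvent` listener.

```java
import java.util.concurrent.atomic.AtomicReference;

// JDK-only analogy of the lazily populated trigger; all names here are hypothetical.
public class LazyTriggerRace {

    /** Like Mono.create(triggerRef::set): the callback exists only after subscribe(). */
    static final class LazyTrigger {
        private final AtomicReference<Runnable> callbackRef = new AtomicReference<>();

        void subscribe(Runnable onTrigger) {
            callbackRef.set(onTrigger);
        }

        /** Like the event listener: does nothing when nobody has subscribed yet. */
        boolean fire() {
            Runnable callback = callbackRef.get();
            if (callback == null) {
                return false; // the signal is dropped and nothing remembers it
            }
            callback.run();
            return true;
        }
    }

    /** Returns true if the (simulated) supplier was started for the given ordering. */
    static boolean supplierStarted(boolean eventBeforeSubscription) {
        LazyTrigger trigger = new LazyTrigger();
        boolean[] started = {false};
        if (eventBeforeSubscription) {
            trigger.fire();                              // event arrives first
            trigger.subscribe(() -> started[0] = true);  // channel subscribes too late
        } else {
            trigger.subscribe(() -> started[0] = true);
            trigger.fire();
        }
        return started[0];
    }

    public static void main(String[] args) {
        System.out.println("event first: started=" + supplierStarted(true));
        System.out.println("subscribe first: started=" + supplierStarted(false));
    }
}
```

With the event arriving first, the sketch prints `started=false`, which corresponds to the test waiting for a message that is never published.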

I'm thinking about reworking setupBindingTrigger so that it does not wait for the Mono subscription, but rather just exposes a Sinks.One as the trigger for the event.
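
The idea behind that rework can be sketched with plain JDK types. Reactor's `Sinks.one()` replays its completion to late subscribers; the sketch below imitates only that property with a `CompletableFuture`, so it is an illustration under that assumption and not the change that was eventually made. `ReplayingTrigger`, `Trigger` and `supplierStarted()` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only analogy of a trigger that remembers its signal; all names are hypothetical.
public class ReplayingTrigger {

    static final class Trigger {
        // Created eagerly, so the event listener always has something to complete.
        private final CompletableFuture<Void> done = new CompletableFuture<>();

        /** Called by the event listener; safe to call before anyone subscribes. */
        void fire() {
            done.complete(null);
        }

        /** Runs immediately if fire() already happened, otherwise when it happens. */
        void subscribe(Runnable onTrigger) {
            done.thenRun(onTrigger);
        }
    }

    /** Returns true if the (simulated) supplier was started for the given ordering. */
    static boolean supplierStarted(boolean eventBeforeSubscription) {
        Trigger trigger = new Trigger();
        boolean[] started = {false};
        if (eventBeforeSubscription) {
            trigger.fire();
            trigger.subscribe(() -> started[0] = true);
        } else {
            trigger.subscribe(() -> started[0] = true);
            trigger.fire();
        }
        return started[0];
    }

    public static void main(String[] args) {
        System.out.println("event first: started=" + supplierStarted(true));
        System.out.println("subscribe first: started=" + supplierStarted(false));
    }
}
```

Because the completion is stored rather than delivered to a callback that may not exist yet, both orderings start the supplier.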

@artembilan
Member

Please raise an issue on the Spring Cloud Stream side, and I'll provide a PR shortly after I come up with a unit test that at least covers the functionality.
The fix on the Spring Integration side was this: bdcb856.
Apparently, before that there was some hidden behavior which allowed the reactive subscription to be pushed up.
Right now I'm bumping into another issue of yours after the fix: spring-cloud/spring-cloud-stream#2977.
But that is something we will work on later.

Closing this one as Works as Designed.

@artembilan artembilan closed this as not planned Jul 25, 2024
@artembilan artembilan added status: declined There won't be a fix for some reason and removed type: bug status: waiting-for-triage The issue need to be evaluated and its future decided labels Jul 25, 2024
artembilan added a commit to artembilan/spring-cloud-stream that referenced this issue Jul 26, 2024
Related to: spring-projects/spring-integration#9362

After the fix in Spring Integration: spring-projects/spring-integration@bdcb856
we ended up in a deadlock situation with a `beginPublishingTrigger` in the `FunctionConfiguration`
used for the `delaySubscription()` on an original `Publisher`.
The `FluxMessageChannel` uses its own `delaySubscription()` until the channel has its subscribers.
Apparently the previous logic was flawed: the `FluxMessageChannel` was marked as active
even if its subscriber was not ready yet, leading to the famous `Dispatcher does not have subscribers` error.
So, it looks like this `beginPublishingTrigger` was introduced back in the day in Spring Cloud Stream
to mitigate that situation until we really emit a `BindingCreatedEvent`.

The deadlock (and the flaw, respectively) is with the `setupBindingTrigger()` method implementation
where `FluxMessageChannel` now "really" delays a subscription to the provided `Publisher`,
therefore not triggering that `Mono.create()` fulfilment immediately.
The `BindingCreatedEvent` arrives before we have a subscriber on the channel,
but `triggerRef.get()` is `null`, so we don't call `success()` on it and in the end never subscribe
to the original `Publisher`, since the `delaySubscription()` on it never completes.

Since `FunctionConfiguration` fully relies on `IntegrationFlow.from(Publisher)`,
which ends up with the mentioned `FluxMessageChannel.subscribeTo()` and its own `delaySubscription()`
(which, in turn, is apparently fixed now), we don't need our own `delaySubscription()` any more.
Therefore this PR proposes removing the `beginPublishingTrigger` logic altogether.
@artembilan
Member

FYI, @wrwksexahatvani, I've raised a Pull Request in Spring Cloud Stream: spring-cloud/spring-cloud-stream#2978

No need for a separate issue over there.
