Expiring completed aggregator groups on startup fails as output channels haven't been created yet. #9521

Closed
mitchmcd18 opened this issue Sep 29, 2024 · 4 comments

Comments

@mitchmcd18
Contributor

In what version(s) of Spring Integration are you seeing this issue?

6.3.4

Describe the bug

When an aggregator starts up, it immediately attempts to purge expired message groups if the timeout is a positive value (see https://github.com/spring-projects/spring-integration/blob/main/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandler.java#L987).

When using the Java DSL, we can construct routes like flow.aggregate().handle().channel(), where the intermediate channels appear to be constructed in the order they are defined. However, since the aggregator handler expires groups immediately on startup, before the next part of the flow is initialised, it fails with the following error:

Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-6.3.4.jar:6.3.4]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-6.3.4.jar:6.3.4]
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.3.4.jar:6.3.4]
        ... 35 common frames omitted

This seems to occur only if the expired group is actually completed. In this case, rather than going to the discardChannel (which would normally be defined as a separate bean elsewhere), the group continues on to the next part of the flow.

To Reproduce

Using the Java DSL, create an aggregator route with a handler after it. Associate the aggregator with a message store that already contains a group that is completed so that on startup the aggregator will immediately complete the group.

Expected behavior

The already-completed group that expired should be able to complete as normal rather than throwing an error.

It may be sufficient to just remove the immediate call to purgeOrphanedGroups() and perhaps add an initial delay of some sort, or wait until the entire flow has been constructed before processing messages.

Sample

https://github.com/mitchmcd18/spring-completegroup-bug

@mitchmcd18 mitchmcd18 added status: waiting-for-triage The issue need to be evaluated and its future decided type: bug labels Sep 29, 2024
@artembilan
Member

The .channel(new DirectChannel()) at the end is not OK. This kind of channel has to have a subscriber to complete the flow. Consider adding handle(m -> System.out.println(m)) after it.

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Sep 30, 2024
@mitchmcd18
Contributor Author

Ah yup, I have updated the example to use handle(m -> System.out.println(m)). The issue still occurs with this.

@artembilan
Member

OK. That is a race condition between endpoint startups.
Both the aggregator and the next handle() are in the same startup phase, but since the aggregator is registered first, it is also started first within that phase.
So, the workaround for now is to add .phase(Integer.MIN_VALUE + 1) to your aggregate() definition.
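
For illustration, the workaround might look roughly like the configuration fragment below. It is a sketch, not taken from the sample project: the class name, the "aggregatorInput" channel name, and the injected MessageGroupStore bean are assumptions, and it needs Spring Integration 6.x on the classpath rather than running on its own.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.store.MessageGroupStore;

@Configuration
class AggregatorPhaseWorkaround { // hypothetical class name

    @Bean
    IntegrationFlow aggregatingFlow(MessageGroupStore messageStore) { // assumed existing store bean
        return IntegrationFlow.from("aggregatorInput") // hypothetical input channel name
                .aggregate(a -> a
                        .messageStore(messageStore)
                        // Start the aggregator one phase later than the
                        // Integer.MIN_VALUE used by the handle() endpoint below,
                        // so that endpoint has subscribed before any group is released.
                        .phase(Integer.MIN_VALUE + 1))
                .handle(m -> System.out.println(m))
                .get();
    }
}
```

With the later phase, the endpoint created for handle() should already be subscribed to the aggregator's output channel by the time the aggregator purges groups on startup.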

I think I'll make such a fix in the framework to ensure that intermediate endpoints are started later than those ending the flow.
Should be easy to do here:

	public EventDrivenConsumer(SubscribableChannel inputChannel, MessageHandler handler) {
		Assert.notNull(inputChannel, "inputChannel must not be null");
		Assert.notNull(handler, "handler must not be null");
		this.inputChannel = inputChannel;
		this.handler = handler;
		this.setPhase(Integer.MIN_VALUE);
	}

Check for the AbstractMessageProducingHandler type, which is enough for us to determine that such a handler sits in the middle of the flow.
Meanwhile, a plain MessageHandler is really one-way and can be used only at the end of a flow.
So, this one should start first, and then the rest of the flow upstream.
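
The ordering argument can be modelled in plain Java without the framework. The sketch below is a toy simulation, not Spring Integration code: ToyChannel, ToyEndpoint, startAll, and simulate are hypothetical names, and it assumes that endpoints start in ascending phase order with ties broken by registration order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

class StartupOrderSketch {

    /** Toy stand-in for a direct channel: a send fails if nobody has subscribed yet. */
    static class ToyChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void send(String payload) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException("Dispatcher has no subscribers");
            }
            subscribers.get(0).accept(payload);
        }
    }

    /** Toy endpoint: a startup phase plus the action it performs when started. */
    static class ToyEndpoint {
        final int phase;
        final Runnable onStart;

        ToyEndpoint(int phase, Runnable onStart) {
            this.phase = phase;
            this.onStart = onStart;
        }
    }

    /** Starts endpoints in ascending phase; equal phases keep registration order. */
    static void startAll(List<ToyEndpoint> registered) {
        List<ToyEndpoint> ordered = new ArrayList<>(registered);
        ordered.sort(Comparator.comparingInt(e -> e.phase)); // List.sort is stable
        for (ToyEndpoint endpoint : ordered) {
            endpoint.onStart.run();
        }
    }

    /** Returns "delivered" or the failure message for the given phases. */
    static String simulate(int aggregatorPhase, int handlerPhase) {
        ToyChannel output = new ToyChannel();
        List<String> received = new ArrayList<>();
        List<ToyEndpoint> registered = new ArrayList<>();
        // Registered first, like aggregate(): releases an expired group as soon as it starts.
        registered.add(new ToyEndpoint(aggregatorPhase, () -> output.send("expired group")));
        // Registered second, like the following handle(): subscribes when it starts.
        registered.add(new ToyEndpoint(handlerPhase, () -> output.subscribe(received::add)));
        try {
            startAll(registered);
            return received.isEmpty() ? "nothing sent" : "delivered";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Same phase: the aggregator starts first and finds no subscriber.
        System.out.println(simulate(Integer.MIN_VALUE, Integer.MIN_VALUE));
        // Aggregator one phase later: the handler has subscribed by then.
        System.out.println(simulate(Integer.MIN_VALUE + 1, Integer.MIN_VALUE));
    }
}
```

In this model the first call fails with the same message as the stack trace in the report, and the second one delivers the group, which is the effect both the .phase() workaround and the proposed framework change aim for.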

@artembilan artembilan added this to the 6.4.0-RC1 milestone Sep 30, 2024
@artembilan artembilan added in: core and removed status: waiting-for-reporter Needs a feedback from the reporter labels Sep 30, 2024
@artembilan
Member

Unfortunately, that is going to be a slight change in behavior, so we won't back-port it to the versions in maintenance.
