TcpConnectionInterceptorSupport.addNewConnection() is no longer called #3713

Closed
dcitron opened this issue Feb 1, 2022 · 5 comments · Fixed by #3714

dcitron commented Feb 1, 2022

In what version(s) of Spring Integration are you seeing this issue?

5.5.8

Describe the bug

In 5.3.10.RELEASE, the following code worked:

@Configuration
@IntegrationComponentScan
@EnableIntegration
public class MyServer {
    @Bean
    public AbstractServerConnectionFactory serverConnectionFactory() {
        TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(listenPort);

        TcpConnectionInterceptorFactoryChain interceptorFactoryChain = new TcpConnectionInterceptorFactoryChain();
        interceptorFactoryChain.setInterceptor(new MyInterceptorFactory());
        connectionFactory.setInterceptorFactoryChain(interceptorFactoryChain);

        return connectionFactory;
    }

    @ServiceActivator(inputChannel = "myOutput")
    @Bean
    public TcpSendingMessageHandler outbound(AbstractServerConnectionFactory connectionFactory) {
        TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
        tcpSendingMessageHandler.setConnectionFactory(connectionFactory);
        return tcpSendingMessageHandler;
    }

    // ... etc ...

    private class MyInterceptorFactory implements TcpConnectionInterceptorFactory {
        @Override
        public TcpConnectionInterceptorSupport getInterceptor() {
            return new MyInterceptor();
        }
    }

    private class MyInterceptor extends TcpConnectionInterceptorSupport {
        @Override
        public void addNewConnection(TcpConnection connection) {
            super.addNewConnection(connection); // <=== I was called before but I'm not now!!!! ===
        }
    }
}

However, in 5.5.8 -- and I assume ever since this change -- the addNewConnection() method above is no longer called.

This is because the old code called org.springframework.integration.ip.tcp.connection.TcpConnectionSupport.registerSender(TcpSender) with a MyInterceptor instance. This called this.theConnection.registerSender(this) which eventually called addNewConnection() on the interceptor.

But the new code calls org.springframework.integration.ip.tcp.connection.TcpConnectionSupport.registerSenders(List). It then walks through the list of senders and calls addNewConnection() on them directly, and those senders are the TcpSendingMessageHandler Bean(s) that were returned from the outbound ServiceActivator(s) above.
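To make the two call paths above concrete, here is a simplified, self-contained model. It is only a sketch and not Spring Integration's actual code: `Sender`, `Connection`, `Interceptor`, `CountingSender`, and `simulate` are hypothetical stand-ins chosen for illustration. It shows that when the interceptor registers itself on the wrapped connection, it is notified and can forward to the real sender, whereas passing the sender list straight through bypasses the interceptor.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the behavior described above; not the framework's classes.
class SenderRegistrationSketch {

    /** Stand-in for a sender that wants to hear about new connections. */
    interface Sender {
        void addNewConnection(String connectionId);
    }

    /** Stand-in for the underlying connection: it notifies exactly the senders it is handed. */
    static class Connection {
        private final String id;

        Connection(String id) {
            this.id = id;
        }

        void registerSenders(List<Sender> senders) {
            for (Sender sender : senders) {
                sender.addNewConnection(this.id);
            }
        }
    }

    /** Stand-in for the real sender (for example, an outbound handler). */
    static class CountingSender implements Sender {
        int notifications;

        @Override
        public void addNewConnection(String connectionId) {
            this.notifications++;
        }
    }

    /** Stand-in for an interceptor that wraps a connection. */
    static class Interceptor implements Sender {
        private final Connection wrapped;
        private Sender realSender;
        int notifications;

        Interceptor(Connection wrapped) {
            this.wrapped = wrapped;
        }

        // Old-style path: the interceptor registers itself, so the connection calls back into it.
        void registerSender(Sender sender) {
            this.realSender = sender;
            this.wrapped.registerSenders(Collections.<Sender>singletonList(this));
        }

        // New-style path as described in the report: the list is handed straight to the connection.
        void registerSenders(List<Sender> senders) {
            this.wrapped.registerSenders(senders);
        }

        @Override
        public void addNewConnection(String connectionId) {
            this.notifications++;
            if (this.realSender != null) {
                this.realSender.addNewConnection(connectionId);
            }
        }
    }

    /** Returns {interceptor notifications, real sender notifications}. */
    static int[] simulate(boolean interceptorRegistersItself) {
        CountingSender real = new CountingSender();
        Interceptor interceptor = new Interceptor(new Connection("conn-1"));
        if (interceptorRegistersItself) {
            interceptor.registerSender(real);
        } else {
            interceptor.registerSenders(Collections.<Sender>singletonList(real));
        }
        return new int[] { interceptor.notifications, real.notifications };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(simulate(true)));  // prints [1, 1]
        System.out.println(Arrays.toString(simulate(false))); // prints [0, 1]
    }
}
```

In both cases the real sender is notified once; only the interceptor's count differs, which matches the symptom reported here.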

Let me know if this is clear, or, if not, if I should make working and non-working samples.

Or, alternately, let me know if I'm misreading the documentation and am setting up interceptors wrong!

Thank you so much!
Dave

dcitron added the status: waiting-for-triage and type: bug labels Feb 1, 2022
garyrussell removed the status: waiting-for-triage label Feb 1, 2022
garyrussell self-assigned this Feb 1, 2022

garyrussell (Contributor) commented

The workaround I posted is not valid; investigating...

garyrussell (Contributor) commented

This is a behavior change; here is a work around in the context of one of the test cases:

	private void senderCalledForDeadConnectionClient(AbstractClientConnectionFactory client) throws InterruptedException {
		CountDownLatch adds = new CountDownLatch(2);
		CountDownLatch removes = new CountDownLatch(2);
		CountDownLatch interceptorAddCalled = new CountDownLatch(1);
		TcpConnectionInterceptorFactoryChain chain = new TcpConnectionInterceptorFactoryChain();
		chain.setInterceptor(new HelloWorldInterceptorFactory() {

			@Override
			public TcpConnectionInterceptorSupport getInterceptor() {
				return new TcpConnectionInterceptorSupport() {

					private List<TcpSender> interceptedSenders;

					private boolean realSender;

					@Override
					public void addNewConnection(TcpConnection connection) {
						interceptorAddCalled.countDown();
						this.interceptedSenders.forEach(sender -> sender.addNewConnection(connection));
					}

					@Override
					public void removeDeadConnection(TcpConnection connection) {
						this.interceptedSenders.forEach(sender -> sender.removeDeadConnection(connection));
					}

					@Override
					public void registerSenders(List<TcpSender> sendersToRegister) {
						this.interceptedSenders = sendersToRegister;
						if (sendersToRegister.size() > 0) {
							if (!(sendersToRegister.get(0) instanceof TcpConnectionInterceptorSupport)) {
								this.realSender = true;
							}
							else {
								this.realSender = ((TcpConnectionInterceptorSupport) this.interceptedSenders.get(0))
										.hasRealSender();
							}
						}
						super.registerSenders(Collections.singletonList(this));
					}

					@Override
					protected boolean hasRealSender() {
						return this.realSender;
					}

				};
			}

		});
		client.setInterceptorFactoryChain(chain);
		client.registerSender(new TcpSender() {

			@Override
			public void addNewConnection(TcpConnection connection) {
				adds.countDown();
			}

			@Override
			public void removeDeadConnection(TcpConnection connection) {
				removes.countDown();
			}

		});
		client.setSingleUse(true);
		client.afterPropertiesSet();
		client.start();
		TcpConnectionSupport conn = client.getConnection();
		assertThat(((TcpConnectionInterceptorSupport) conn).hasRealSender()).isTrue();
		conn.close();
		conn = client.getConnection();
		assertThat(adds.await(10, TimeUnit.SECONDS)).isTrue();
		conn.close();
		client.stop();
		assertThat(removes.await(10, TimeUnit.SECONDS)).isTrue();
		assertThat(interceptorAddCalled.await(10, TimeUnit.SECONDS)).isTrue();
	}

dcitron commented Feb 1, 2022

That seems quite confusing. So I need to do connectionFactory.registerSender() on my server-side implementation, and intercept the addNewConnection call there, perhaps? Is that the symmetrical version of what you pasted?

I hate to say it, but I think I'm going to stick with 5.3.10.RELEASE for now (the last release where the Interceptor actually intercepted things) and maybe I'll follow up later to see if things get changed back.

Would you assume that my code in my original post will never work again, post-5.3?

Thanks!

garyrussell added this to the 5.5.9 milestone Feb 1, 2022

garyrussell (Contributor) commented

Yes, this will be fixed in 5.5.9.

dcitron commented Feb 1, 2022

Oh, fabulous! Definitely the best answer I could've hoped for! Thank you!

artembilan modified the milestones: 5.5.9, 6.0 M2 Feb 1, 2022
garyrussell added a commit to garyrussell/spring-integration that referenced this issue Feb 1, 2022
Resolves spring-projects#3713

spring-projects#3326 added support
for multiple `TcpSenders`. However, when connections are intercepted, the sender
list was not properly chained through the interceptors.

- override `registerSenders` and properly capture the real senders in the last
  interceptor and intermediate interceptors
- this ensures that `addNewConnection` is called on each interceptor
- when removing dead connections, use the (possibly intercepted) connection sender
  list instead of the factory's raw sender list.

**cherry-pick to 5.5.x**
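The chaining idea in the commit message above can be sketched with a small, self-contained model. This is an illustration under stated assumptions, not the code from the actual fix: `Sender`, `Registrable`, `RawConnection`, `ChainedInterceptor`, and `run` are hypothetical names. Each interceptor captures the senders it is given and registers itself on whatever it wraps, so `addNewConnection` and `removeDeadConnection` pass through every interceptor before reaching the real sender.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of chaining senders through interceptors; not the framework's classes.
class InterceptorChainSketch {

    interface Sender {
        void addNewConnection(String connectionId);

        void removeDeadConnection(String connectionId);
    }

    /** Anything senders can be registered on: a raw connection or an interceptor. */
    interface Registrable {
        void registerSenders(List<Sender> senders);
    }

    static class RawConnection implements Registrable {
        private final String id;
        private List<Sender> senders = Collections.emptyList();

        RawConnection(String id) {
            this.id = id;
        }

        @Override
        public void registerSenders(List<Sender> sendersToRegister) {
            this.senders = sendersToRegister;
            this.senders.forEach(sender -> sender.addNewConnection(this.id));
        }

        // On close, use the connection's own (possibly intercepted) sender list.
        void close() {
            this.senders.forEach(sender -> sender.removeDeadConnection(this.id));
        }
    }

    static class ChainedInterceptor implements Registrable, Sender {
        private final String name;
        private final Registrable wrapped;
        private final List<String> log;
        private List<Sender> intercepted = Collections.emptyList();

        ChainedInterceptor(String name, Registrable wrapped, List<String> log) {
            this.name = name;
            this.wrapped = wrapped;
            this.log = log;
        }

        @Override
        public void registerSenders(List<Sender> sendersToRegister) {
            this.intercepted = sendersToRegister; // capture what this interceptor fronts
            this.wrapped.registerSenders(Collections.<Sender>singletonList(this));
        }

        @Override
        public void addNewConnection(String connectionId) {
            this.log.add(this.name + ".add");
            this.intercepted.forEach(sender -> sender.addNewConnection(connectionId));
        }

        @Override
        public void removeDeadConnection(String connectionId) {
            this.log.add(this.name + ".remove");
            this.intercepted.forEach(sender -> sender.removeDeadConnection(connectionId));
        }
    }

    /** Builds raw <- inner <- outer, registers one real sender, closes, and returns the call log. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        RawConnection raw = new RawConnection("conn-1");
        ChainedInterceptor inner = new ChainedInterceptor("inner", raw, log);
        ChainedInterceptor outer = new ChainedInterceptor("outer", inner, log);
        Sender real = new Sender() {
            @Override
            public void addNewConnection(String connectionId) {
                log.add("real.add");
            }

            @Override
            public void removeDeadConnection(String connectionId) {
                log.add("real.remove");
            }
        };
        outer.registerSenders(Collections.singletonList(real));
        raw.close();
        return log;
    }

    public static void main(String[] args) {
        // prints [inner.add, outer.add, real.add, inner.remove, outer.remove, real.remove]
        System.out.println(run());
    }
}
```

The ordering of interceptors in this model is arbitrary; the point is only that every link in the chain sees both callbacks.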
garyrussell added a commit to garyrussell/spring-integration that referenced this issue Feb 2, 2022
Resolves spring-projects#3713

spring-projects#3326 added support
for multiple `TcpSenders`. However, when connections are intercepted, the sender
list was not properly chained through the interceptors.

- override `registerSenders` and properly capture the real senders in the last
  interceptor and intermediate interceptors
- this ensures that `addNewConnection` is called on each interceptor
- when removing dead connections, use the connection sender list instead of the
  factory's raw sender list; detect if the connection is an interceptor and
  call its remove method instead.

**cherry-pick to 5.5.x**
artembilan pushed a commit that referenced this issue Feb 2, 2022
Resolves #3713

#3326 added support
for multiple `TcpSenders`. However, when connections are intercepted, the sender
list was not properly chained through the interceptors.

- override `registerSenders` and properly capture the real senders in the last
  interceptor and intermediate interceptors
- this ensures that `addNewConnection` is called on each interceptor
- when removing dead connections, use the connection sender list instead of the
  factory's raw sender list; detect if the connection is an interceptor and
  call its remove method instead.

**cherry-pick to 5.5.x**
garyrussell added a commit that referenced this issue Feb 3, 2022
garyrussell added a commit that referenced this issue Feb 3, 2022