
[BUG] spring-cloud-azure-starter-servicebus-jms - com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose amqp: connection:forced The connection was closed by container because it did not have any active links in the past 300000 milliseconds #41736

Closed
Tom-Van-Asch opened this issue Sep 5, 2024 · 40 comments
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Service Bus


@Tom-Van-Asch

Tom-Van-Asch commented Sep 5, 2024

Describe the bug
We are using the ServiceBusSenderClient to send messages to a Service Bus topic. When no message has been sent for 15 minutes, the connection is freed, but a NullPointerException follows because a new session is created on the already closed connection.

Exception or Stack Trace

2024-09-02 13:12:13.557 DEBUG [spring-app-name,,] 80959 --- [tem-execution-4] b.d.s.c.j.o.service.OutboxItemProcessor  : Processing outbox item: 4089 with aggregateId: 6049
2024-09-02 13:27:37.216 ERROR [spring-app-name,,] 80959 --- [ctor-executor-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[MF_27dedd_1725282731155], entityName[service-bus-topic-name] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container 'e571b398fba34f83bfc494365a9a6210_G4' because it did not have any active links in the past 300000 milliseconds. TrackingId:e571b398fba34f83bfc494365a9a6210_G4, SystemTracker:gateway10, Timestamp:2024-09-02T13:27:39', info=null}], errorContext[NAMESPACE: service-bus-name.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: ervice-bus-topic-name]
Caused by: com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[MF_27dedd_1725282731155], entityName[ervice-bus-topic-name] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container 'e571b398fba34f83bfc494365a9a6210_G4' because it did not have any active links in the past 300000 milliseconds. TrackingId:e571b398fba34f83bfc494365a9a6210_G4, SystemTracker:gateway10, Timestamp:2024-09-02T13:27:39', info=null}], errorContext[NAMESPACE: service-bus-name.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: ervice-bus-topic-name]
	at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxDistinctUntilChanged] :
	reactor.core.publisher.Flux.distinctUntilChanged(Flux.java:4744)
	com.azure.core.amqp.implementation.handler.Handler.getEndpointStates(Handler.java:77)
Error has been observed at the following site(s):
	*__Flux.distinctUntilChanged ⇢ at com.azure.core.amqp.implementation.handler.Handler.getEndpointStates(Handler.java:77)
	|_                  Flux.map ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:141)
	|_            Flux.doOnError ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
	|_         Flux.doOnComplete ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
	|_                Flux.cache ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
	|_                Flux.cache ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
Original Stack Trace:
		at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90)
		at com.azure.core.amqp.implementation.handler.SessionHandler.onSessionRemoteClose(SessionHandler.java:139)
		at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:152)
		at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
		at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
		at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
		at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
		at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
		at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
		at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
		at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
		at java.base/java.lang.Thread.run(Thread.java:840)

2024-09-02 13:27:37.223 ERROR [spring-app-name,,] 80959 --- [ctor-executor-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[MF_27dedd_1725282731155], entityName[cbs-session] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container 'e571b398fba34f83bfc494365a9a6210_G4' because it did not have any active links in the past 300000 milliseconds. TrackingId:e571b398fba34f83bfc494365a9a6210_G4, SystemTracker:gateway10, Timestamp:2024-09-02T13:27:39', info=null}], errorContext[NAMESPACE: service-bus-name.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: cbs-session]
Caused by: com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[MF_27dedd_1725282731155], entityName[cbs-session] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container 'e571b398fba34f83bfc494365a9a6210_G4' because it did not have any active links in the past 300000 milliseconds. TrackingId:e571b398fba34f83bfc494365a9a6210_G4, SystemTracker:gateway10, Timestamp:2024-09-02T13:27:39', info=null}], errorContext[NAMESPACE: service-bus-name.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: cbs-session]
	at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxDistinctUntilChanged] :
	reactor.core.publisher.Flux.distinctUntilChanged(Flux.java:4744)
	com.azure.core.amqp.implementation.handler.Handler.getEndpointStates(Handler.java:77)
Error has been observed at the following site(s):
	*__Flux.distinctUntilChanged ⇢ at com.azure.core.amqp.implementation.handler.Handler.getEndpointStates(Handler.java:77)
	|_                  Flux.map ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:141)
	|_            Flux.doOnError ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
	|_         Flux.doOnComplete ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
	|_                Flux.cache ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
	|_                Flux.cache ⇢ at com.azure.core.amqp.implementation.ReactorSession.<init>(ReactorSession.java:148)
Original Stack Trace:
		at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:90)
		at com.azure.core.amqp.implementation.handler.SessionHandler.onSessionRemoteClose(SessionHandler.java:139)
		at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:152)
		at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
		at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
		at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
		at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
		at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
		at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
		at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
		at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
		at java.base/java.lang.Thread.run(Thread.java:840)

2024-09-02 13:27:39.023 ERROR [spring-app-name,,] 80959 --- [    parallel-10] c.a.c.a.i.AmqpChannelProcessor           : {"az.sdk.message":"Retry attempts exhausted or exception was not retriable.","exception":"Cannot invoke \"java.util.List.add(Object)\" because \"this._sessions\" is null","connectionId":"MF_27dedd_1725282731155","entityPath":"$cbs","tryCount":1}
2024-09-02 13:27:39.024 ERROR [spring-app-name,,] 80959 --- [    parallel-10] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null
Caused by: java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null
	at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:91)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
	reactor.core.publisher.Mono.map(Mono.java:3482)
	com.azure.core.amqp.implementation.ReactorConnection.createSession(ReactorConnection.java:307)
Error has been observed at the following site(s):
	*_____________Mono.map ⇢ at com.azure.core.amqp.implementation.ReactorConnection.createSession(ReactorConnection.java:307)
	|_        Mono.flatMap ⇢ at com.azure.core.amqp.implementation.ReactorConnection.createSession(ReactorConnection.java:340)
	|_           Mono.cast ⇢ at com.azure.core.amqp.implementation.ReactorConnection.createRequestResponseChannel(ReactorConnection.java:441)
	|_            Mono.map ⇢ at com.azure.core.amqp.implementation.ReactorConnection.createRequestResponseChannel(ReactorConnection.java:442)
	|_       Mono.doOnNext ⇢ at com.azure.core.amqp.implementation.ReactorConnection.createRequestResponseChannel(ReactorConnection.java:446)
	|_         Mono.repeat ⇢ at com.azure.core.amqp.implementation.ReactorConnection.createRequestResponseChannel(ReactorConnection.java:454)
	*_________Mono.flatMap ⇢ at com.azure.core.amqp.implementation.ReactorConnection.closeAsync(ReactorConnection.java:504)
	|_      Mono.doFinally ⇢ at com.azure.core.amqp.implementation.ReactorConnection.closeAsync(ReactorConnection.java:536)
	*__Mono.whenDelayError ⇢ at com.azure.core.amqp.implementation.ReactorConnection.closeAsync(ReactorConnection.java:535)
	*____________Mono.then ⇢ at com.azure.core.amqp.implementation.ReactorConnection.closeAsync(ReactorConnection.java:541)
	*____________Mono.then ⇢ at com.azure.core.amqp.implementation.ReactorConnection.closeAsync(ReactorConnection.java:544)
Original Stack Trace:
		at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:91)
		at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:39)
		at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$15(ReactorConnection.java:311)
		at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
		at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$16(ReactorConnection.java:308)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
		at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.request(MonoIgnoreThen.java:164)
		at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:139)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.request(MonoFlatMap.java:194)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2331)
		at com.azure.core.amqp.implementation.AmqpChannelProcessor.requestUpstream(AmqpChannelProcessor.java:336)
		at com.azure.core.amqp.implementation.AmqpChannelProcessor.lambda$onError$4(AmqpChannelProcessor.java:230)
		at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:171)
		at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270)
		at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285)
		at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
		at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
		at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
		at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
		at java.base/java.lang.Thread.run(Thread.java:840)

To Reproduce

  1. Create a ServiceBusSenderClient instance.
  2. Send a message to the Service Bus topic.
  3. Wait 15 minutes; the exceptions above are logged.

Code Snippet

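    import com.azure.messaging.servicebus.ServiceBusClientBuilder;
    import com.azure.messaging.servicebus.ServiceBusMessage;
    import com.azure.messaging.servicebus.ServiceBusSenderClient;

    // serviceBusClientBuilder is an existing ServiceBusClientBuilder field; TYPE_ID is an
    // application-defined property key (its value is not shown here).
    public void execute() {
        ServiceBusSenderClient serviceBusSenderClient = serviceBusClientBuilder
                .connectionString("connectionString")
                .sender()
                .topicName("topicName")
                .buildClient();

        ServiceBusMessage serviceBusMessage = new ServiceBusMessage("payload");
        serviceBusMessage.getRawAmqpMessage().getApplicationProperties().put(TYPE_ID, "messageType");
        serviceBusMessage.setMessageId("messageId");

        // The sender is not closed, so its connection stays open and idles
        // until the service closes it.
        serviceBusSenderClient.sendMessage(serviceBusMessage);
    }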

Expected behavior
No error logging should occur

Screenshots
(screenshot not reproduced)
When the doFree method is called on the connection, the _sessions field is set to null, but the connection appears to be reused afterwards to create a new session, which results in the NullPointerException.
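The failure mode described in this report can be modeled in a few lines. The sketch below is a hypothetical, simplified stand-in: `FreedConnectionModel` and its `ToyConnection` are invented for illustration and are not proton-j's `ConnectionImpl`. Freeing the connection nulls its session list, and a later `session()` call on the same stale connection throws a `NullPointerException`, analogous to the "this._sessions is null" message in the logs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the reported failure; NOT proton-j's ConnectionImpl.
public class FreedConnectionModel {

    static final class ToyConnection {
        private List<Object> sessions = new ArrayList<>();

        // Models the reported behavior: freeing the connection drops its session list.
        void doFree() {
            sessions = null;
        }

        // Creating a session on a freed connection dereferences the null list.
        Object session() {
            Object session = new Object();
            sessions.add(session);
            return session;
        }
    }

    // Returns true if creating a session after doFree() fails with a NullPointerException.
    static boolean sessionAfterFreeThrows() {
        ToyConnection connection = new ToyConnection();
        connection.session();      // normal use while the connection is alive
        connection.doFree();       // the idle connection is closed and freed
        try {
            connection.session();  // the stale connection is reused
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("NPE after free: " + sessionAfterFreeThrows());
    }
}
```

The point of the model is that the bug is not the close itself but the reuse of a connection object after it has been freed.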

Setup (please complete the following information):

  • OS: Azure Spring Apps / Linux
  • IDE: n/a
  • Library/Libraries: com.azure.spring:spring-cloud-azure-starter-servicebus-jms, we use version 5.13.0
  • Java version: 17
  • App Server/Environment: Apache Tomcat from Spring Boot 3.3
  • Frameworks: Spring Boot 3.3.1
@github-actions github-actions bot added Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Service Bus labels Sep 5, 2024

github-actions bot commented Sep 5, 2024

@anuchandy @conniey @lmolkova


github-actions bot commented Sep 5, 2024

Thank you for your feedback. Tagging and routing to the team member best able to assist.

@anuchandy
Member

anuchandy commented Sep 5, 2024

Hello @Tom-Van-Asch, thank you for the report. Since your screenshot shows the breakpoint being hit, are you able to reproduce this locally on your developer machine? If so, could you provide more details:

  • Your local setup: Java version, operating system, whether you run the app in Docker locally or directly on the host, and the number of cores.
  • The Service Bus tier and region.

@anuchandy
Member

@Tom-Van-Asch, just to clarify: I mean whether the local setup also hits the reported NPE (in addition to the onSessionRemoteClose and closed-connection error messages).

@anuchandy anuchandy added the needs-author-feedback Workflow: More information is needed from author to address the issue. label Sep 6, 2024

github-actions bot commented Sep 6, 2024

Hi @Tom-Van-Asch. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

@github-actions github-actions bot removed the needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team label Sep 6, 2024
@anuchandy
Member

The same stack trace is discussed and root-caused in #41584.

@Tom-Van-Asch
Author

Thanks for the update. When the new version is released, I'll test it again and update this issue with my findings.

@github-actions github-actions bot added needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team and removed needs-author-feedback Workflow: More information is needed from author to address the issue. labels Sep 11, 2024
@josebarros2025

We have the same issue and have not found any solution so far.

@anuchandy
Member

The release PR has been opened and will be shipped soon: #42088

@hylander0

the release PR has been opened and will be shipped soon: #42088

Thank you, Anu, for the update. Looking forward to the release.

@anuchandy
Member

anuchandy commented Sep 27, 2024

Hello @Tom-Van-Asch, @pretti-vusion, @amacbean, @josebarros2025, @hylander0, @padmapriyanalam, @pje477,

The library update has been released; please follow the steps outlined below. Let us know if the experience improves. Note that you will still see the session disconnect/reconnect logs (which is expected), but the new library should address the NullPointerException.

Update the dependency to 7.17.4 or 7.17.5

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.17.5</version>
</dependency>

Update the ServiceBusClientBuilder for "com.azure.core.amqp.cache"

When building any client (ServiceBusProcessorClient, ServiceBusReceiverClient, ServiceBusSenderClient, etc.), set the configuration property "com.azure.core.amqp.cache" to "true", as shown below. Make sure this configuration is applied everywhere the application creates a new ServiceBusClientBuilder:

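import com.azure.core.util.ConfigurationBuilder;

new ServiceBusClientBuilder()
    .connectionString(CONNECTION_STRING)
    // Opt in to the connection cache; required for the NPE fix in 7.17.4/7.17.5
    .configuration(new ConfigurationBuilder()
        .putProperty("com.azure.core.amqp.cache", "true")
        .build())
    .sender() // or .processor(), .receiver(), ... depending on the client being built
    // ...client-specific settings, then the matching build*() call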

This configuration is essential for resolving the problem: java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null
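The thread does not describe how the cache works internally. As a rough mental model only, assuming the cache's job is to stop a closed connection from being reused, the hypothetical `ConnectionCacheModel` below hands out the current connection while it is open and creates a new one once the old one has been closed (for example, by the service's idle timeout). It is a sketch of the idea, not the SDK's implementation.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical model of a connection cache; not the Azure SDK's implementation.
public class ConnectionCacheModel {

    // Stand-in for an AMQP connection that the service may close when it is idle.
    static final class ToyConnection {
        final int id;
        private volatile boolean closed;

        ToyConnection(int id) {
            this.id = id;
        }

        void closeByService() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Returns the cached connection while it is open and replaces it once it is closed,
    // so callers never create sessions on a connection that has already been freed.
    static final class ConnectionCache {
        private final Supplier<ToyConnection> factory;
        private ToyConnection current;

        ConnectionCache(Supplier<ToyConnection> factory) {
            this.factory = factory;
        }

        synchronized ToyConnection get() {
            if (current == null || current.isClosed()) {
                current = factory.get();
            }
            return current;
        }
    }

    // Returns {first connection id, id on reuse, id after the first connection was closed}.
    static int[] demoIds() {
        AtomicInteger ids = new AtomicInteger();
        ConnectionCache cache = new ConnectionCache(() -> new ToyConnection(ids.incrementAndGet()));
        ToyConnection first = cache.get();
        int reused = cache.get().id;
        first.closeByService();
        return new int[] {first.id, reused, cache.get().id};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demoIds())); // [1, 1, 2]
    }
}
```

Checking for closure on every `get()` is what keeps callers away from a freed connection; a real implementation would also have to handle a connection closing between the check and its use.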

Ensure the right transitive dependencies

Make sure the transitive dependencies (azure-core-amqp, azure-core) are resolved to expected versions.

mvn dependency:tree
[INFO] ...
[INFO] +- com.azure:azure-messaging-servicebus:jar:7.17.5:compile
[INFO] |  +- com.azure:azure-core:jar:1.53.0:compile
[INFO] |  |  +- ..
[INFO] |  |  \- ...
[INFO] |  \- com.azure:azure-core-amqp:jar:2.9.10:compile
[INFO] |     +- com.microsoft.azure:qpid-proton-j-extensions:jar:1.2.5:compile
[INFO] |     \- org.apache.qpid:proton-j:jar:0.34.1:compile
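When a tree like this is long, a small check can flag a transitive dependency that resolved to an older version than expected. The hypothetical `MinimumVersionCheck` below compares dotted numeric versions part by part, so `2.9.6` correctly sorts below `2.9.10` (a plain string comparison gets this wrong). Treating the azure-core and azure-core-amqp versions from the 7.17.5 tree above as minimums is an assumption; the "resolved" sample values are the older ones that appear in another dependency tree in this thread. It assumes purely numeric versions and would need extending for qualifiers such as `-beta.1`.

```java
// Hypothetical helper: checks resolved dependency versions against expected minimums.
// Assumes purely numeric dotted versions such as "1.53.0" (no "-beta" qualifiers).
public class MinimumVersionCheck {

    // Negative, zero, or positive as version a is older than, equal to, or newer than b.
    static int compareVersions(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        int n = Math.max(pa.length, pb.length);
        for (int i = 0; i < n; i++) {
            // Missing trailing parts count as 0, so "1.53" equals "1.53.0".
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    static boolean isAtLeast(String resolved, String minimum) {
        return compareVersions(resolved, minimum) >= 0;
    }

    public static void main(String[] args) {
        // {artifact, resolved version, expected minimum}
        String[][] checks = {
            {"azure-core", "1.49.1", "1.53.0"},
            {"azure-core-amqp", "2.9.6", "2.9.10"},
        };
        for (String[] c : checks) {
            String status = isAtLeast(c[1], c[2]) ? "OK" : "TOO OLD";
            System.out.println(c[0] + " " + c[1] + " (expected >= " + c[2] + "): " + status);
        }
    }
}
```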

Note: In later versions, the opt-in via "com.azure.core.amqp.cache" will no longer be needed.

@anuchandy
Member

anuchandy commented Sep 28, 2024

Hello @pje477, I'm following up on this comment #41584 (comment) you left in the other GitHub issue.

While onSessionRemoteClose (normal) and the NullPointerException (abnormal) already existed in the versions you listed, the reactor-executor leak is something new. Could you please try the 7.17.4 steps listed in the comment above and check whether they resolve the NPE and associated issues?

A few questions about the environment where you observed the leak (it's fine to respond later, after trying 7.17.4):

  1. Can you provide the code showing how your application uses the Service Bus clients - including any configuration values such as concurrency, receive-mode etc. and code doing client close?
  2. What kind of application is it (e.g., Spring, a basic console application)?
  3. How many Service Bus client instances are used in the application?
  4. Is only one topic used in the affected application?
  5. Are you able to reproduce the leak locally by running application in your dev box?
  6. Do you see a correlation between the number of reactor-executor threads and the number of NullPointerExceptions emitted? Can you share more of your observations?
  7. What is the Service Bus tier you're using?

@josebarros2025

josebarros2025 commented Sep 30, 2024

I'm having issues like these

java.lang.NoSuchMethodError: 'void com.azure.core.amqp.implementation.ReactorConnection.<init>(java.lang.String, com.azure.core.amqp.implementation.ConnectionOptions, com.azure.core.amqp.implementation.ReactorProvider, com.azure.core.amqp.implementation.ReactorHandlerProvider, com.azure.core.amqp.implementation.AmqpLinkProvider, com.azure.core.amqp.implementation.TokenManagerProvider, com.azure.core.amqp.implementation.MessageSerializer, org.apache.qpid.proton.amqp.transport.SenderSettleMode, org.apache.qpid.proton.amqp.transport.ReceiverSettleMode, boolean, boolean)'
Any tips?

return new ServiceBusClientBuilder()
           .connectionString(messagingConfiguration.getConnectionString())
           .configuration(new ConfigurationBuilder()
               .putProperty("com.azure.core.amqp.cache", "true")
               .build())
           .processor()
           .queueName(queueName)
           .maxConcurrentCalls(messagingConfiguration.getMaxConcurrentCall())
           .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
           .processMessage(processMessage())
           .processError(processError())
           .disableAutoComplete()
           .buildProcessorClient();

This is a Spring Boot 3.3.3 application.

My dependency tree is different from yours. How should I proceed?

[INFO] +- com.azure.spring:spring-cloud-azure-starter:jar:5.14.0:compile
[INFO] |  \- com.azure.spring:spring-cloud-azure-autoconfigure:jar:5.14.0:compile
[INFO] |     \- com.azure.spring:spring-cloud-azure-service:jar:5.14.0:compile
[INFO] |        \- com.azure.spring:spring-cloud-azure-core:jar:5.14.0:compile
[INFO] |           \- com.azure:azure-core-management:jar:1.15.0:compile
[INFO] +- com.azure:azure-messaging-servicebus:jar:7.17.4:compile
[INFO] |  +- com.azure:azure-core:jar:1.49.1:compile
[INFO] |  |  \- com.azure:azure-json:jar:1.1.0:compile
[INFO] |  +- com.azure:azure-xml:jar:1.0.0:compile
[INFO] |  +- com.azure:azure-core-amqp:jar:2.9.6:compile
[INFO] |  |  \- com.microsoft.azure:qpid-proton-j-extensions:jar:1.2.5:compile
[INFO] |  \- com.azure:azure-core-http-netty:jar:1.15.1:compile

@pje477

pje477 commented Sep 30, 2024

Hi @anuchandy - Thank you for releasing version 7.17.4 of Service Bus SDK - it appears to have fixed our issue!

Regarding the reactor-executor thread leak: in versions prior to 7.17.4, when the error below was logged for certain Service Bus namespaces, the reactor-executor thread associated with that connection was not closed, and the number of reactor-executor threads increased over time.

This was the error that always preceded the thread leak:
reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[MF_ff675e_1727378638933], entityName[sbt-my-topic-name] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container '6b2b3bba82084087bfd7d760339cdade_G0' because it did not have any active links in the past 300000 milliseconds.

We first noticed this as slowly increasing CPU usage in our application, a Spring Boot microservice deployed to Azure Spring Apps. We ran a JFR recording on the Spring Apps instance and saw a large number of zombie reactor-executor threads. During troubleshooting, we found that only certain Service Bus namespaces exhibit this behavior. For us, the only affected namespace is our production instance; none of our non-prod Service Bus namespaces exhibit it, even though the namespaces are configured identically (via Terraform) and the exact same code is deployed in prod and non-prod.

In any case, here are the answers you requested:

  1. Code that we are using:
String connectionString = "Endpoint=sb://" + hostname + ";SharedAccessKeyName=" + username + ";SharedAccessKey=" + password;

client = new ServiceBusClientBuilder()
        .connectionString(connectionString)
        .sender()
        .queueName(queueTopic)
        .buildClient();

client.createMessageBatch();

Then wait for 15 minutes. After 15 minutes (exactly), we get the error messages in the attached file. This issue only happens on our production service bus, but for that SB namespace, it happens every time the connection goes idle for 15m or more.

  2. It is a Spring Boot microservice, deployed to Azure Spring Apps.
  3. The behavior can be observed with just a single client connection, as shown above. The actual app opens 20-30 connections to Service Bus, and each connection exhibits the behavior.
  4. We connect to many different topics from the application. The behavior has been observed for many different topics (but only in our prod SB namespace, as noted above).
  5. Yes, I can reproduce the behavior by running the above code locally on my dev box.
  6. Yes. Whenever the NPE is observed in the log file, a new reactor-executor thread is created while the old one continues to run. In our non-prod SB namespaces, the reactor-executor thread disappears after the session times out (the desired behavior). We have observed this both by running a JFR in prod and non-prod and by periodically dumping all active threads to the console from inside the app itself.
  7. We are running Service Bus Premium.
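The answers above mention periodically dumping all active threads from inside the app. A minimal sketch of that kind of check, assuming the leaked threads keep the `reactor-executor` name prefix seen in the logs, counts live threads by name using only the JDK (`Thread.getAllStackTraces()`). The class name and the demo thread are hypothetical.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical diagnostic: counts live threads whose name starts with a prefix.
public class ReactorExecutorThreadCount {

    static long countThreadsWithPrefix(String prefix) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(Thread::isAlive)
                .filter(t -> t.getName().startsWith(prefix))
                .count();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch release = new CountDownLatch(1);
        // Demo-only thread with a matching name, so there is something to count
        // when this runs outside an application that uses the Service Bus SDK.
        Thread demo = new Thread(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reactor-executor-demo");
        demo.start();
        System.out.println("reactor-executor threads: " + countThreadsWithPrefix("reactor-executor"));
        release.countDown();
        demo.join();
    }
}
```

In an application, the count could be logged on a schedule (for example, from a `ScheduledExecutorService`); a number that keeps growing after each idle-connection error would match the leak described here.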

Again, the thread leak appears to be remediated in version 7.17.4 of the Service Bus SDK, but I'm documenting this here for others.

Also, our use case is low-latency message transfer: we open a connection to a database and another connection to Service Bus, and keep both open for long periods of time. When a message is published from the database, it can then be transferred to a Service Bus topic as quickly as possible, without the delay of opening a new connection to Service Bus. During periods of low message volume, no messages are transferred in a 15-minute window, which is when we were seeing the behavior.

@anuchandy
Member

@josebarros2025, thank you for reaching out. I think the issue you're facing is caused by spring-cloud-azure-starter:5.16.0 bringing in those core versions (2.9.6, 1.49.1) as transitive dependencies, causing conflicts. The next version of spring-cloud-azure-starter, which bumps the core versions, is yet to be released.

Have you tried explicitly specifying the required versions of azure-core and azure-core-amqp (in addition to azure-messaging-servicebus:7.17.4) in your Spring app POM, above the spring-cloud-azure-starter dependency? Typically, in a console app, the first version found in the dependency chain is used when there is a conflict; I'm unsure whether Spring dependency resolution behaves differently.

If you’re unable to override the versions in your Spring app, then unfortunately you'll need to wait for the Azure Spring team to release a spring-cloud-azure-starter that uses azure-messaging-servicebus:7.17.4. Generally, the Azure Spring team schedules releases after all the Azure SDKs and the BOM for that month are available. Based on previous release timelines, I would expect that release before mid-October.

@anuchandy
Member

anuchandy commented Sep 30, 2024

Hello @pje477, that's wonderful news! Thank you for confirming that 7.17.4 fixes the NPE and the leak, and for answering my questions. Unfortunately, as with your non-prod namespaces, we (the SDK team) were unable to reproduce any of this with our test namespaces, so your help verifying version 7.17.4 was very valuable.

@padmapriyanalam

Hi @anuchandy, thank you for the latest version. I have updated the dependencies (dependency tree screenshot not reproduced) and updated the code to:

 final ServiceBusSenderClient senderClient = new ServiceBusClientBuilder()
                .connectionString(CONNECTION_STRING)
                .configuration(new ConfigurationBuilder()
                                   .putProperty("com.azure.core.amqp.cache", "true")
                                   .build())
                .sender()
                .queueName(queueName())
                .buildClient();

I deployed the changes to our demo environment and am still seeing this exception in the demo logs:

com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[connectionId], entityName[entityName] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container 'containerId' because it did not have any active links in the past 300000 milliseconds. TrackingId:TrackingId, SystemTracker:gateway10, Timestamp:2024-10-02T12:26:06', info=null}], errorContext[NAMESPACE: servicebusName. ERROR CONTEXT: N/A, PATH: path]

@josebarros2025

josebarros2025 commented Oct 2, 2024


Hi, we aren't having this issue anymore. I had to force some dependency versions; otherwise it wouldn't work:

    private ServiceBusSenderAsyncClient sender;

    protected AbstractMessageProducer(final String queueName, final MessagingConfiguration messagingConfiguration, final JacksonJsonSerializer jacksonJsonSerializer) {
        log.debug("producer queueName: {}", queueName);
        this.jacksonJsonSerializer = jacksonJsonSerializer;
        this.messagingConfiguration = messagingConfiguration;
        this.queueName = queueName;
        sender = new ServiceBusClientBuilder()
            .connectionString(messagingConfiguration.getConnectionString())
            .configuration(new ConfigurationBuilder()
                .putProperty("com.azure.core.amqp.cache", "true")
                .build())
            .sender()
            .queueName(queueName)
            .buildAsyncClient();
    }

    public void sendToQueue(final ServiceBusMessage message) {
        sendMessage(message);
    }

    private void sendMessage(final ServiceBusMessage message) {
        // sendMessage returns Mono<Void>, which completes without emitting a value,
        // so success is logged in the completion callback, not in the value consumer.
        // Failures arrive asynchronously in the error consumer rather than being thrown here.
        sender.sendMessage(message)
            .subscribe(
                unused -> { },
                error -> log.warn("Error sending message with ID {} to queue {}", message.getMessageId(), queueName, error),
                () -> log.info("Sent message with ID {} to queue {}", message.getMessageId(), queueName));
    }

<azure.servicebus.version>7.17.4</azure.servicebus.version>

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter</artifactId>
    <version>${azure.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.azure</groupId>
            <artifactId>azure-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.azure</groupId>
            <artifactId>azure-core-amqp</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-core</artifactId>
    <version>1.52.0</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-core-amqp</artifactId>
    <version>2.9.9</version>
    <scope>compile</scope>
</dependency>

@anuchandy
Member

@josebarros2025, glad to hear you managed to solve the conflicts. Thanks for sharing the solution; I'm sure others will find it helpful.

@anuchandy
Member

@padmapriyanalam, thanks for the response. As I noted in my previous comment, we will still observe disconnect events. The service will disconnect if there is no activity, and the client will reconnect during the next send attempt. The log you’re seeing is such a disconnect event.

What 7.17.4 addresses is the NullPointerException related to the session disconnect event, and the resulting thread leaks in certain environments. I hope this clarifies things.
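These idle-timeout disconnects are expected, and the client reconnects on the next send. Some teams therefore downgrade them in log alerting.

The sketch below shows one way to recognize them. It assumes you match on the error condition and description text shown in the logs in this thread. The class and method names are hypothetical, and matching on message text is brittle if the service wording changes.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that recognizes the idle-connection close described in this thread. */
final class IdleDisconnectClassifier {

    // Matches the service text, e.g. "did not have any active links in the past 300000 milliseconds".
    private static final Pattern IDLE_WINDOW =
        Pattern.compile("did not have any active links in the past (\\d+) milliseconds");

    private IdleDisconnectClassifier() {
    }

    /** True when the text carries both amqp:connection:forced and the idle-link description. */
    static boolean isIdleDisconnect(String logText) {
        return logText != null
            && logText.contains("amqp:connection:forced")
            && IDLE_WINDOW.matcher(logText).find();
    }

    /** Idle window reported by the service in milliseconds, or -1 if the text contains none. */
    static long idleWindowMillis(String logText) {
        if (logText == null) {
            return -1L;
        }
        Matcher matcher = IDLE_WINDOW.matcher(logText);
        return matcher.find() ? Long.parseLong(matcher.group(1)) : -1L;
    }
}
```

A value of 300000 ms is 5 minutes (300000 / 1000 / 60), which is the idle period reported in the logs above.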

@anuchandy
Member

Closing this issue, as 7.17.4 with the fix has been released. Refer to the steps outlined here to use 7.17.4.

@padmapriyanalam

Thank you @josebarros2025 @anuchandy. We aren't having this issue anymore.

@robsonkades

robsonkades commented Oct 15, 2024

@anuchandy I believe this issue still persists: even after updating the versions, we are still receiving the error.

We are using the implementation: spring-cloud-azure-stream-binder-servicebus

	Caused by: java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null
	at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:91)
	at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:39)
	at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$15(ReactorConnection.java:342)
	at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1708)
	at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$16(ReactorConnection.java:339)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-dependencies</artifactId>
    <version>5.17.1</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.17.4</version> <!-- {x-version-update;com.azure:azure-messaging-servicebus;dependency} -->
</dependency>

@anuchandy
Member

Hello @robsonkades, I think the issue is that the configuration "com.azure.core.amqp.cache" is not enabled. Since the application indirectly uses version 7.17.4 through the Spring library and cannot set it directly in the builder, you can set the system property "com.azure.core.amqp.cache" to true.
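A sketch of that suggestion follows. It sets the JVM system property before Spring creates any Service Bus client, for example at the top of `main`; passing `-Dcom.azure.core.amqp.cache=true` on the command line should have the same effect.

The helper name is hypothetical. It leaves any value already supplied through `-D` or an environment variable untouched. It assumes, per the comment above, that the SDK reads this key from JVM system properties when the client is built.

```java
/** Hypothetical startup helper: opts in to the AMQP session/channel cache via a JVM system property. */
final class AmqpCacheOptIn {

    static final String CACHE_KEY = "com.azure.core.amqp.cache";

    private AmqpCacheOptIn() {
    }

    /**
     * Sets the property to "true" unless it is already defined as a system property or an
     * environment variable, and returns the effective value. Call it before any Service Bus
     * client (or the Spring context that creates one) is built.
     */
    static String enableIfUnset() {
        String existing = System.getProperty(CACHE_KEY);
        if (existing == null) {
            existing = System.getenv(CACHE_KEY);
        }
        if (existing == null) {
            System.setProperty(CACHE_KEY, "true");
            return "true";
        }
        return existing;
    }
}
```

In a Spring Boot application, this would be the first statement in `main`, before `SpringApplication.run(...)`.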

@ravibarkhani

ravibarkhani commented Nov 15, 2024

Hello @anuchandy
Thanks for explaining the errors here. What I have observed is that two errors are generated in the application when we use the old Service Bus library version.
We have been receiving a NullPointerException in production since 21 October without deploying any change to the application.

  1. com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[connectionId], entityName[entityName] condition[Error{condition=amqp:connection:forced, description='The connection was closed by container 'containerId' because it did not have any active links in the past 300000 milliseconds. TrackingId:TrackingId, SystemTracker:gateway10, Timestamp:2024-10-02T12:26:06',

In our experience, after upgrading to "com.azure:azure-messaging-servicebus:jar:7.17.4", this error is gone.

  2. reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null
    Caused by: java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null
    at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:91)

Our observation: this exception is not resolved by upgrading to "com.azure:azure-messaging-servicebus:jar:7.17.4", and it keeps cluttering our exception log monitoring system.

We create the client as shown below. Could you or the team please let us know the fix for this?

return new ServiceBusClientBuilder()
    .credential(serviceBusNameSpaceName, defaultCredential)
    .configuration(new ConfigurationBuilder()
        .putProperty("com.azure.core.amqp.cache", "true")
        .build())
    .sender()
    .queueName("queue-name")
    .buildAsyncClient();

First, we tried setting it as an App Setting, but that also didn't resolve the NullPointerException.

Our Azure dependencies are:
Image

@anuchandy
Member

Hello @ravibarkhani, it is very likely that the environment configuration "com.azure.core.amqp.cache" was not properly reflected in the JVM, resulting in the code path leading to a NullPointerException being triggered.

Could you do the following –

  1. Upgrade to version 7.17.6 (released on 13-Nov).
  2. Delete the use of configuration(..) setter in the ServiceBusClientBuilder.
  3. Build and rerun the application.

Version 7.17.6 sets "com.azure.core.amqp.cache" to "true" by default, so there is no need to set it explicitly in the builder. Please update the version and make sure the configuration(..) setter is completely removed.
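Because the default changed in 7.17.6, whether you still need an explicit opt-in depends on which azure-messaging-servicebus version actually resolves in your build (for example, as shown by `mvn dependency:tree`).

Below is a small sketch of that decision. It assumes plain `major.minor.patch` version strings; any pre-release suffix such as `-beta.1` is ignored, so a 7.17.6 pre-release would be treated as 7.17.6. The class and method names are hypothetical.

```java
/** Hypothetical helper: does this azure-messaging-servicebus version need the cache opt-in? */
final class ServiceBusCacheDefault {

    // Per the comment above, 7.17.6 enables "com.azure.core.amqp.cache" by default.
    private static final int[] DEFAULT_ON_SINCE = {7, 17, 6};

    private ServiceBusCacheDefault() {
    }

    /** True if the version predates 7.17.6, so the property has to be set explicitly. */
    static boolean needsExplicitOptIn(String version) {
        int[] v = parse(version);
        for (int i = 0; i < 3; i++) {
            if (v[i] != DEFAULT_ON_SINCE[i]) {
                return v[i] < DEFAULT_ON_SINCE[i];
            }
        }
        return false; // exactly 7.17.6
    }

    // Parses "7.17.4" (dropping any "-suffix") into {7, 17, 4}; missing parts count as 0.
    private static int[] parse(String version) {
        String core = version.split("-", 2)[0];
        String[] parts = core.split("\\.");
        int[] out = new int[3];
        for (int i = 0; i < 3 && i < parts.length; i++) {
            out[i] = Integer.parseInt(parts[i].trim());
        }
        return out;
    }
}
```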

@dineshlalwani

Hi @anuchandy, I'm also facing a similar issue and have opened bug #42987 for it.

@ravibarkhani

After upgrading Service Bus to version 7.17.6, we no longer see the errors in the application. In case it's useful to others, below are the dependencies in my application:

Image

@izluben

izluben commented Dec 3, 2024

Hi @anuchandy, I'm probably just being dumb, but I can't figure out how configuring via the configuration builder works in this case.
If I use the following code:

new ServiceBusClientBuilder()
  .connectionString(CONNECTION_STRING)
  .configuration(new ConfigurationBuilder()
         .putProperty("com.azure.core.amqp.cache", "true")
         .build())

then "com.azure.core.amqp.cache" = "true" ends up in the explicitConfigurations field of the EnvironmentConfiguration class.
The problem is that the SESSION_CHANNEL_CACHE_PROPERTY value is read by the com.azure.messaging.servicebus.ServiceBusClientBuilder.V2StackSupport#isOptedIn method,
which in turn uses the com.azure.core.implementation.util.EnvironmentConfiguration#getEnvironmentVariable method,
effectively ignoring the value in explicitConfigurations. This means SESSION_CHANNEL_CACHE_PROPERTY ends up false after all.

My dependencies:

\--- com.azure.spring:spring-messaging-azure-servicebus:5.18.0
|    |              +--- com.azure.spring:spring-messaging-azure:5.18.0 (*)
|    |              \--- com.azure:azure-messaging-servicebus:7.17.5
|    |                   +--- com.azure:azure-core:1.53.0 (*)
|    |                   +--- com.azure:azure-xml:1.1.0
|    |                   +--- com.azure:azure-core-amqp:2.9.10 (*)
|    |                   \--- com.azure:azure-core-http-netty:1.15.5 (*)

P.S.: setting "com.azure.core.amqp.cache" as an env variable works just ok, and it fixes the thread leak, so thank you, guys, for the fix anyway.
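To illustrate the point above, here is a simplified model of a layered lookup. It is not azure-core's actual code. Explicit (builder-supplied) values take precedence over JVM system properties, which take precedence over environment variables. Next to it is a lookup that skips the explicit layer, which is the behavior reported above.

Class and method names are hypothetical. Whether the real method also consults system properties is an assumption here. The environment is passed in as a map so the model is deterministic.

```java
import java.util.Map;
import java.util.Properties;

/** Simplified model of layered configuration lookup; NOT azure-core's implementation. */
final class LayeredConfigModel {

    private final Map<String, String> explicit;    // e.g. values from ConfigurationBuilder.putProperty
    private final Properties systemProperties;     // e.g. -Dkey=value
    private final Map<String, String> environment; // e.g. exported environment variables

    LayeredConfigModel(Map<String, String> explicit, Properties systemProperties, Map<String, String> environment) {
        this.explicit = explicit;
        this.systemProperties = systemProperties;
        this.environment = environment;
    }

    /** Full lookup: explicit value, then system property, then environment variable. */
    String get(String key) {
        String value = explicit.get(key);
        if (value == null) {
            value = systemProperties.getProperty(key);
        }
        return value != null ? value : environment.get(key);
    }

    /** Lookup that ignores the explicit layer, so builder-supplied values are never seen. */
    String getIgnoringExplicit(String key) {
        String value = systemProperties.getProperty(key);
        return value != null ? value : environment.get(key);
    }
}
```

With only the builder value set, the second lookup returns null (treated as "not opted in"). An environment value is visible to both lookups, which matches the observation that the environment-variable route works.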

@czoromba1

Has it been noted at all that the same reactor-core thread leak issue happens with Azure Event Hubs as well? I wasn't sure if it's being tracked at all since all the discussion has been around Service Bus.

<artifactId>azure-messaging-eventhubs</artifactId>

I'm using the latest azure-messaging-eventhubs version as of now (5.19.1) and even added the cache config property as shown below:

new EventHubClientBuilder()
    .fullyQualifiedNamespace(eventHubNamespaceName)
    .eventHubName(eventHubName)
    .credential(credential)
    .configuration(new ConfigurationBuilder()
        .putProperty("com.azure.core.amqp.cache", "true")
        .build())
    .buildProducerClient();

But I continue to see the same behavior.

Seeing the following error:

java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null
Caused By
Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null
at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:91)
at org.apache.qpid.proton.engine.impl.ConnectionImpl.session(ConnectionImpl.java:39)
at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$15(ReactorConnection.java:342)
at com.azure.core.amqp.implementation.ReactorConnection.lambda$createSession$16(ReactorConnection.java:339)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:294)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:188)
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)

and reactor-executor threads (in the RUNNABLE state) accumulate, one for each error.

Image
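To track this growth without a profiler, you can periodically count live threads by name prefix. The sketch below uses only the JDK. It assumes the leaked threads keep the `reactor-executor` name prefix seen above. The class name is hypothetical.

```java
/** Hypothetical diagnostic: counts live threads whose name starts with a given prefix. */
final class ThreadLeakProbe {

    private ThreadLeakProbe() {
    }

    /** Number of live threads whose name starts with namePrefix. */
    static long countThreads(String namePrefix) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(Thread::isAlive)
            .filter(t -> t.getName().startsWith(namePrefix))
            .count();
    }

    /** Same as countThreads, but only threads currently in the RUNNABLE state. */
    static long countRunnable(String namePrefix) {
        return Thread.getAllStackTraces().keySet().stream()
            .filter(t -> t.getState() == Thread.State.RUNNABLE)
            .filter(t -> t.getName().startsWith(namePrefix))
            .count();
    }
}
```

Logging `ThreadLeakProbe.countThreads("reactor-executor")` on a schedule should show a steadily rising number if the leak is present, and a flat one once the fix is in place.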

My Azure SDK dependencies:

+- com.azure:azure-messaging-eventhubs:jar:5.19.1:compile
[INFO]    |  +- com.azure:azure-core:jar:1.53.0:compile
[INFO]    |  |  +- com.azure:azure-xml:jar:1.1.0:compile
[INFO]    |  |  \- io.projectreactor:reactor-core:jar:3.6.11:compile
[INFO]    |  |     \- org.reactivestreams:reactive-streams:jar:1.0.4:compile
[INFO]    |  \- com.azure:azure-core-amqp:jar:2.9.10:compile
[INFO]    |     +- com.microsoft.azure:qpid-proton-j-extensions:jar:1.2.5:compile
[INFO]    |     \- org.apache.qpid:proton-j:jar:0.34.1:compile
[INFO]    +- com.azure:azure-identity:jar:1.14.0:compile
[INFO]    |  +- com.azure:azure-core-http-netty:jar:1.15.5:compile
[INFO]    |  |  +- io.netty:netty-handler:jar:4.1.114.Final:compile
[INFO]    |  |  |  +- io.netty:netty-resolver:jar:4.1.114.Final:compile
[INFO]    |  |  |  \- io.netty:netty-transport:jar:4.1.114.Final:compile
[INFO]    |  |  +- io.netty:netty-handler-proxy:jar:4.1.114.Final:compile
[INFO]    |  |  |  \- io.netty:netty-codec-socks:jar:4.1.114.Final:compile
[INFO]    |  |  +- io.netty:netty-buffer:jar:4.1.114.Final:compile
[INFO]    |  |  +- io.netty:netty-codec:jar:4.1.114.Final:compile
[INFO]    |  |  +- io.netty:netty-codec-http:jar:4.1.114.Final:compile
[INFO]    |  |  +- io.netty:netty-codec-http2:jar:4.1.114.Final:compile
[INFO]    |  |  +- io.netty:netty-transport-native-unix-common:jar:4.1.114.Final:compile
[INFO]    |  |  +- io.netty:netty-transport-native-epoll:jar:linux-x86_64:4.1.114.Final:compile
[INFO]    |  |  |  \- io.netty:netty-transport-classes-epoll:jar:4.1.114.Final:compile
[INFO]    |  |  +- io.netty:netty-transport-native-kqueue:jar:osx-x86_64:4.1.114.Final:compile
[INFO]    |  |  |  \- io.netty:netty-transport-classes-kqueue:jar:4.1.114.Final:compile
[INFO]    |  |  +- io.netty:netty-tcnative-boringssl-static:jar:2.0.66.Final:compile
[INFO]    |  |  |  +- io.netty:netty-tcnative-classes:jar:2.0.66.Final:compile
[INFO]    |  |  |  +- io.netty:netty-tcnative-boringssl-static:jar:linux-x86_64:2.0.66.Final:compile
[INFO]    |  |  |  +- io.netty:netty-tcnative-boringssl-static:jar:linux-aarch_64:2.0.66.Final:compile
[INFO]    |  |  |  +- io.netty:netty-tcnative-boringssl-static:jar:osx-x86_64:2.0.66.Final:compile
[INFO]    |  |  |  +- io.netty:netty-tcnative-boringssl-static:jar:osx-aarch_64:2.0.66.Final:compile
[INFO]    |  |  |  \- io.netty:netty-tcnative-boringssl-static:jar:windows-x86_64:2.0.66.Final:compile
[INFO]    |  |  +- io.projectreactor.netty:reactor-netty-http:jar:1.1.23:compile
[INFO]    |  |  |  +- io.netty:netty-resolver-dns:jar:4.1.114.Final:compile
[INFO]    |  |  |  |  \- io.netty:netty-codec-dns:jar:4.1.114.Final:compile
[INFO]    |  |  |  +- io.netty:netty-resolver-dns-native-macos:jar:osx-x86_64:4.1.114.Final:compile
[INFO]    |  |  |  |  \- io.netty:netty-resolver-dns-classes-macos:jar:4.1.114.Final:compile
[INFO]    |  |  |  \- io.projectreactor.netty:reactor-netty-core:jar:1.1.23:compile
[INFO]    |  |  \- io.netty:netty-common:jar:4.1.114.Final:compile
[INFO]    |  +- com.azure:azure-json:jar:1.3.0:compile
[INFO]    |  +- com.microsoft.azure:msal4j:jar:1.17.2:compile
[INFO]    |  |  \- com.nimbusds:oauth2-oidc-sdk:jar:11.18:compile
[INFO]    |  |     +- com.nimbusds:content-type:jar:2.3:compile
[INFO]    |  |     \- com.nimbusds:lang-tag:jar:1.7:compile
[INFO]    |  \- com.microsoft.azure:msal4j-persistence-extension:jar:1.3.0:compile

@anuchandy
Member

Hello @izluben, interesting; that seems different from what I'm seeing while debugging, where the "com.azure.core.amqp.cache" setting from the configuration object provided in the builder is picked up. Let me dig a bit more. I'm glad to hear that you are unblocked via the environment route.

@anuchandy
Member

Hi @czoromba1, thanks for reaching out. For Event Hubs, it's a different configuration. Could you try setting "com.azure.messaging.eventhubs.v2" to "true" instead?

@czoromba1

Thank you, @anuchandy! I'm happy to report that setting "com.azure.messaging.eventhubs.v2" to true worked!

My JVM has been running for 37 hours now, and I don't see any signs of memory or thread leaks. On top of that, there are no occurrences of the following NPE:

java.lang.NullPointerException: Cannot invoke "java.util.List.add(Object)" because "this._sessions" is null

Is there a new azure-messaging-eventhubs version planned that doesn't require setting this config?

@anuchandy
Member

Hi @czoromba1, glad to hear you're unblocked with Event Hubs v2 stack. To answer your question - we currently plan to make the v2 stack the default for Event Hubs in the first quarter of 2025, eliminating the need for applications to opt-in.

@Haricshore

Hi @anuchandy! I'm facing a similar issue to @czoromba1, but I'm using Spring Boot auto-configuration for the EventHubProducerClient bean. Can you please suggest a way to overcome the thread leaks?

@anuchandy
Member

anuchandy commented Jan 2, 2025

Hi @Haricshore, thanks for reaching out. I'm not sure whether you're using the com.azure:azure-messaging-eventhubs dependency transitively (through one of the Spring packages) or directly, but can you try the following?

  1. If you're using it transitively, exclude com.azure:azure-messaging-eventhubs.
  2. Add com.azure:azure-messaging-eventhubs version 5.19.2 explicitly in your application pom file.
  3. Set the environment variable com.azure.messaging.eventhubs.v2 to true so that the JVM picks it up.

This should run the azure-messaging-eventhubs clients in v2 stack mode.
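For step 3, a startup guard can make a missing opt-in visible immediately, instead of after threads start leaking.

The sketch below assumes, as with the Service Bus setting, that the flag may be supplied either as a JVM system property or as an environment variable with that exact name. POSIX shells cannot `export` a name containing dots, so `-Dcom.azure.messaging.eventhubs.v2=true` may be easier to pass. The class name is hypothetical.

```java
import java.util.Map;
import java.util.function.UnaryOperator;

/** Hypothetical startup guard for the Event Hubs v2 stack opt-in. */
final class EventHubsV2Guard {

    static final String V2_KEY = "com.azure.messaging.eventhubs.v2";

    private EventHubsV2Guard() {
    }

    /** True if the key is "true" (case-insensitive) as a system property, else as an environment variable. */
    static boolean isV2OptedIn(UnaryOperator<String> systemProperties, Map<String, String> environment) {
        String value = systemProperties.apply(V2_KEY);
        if (value == null) {
            value = environment.get(V2_KEY);
        }
        return Boolean.parseBoolean(value); // null or anything other than "true" yields false
    }

    /** Throws IllegalStateException when this JVM cannot see the opt-in; call it early in startup. */
    static void requireV2OptIn() {
        if (!isV2OptedIn(System::getProperty, System.getenv())) {
            throw new IllegalStateException(V2_KEY + " is not set to true; the v1 stack would be used");
        }
    }
}
```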

To confirm the v2 stack is loaded, enable DEBUG logging and check for logs from the two v2 stack classes, RequestResponseChannelCache and ReactorConnectionCache.

If the v2 stack is not enabled for some reason (e.g., the azure-messaging-eventhubs version upgrade didn't resolve in your app environment, or the environment variable com.azure.messaging.eventhubs.v2 is not picked up by the JVM), you will instead see logs from the deprecated v1 stack type AmqpChannelProcessor, which has the thread leak issue.
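The check described above can be scripted against a captured DEBUG log. The sketch below is rough. It assumes the class names appear verbatim in each log line, as in the `logger_name` field of the JSON logs quoted later in this thread. The class and enum names are hypothetical.

```java
import java.util.List;

/** Hypothetical log scan: which AMQP stack do the DEBUG logs indicate? */
final class AmqpStackDetector {

    enum Stack { V1, V2, MIXED, UNKNOWN }

    private AmqpStackDetector() {
    }

    static Stack detect(List<String> logLines) {
        boolean v1 = false;
        boolean v2 = false;
        for (String line : logLines) {
            // The two v2 stack types named above.
            if (line.contains("RequestResponseChannelCache") || line.contains("ReactorConnectionCache")) {
                v2 = true;
            }
            // The deprecated v1 stack type with the thread leak.
            if (line.contains("AmqpChannelProcessor")) {
                v1 = true;
            }
        }
        if (v1 && v2) {
            return Stack.MIXED; // e.g. several clients configured differently
        }
        if (v2) {
            return Stack.V2;
        }
        return v1 ? Stack.V1 : Stack.UNKNOWN;
    }
}
```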

@saraswathi-d15

Hi @anuchandy

The application fails to send messages to the topic, and we see the following errors in our logs:

Error:
{"az.sdk.message":"onSessionRemoteClose","connectionId":"MF_f70a97_1735305557151","errorCondition":"amqp:connection:forced","errorDescription":"The connection was closed by container '10f3e5b567d847f4889894f0e3321971_G4' because it did not have any active links in the past 300000 milliseconds. TrackingId:10f3e5b567d847f4889894f0e3321971_G4, SystemTracker:gateway5, Timestamp:2024-12-27T13:45:17"

java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)\n\tat java.base/java.lang.Thread.run(Unknown Source)\nCaused by: com.azure.core.amqp.implementation.RequestResponseChannelClosedException: Cannot send a message when request response channel is disposed.\n\tat com.azure.core.amqp.implementation.RequestResponseChannel.sendWithAck(RequestResponseChannel.java:309)\n\tat com.azure.core.amqp.implementation.RequestResponseChannel.sendWithAck(RequestResponseChannel.java:296)\n\tat com.azure.core.amqp.implementation.ClaimsBasedSecurityChannel.lambda$authorize$2(ClaimsBasedSecurityChannel.java:67)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)\n\t... 42 common frames omitted\n"}

{"az.sdk.message":"Error in SendLinkHandler. Disposing unconfirmed sends.","exception":"The connection was closed by container 'a8f0898d6b704c94adeb7780b54bb924_G13' because it did not have any active links in the past 300000 milliseconds. TrackingId:a8f0898d6b704c94adeb7780b54bb924_G13, SystemTracker:gateway5, Timestamp:2024-12-27T13:45:19, errorContext[NAMESPACE:----------. ERROR CONTEXT: N/A, PATH: $cbs, REFERENCE_ID: cbs:sender, LINK_CREDIT: 99]","connectionId":"MF_c62215_1735305559278","linkName":"cbs"}

Failed to create send link ---topic name----
Cannot send a message when request response channel is disposed.

We are using the following Spring Cloud Stream dependencies to send messages via Azure Service Bus:

	<dependency>
		<groupId>com.azure.spring</groupId>
		<artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
		<version>5.16.0</version>
	</dependency>

	<dependency>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-stream</artifactId>
	</dependency>

@anuchandy
Member

Hello, could you collect SDK DEBUG logs covering 15 minutes before to 5 minutes after the above error log? Instructions for collecting the logs can be found here (note: "AMQP transport logs" are NOT required, only the Azure SDK logs).
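If the DEBUG logs are already in a file, a small filter can cut out the requested window.

The sketch below assumes each log entry starts with an ISO-8601 instant such as `2025-01-20T04:40:37.855Z`; adapt the parsing to your log layout. Lines without a leading timestamp, such as stack-trace frames, follow the previous timestamped line. The class name is hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: keep log lines from 15 minutes before to 5 minutes after an error. */
final class LogWindow {

    static final Duration BEFORE = Duration.ofMinutes(15);
    static final Duration AFTER = Duration.ofMinutes(5);

    private LogWindow() {
    }

    static List<String> around(List<String> lines, Instant errorTime) {
        Instant from = errorTime.minus(BEFORE);
        Instant to = errorTime.plus(AFTER);
        List<String> kept = new ArrayList<>();
        boolean inWindow = false;
        for (String line : lines) {
            Instant ts = leadingInstant(line);
            if (ts != null) {
                inWindow = !ts.isBefore(from) && !ts.isAfter(to); // inclusive bounds
            }
            // Untimestamped lines (e.g. "\tat ..." frames) inherit the previous line's decision.
            if (inWindow) {
                kept.add(line);
            }
        }
        return kept;
    }

    // Parses the text before the first space as an Instant, or returns null if it is not one.
    private static Instant leadingInstant(String line) {
        int space = line.indexOf(' ');
        String prefix = space < 0 ? line : line.substring(0, space);
        try {
            return Instant.parse(prefix);
        } catch (DateTimeParseException e) {
            return null;
        }
    }
}
```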

@anuchandy
Member

Hello @saraswathi-d15, I got access to the logs you shared. During log analysis, I see the following log entry:

{"Timestamp": "2025-01-20T04:40:37.855516049Z", "logger_name": "com.azure.core.amqp.implementation.AmqpChannelProcessor", "thread_name": "parallel-1", "level": "ERROR", "stack_trace": "java.lang.NullPointerException: Cannot invoke \"java.util.List.add(Object)\" because \"this._sessions\" is null"}
{
    "az.sdk.message": "Retry attempts exhausted or exception was not retriable.",
    "exception": "Cannot invoke \"java.util.List.add(Object)\" because \"this._sessions\" is null",
    "connectionId": "MF_32f5bc_1737346566050",
    "entityPath": "$cbs",
    "tryCount": 1
}

The NullPointerException originating from the AmqpChannelProcessor type is addressed in later versions of azure-messaging-servicebus (e.g. 7.17.7). In older versions of azure-messaging-servicebus, if the broker closed the connection from the client for any reason, it resulted in this NullPointerException (NPE). It is common for the broker to close the connection with an error such as "connection was closed by container '02d9ebd5778b4a048f030550ca24399c_G4' because it did not have any active links in the past 300000 milliseconds." The library is designed to retry these transient errors, but this NPE can stop the library from retrying.
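The following is a toy model of the behavior described above, not the SDK's retry code. An operation is retried only while it fails with an error classified as transient. An unexpected exception such as this NullPointerException therefore ends the loop after the first try, matching `"tryCount": 1` in the log entry above. The `TransientException` type and the method names are hypothetical, and the model assumes Java 16+ for the record.

```java
import java.util.function.Supplier;

/** Toy retry loop illustrating retriable vs non-retriable failures; NOT the SDK's implementation. */
final class RetryModel {

    /** Stand-in for a transient, retriable error such as a service-initiated disconnect. */
    static final class TransientException extends RuntimeException {
        TransientException(String message) {
            super(message);
        }
    }

    /** The value (or null), how many attempts were made, and the last error (or null). */
    record Outcome<T>(T value, int tryCount, RuntimeException error) { }

    private RetryModel() {
    }

    static <T> Outcome<T> run(Supplier<T> operation, int maxTries) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxTries; attempt++) {
            try {
                return new Outcome<>(operation.get(), attempt, null);
            } catch (TransientException e) {
                last = e; // transient: try again
            } catch (RuntimeException e) {
                return new Outcome<>(null, attempt, e); // not retriable: stop immediately
            }
        }
        return new Outcome<>(null, maxTries, last); // retry attempts exhausted
    }
}
```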

Could you upgrade spring-cloud-azure-stream-binder-servicebus to version 5.19.0, which uses azure-messaging-servicebus version 7.17.7?

I don't see any log entries about "Cannot send a message when request response channel is disposed", but let's upgrade as mentioned above and see if that resolves the problem.

@jbauerrfid

I discovered the problem myself in this dependency:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.13.0</version>
</dependency>

Upgrading to this one fixed it in my service:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.17.8</version>
</dependency>
