[BUG] Accepting more than 256 sessions on single instance of ServiceBusSessionReceiverAsyncClient breaks connectivity but does not throw an error #32211

Closed
3 tasks done
marxxxx opened this issue Nov 17, 2022 · 9 comments
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Service Bus

Comments

marxxxx commented Nov 17, 2022

Describe the bug
When using a single instance of ServiceBusSessionReceiverAsyncClient and calling acceptSession for >= 257 sessions, all previously registered sessions with the same ServiceBusSessionReceiverAsyncClient will stop receiving any messages from any of the sessions.
We are able to work around the issue by using multiple instances of ServiceBusSessionReceiverAsyncClient but the behavior of the SDK is not very helpful.
In comparison, the .NET SDK's limit of what a single Instance of ServiceBusClient can handle is much higher (5000 Subscriptions) and it throws an exception when the limit is exceeded.

Exception or Stack Trace
No exception is thrown, it's possible to call acceptSession even for > 10000 sessions without any error.

To Reproduce
See the code snippet below. It is based on https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/servicebus/azure-messaging-servicebus/src/samples/java/com/azure/messaging/servicebus/ReceiveNamedSessionAsyncSample.java
Provide your connection string and the Name of your Queue in the respective constants.
The queue must have sessions enabled.

None of the subscriptions will receive any messages from any of the sessions.
Changing SessionCount to 256 establishes connectivity for all of the sessions.

Code Snippet

package com.azure.messaging.servicebus;

import com.azure.messaging.servicebus.models.ServiceBusReceiveMode;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.Scanner;

public class ReceiveNamedSessionAsyncSample {
    static String connectionString = "<PROVIDE__CONNECTION_STRING>";
    static String queueName = "<PROVIDE_QUEUE_NAME_OF_SESSION_ENABLED_QUEUE>";

    // 256 works, 257 will break the receiver
    final static Integer SessionCount = 257;

    public static void main(String[] args) throws InterruptedException {
        ReceiveNamedSessionAsyncSample sample = new ReceiveNamedSessionAsyncSample();

        // Create a receiver.
        ServiceBusSessionReceiverAsyncClient sessionReceiver = new ServiceBusClientBuilder()
            .connectionString(connectionString)
            .sessionReceiver()
            .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
            .queueName(queueName)
            .buildAsyncClient();

        ArrayList<Disposable> subscriptions = new ArrayList<Disposable>();

        for(Integer i=0; i<SessionCount; i++) {
            String sessionId = i.toString();
            System.out.println("Receiving from Session " + sessionId);

            Disposable sub = sample.run(sessionReceiver, sessionId);
            subscriptions.add(sub);
        }

        System.out.println("All session receivers active. Press any key to exit.");

        Scanner in = new Scanner(System.in);

        String s = in.nextLine();

        System.out.println("Disposing subscriptions ...");
        for(var sub: subscriptions) {
            sub.dispose();
        }

        System.out.println("Closing receiver ...");
        sessionReceiver.close();

        System.out.println("Bye.");
    }

    // Subscribes to a single session and returns the subscription so the caller can dispose of it.
    public Disposable run(ServiceBusSessionReceiverAsyncClient sessionReceiver, String sessionId) throws InterruptedException {

        try {
            Mono<ServiceBusReceiverAsyncClient> receiverMono = sessionReceiver.acceptSession(sessionId);

            System.out.println("-- Session " + sessionId + " accepted.");

            Disposable subscription = Flux.usingWhen(receiverMono,
                    receiver -> receiver.receiveMessages(),
                    receiver -> Mono.fromRunnable(() -> receiver.close()))
                .subscribe(message -> {
                    System.out.printf("Session: %s. Sequence #: %s. Contents: %s%n", message.getSessionId(),
                        message.getSequenceNumber(), message.getBody());

                }, error -> {
                    System.err.println("Error occurred: " + error);
                });

            return subscription;
        } catch(Exception ex) {
            System.err.println("Error accepting session " + sessionId + ": " + ex );
            return null;
        }
    }
}

Expected behavior
When there is a limit of 256 subscriptions, I would expect an exception to be thrown.
The .NET SDK allows for 5000 subscriptions via a single ServiceBusClient. I would expect both of the SDKs to have the same limits.

Screenshots

Setup (please complete the following information):

  • OS: Windows
  • IDE: IntelliJ
  • Library/Libraries: [azure-messaging-servicebus:7.13.0-beta.1]
  • Java version: [OpenJdk-18]
  • App Server/Environment: -
  • Frameworks: -

Additional context

Information Checklist
Kindly make sure that you have added all of the following information above and checked off the required fields; otherwise we will treat the issue as an incomplete report

  • Bug Description Added
  • Repro Steps Added
  • Setup information Added
@ghost ghost added needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. customer-reported Issues that are reported by GitHub users external to the Azure organization. question The issue doesn't require a change to the product in order to be resolved. Most issues start as that labels Nov 17, 2022
@marxxxx marxxxx changed the title [BUG] Accepting more than 256 sessions on ServiceBusSessionReceiverAsyncClient breaks connectivity without error [BUG] Accepting more than 256 sessions on single instance of ServiceBusSessionReceiverAsyncClient breaks connectivity but does not throw an error Nov 17, 2022
@joshfree joshfree added Service Bus Client This issue points to a problem in the data-plane of the library. labels Dec 5, 2022
@ghost ghost removed the needs-triage Workflow: This is a new issue that needs to be triaged to the appropriate team. label Dec 5, 2022
Member

joshfree commented Dec 5, 2022

@liukun-msft could you please take a look?

@liukun-msft
Contributor

@marxxxx Thank you for contacting us! I'll check it and get back to you soon.

Contributor

liukun-msft commented Dec 6, 2022

I can successfully reproduce the issue and see lots of "The session lock has expired on the MessageSession" errors. I suspect we may limit the buffer that holds the session locks. We use Reactor for async receive and use onBackpressure() on the sink to emit messages downstream. Because the default buffer size for backpressure is 256, it may be relevant to this issue.

I may need more time to find the root cause. I'll keep you updated on any progress.

Contributor

liukun-msft commented Dec 7, 2022

Hi @marxxxx

Digging into the logs, I found that the cause is a service-side limit on the number of sessions. When a client accepts more than 255 sessions, it receives an amqp:resource-limit-exceeded error from the service, which causes the client to close all sessions and stop receiving messages.

[reactor-executor-1] INFO  com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionRemoteClose","connectionId":"MF_5a5d03_1670384269853","errorCondition":"amqp:resource-limit-exceeded","errorDescription":"Cannot allocate more handles to the current session or connection. The maximum number of handles allowed is 255. Please free up resources and try again.","sessionName":"sessionqueue"}
[reactor-executor-1] INFO  com.azure.core.amqp.implementation.handler.SessionHandler - {"az.sdk.message":"onSessionRemoteClose closing a local session.","connectionId":"MF_5a5d03_1670384269853","errorCondition":"amqp:resource-limit-exceeded","errorDescription":"Cannot allocate more handles to the current session or connection. The maximum number of handles allowed is 255. Please free up resources and try again.","sessionName":"sessionqueue"}
…
[reactor-executor-1] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[MF_4a8002_1670395912049], entityName[sessionqueue] condition[Error{condition=amqp:resource-limit-exceeded, description='Cannot allocate more handles to the current session or connection. The maximum number of handles allowed is 255. Please free up resources and try again.', info=null}], errorContext[NAMESPACE: servicebusliuku.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: sessionqueue]
Caused by: com.azure.core.amqp.exception.AmqpException: onSessionRemoteClose connectionId[MF_4a8002_1670395912049], entityName[sessionqueue] condition[Error{condition=amqp:resource-limit-exceeded, description='Cannot allocate more handles to the current session or connection. The maximum number of handles allowed is 255. Please free up resources and try again.', info=null}], errorContext[NAMESPACE: servicebusliuku.servicebus.windows.net. ERROR CONTEXT: N/A, PATH: sessionqueue]
	at com.azure.core.amqp.implementation.ExceptionUtil.toException(ExceptionUtil.java:85)
	at com.azure.core.amqp.implementation.handler.SessionHandler.onSessionRemoteClose(SessionHandler.java:136)
	at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:152)
	at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
	at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
	at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
	at com.azure.core.amqp.implementation.ReactorExecutor.run(ReactorExecutor.java:91)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
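
As the trace above shows, the error reaches Reactor wrapped in an ErrorCallbackNotImplemented, so an application that wants to react to it has to look through the cause chain. The following is a minimal sketch of such a check using only the JDK. The class and method names are hypothetical, and matching on the message text is a crude assumption made to keep the example self-contained; where the SDK's AmqpException is available, inspecting its typed error condition would likely be the more robust choice.

```java
final class ResourceLimitCheck {
    // Condition string copied from the log output in this thread.
    static final String RESOURCE_LIMIT_EXCEEDED = "amqp:resource-limit-exceeded";

    // Returns true if any throwable in the cause chain mentions the condition.
    public static boolean isResourceLimitExceeded(Throwable error) {
        int depth = 0;
        // The depth cap guards against pathological self-referencing cause chains.
        for (Throwable t = error; t != null && depth < 32; t = t.getCause(), depth++) {
            String message = t.getMessage();
            if (message != null && message.contains(RESOURCE_LIMIT_EXCEEDED)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable cause = new RuntimeException(
            "onSessionRemoteClose condition[Error{condition=amqp:resource-limit-exceeded}]");
        Throwable wrapped = new IllegalStateException("error callback not implemented", cause);
        System.out.println(isResourceLimitExceeded(wrapped));
        System.out.println(isResourceLimitExceeded(new RuntimeException("timeout")));
    }
}
```

A check like this could be called from the error callback passed to subscribe(), for example to log a clear "too many sessions on one client" message before tearing down the client.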

.NET should also be limited to 255 sessions, can you provide the corresponding .NET code snippet? I can double check that.

We are able to work around the issue by using multiple instances of ServiceBusSessionReceiverAsyncClient but the behavior of the SDK is not very helpful.

Due to this service-side limitation, if you want to receive from more sessions, you need to create a new client, as in the code below:

ArrayList<ServiceBusSessionReceiverAsyncClient> receivers = new ArrayList<>();
ServiceBusSessionReceiverAsyncClient sessionReceiver = null;
for (int i = 0; i < SessionCount; i++) {
    // Start a new client from a new builder for every block of 255 sessions.
    if (i % 255 == 0) {
        sessionReceiver = new ServiceBusClientBuilder()
            .connectionString(Credentials.serviceBusConnectionString)
            .sessionReceiver()
            .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
            .queueName(sessionQueue)
            .buildAsyncClient();
        receivers.add(sessionReceiver);
    }

    String sessionId = String.valueOf(i);
    System.out.println("Receiving from Session " + sessionId);

    Disposable sub = sample.run(sessionReceiver, sessionId);
    subscriptions.add(sub);
}

I am not sure whether you are using the same ServiceBusClientBuilder to create the new clients, which would use a shared connection under the hood. Because the limit is 255 sessions per connection, the error still occurs if a shared connection is used.
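
When the session IDs are known up front, the same idea can be expressed by first splitting them into batches and then building one client per batch. Below is a minimal sketch of the batching step using only the JDK; SessionBatcher and its method are hypothetical names, and the value 255 is taken from the service error message above rather than from any SDK constant.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionBatcher {
    // Limit quoted in the service error message; an assumption that may change.
    static final int MAX_SESSIONS_PER_CLIENT = 255;

    // Splits the session IDs into consecutive batches of at most maxPerClient entries.
    public static List<List<String>> batch(List<String> sessionIds, int maxPerClient) {
        if (maxPerClient <= 0) {
            throw new IllegalArgumentException("maxPerClient must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        for (int start = 0; start < sessionIds.size(); start += maxPerClient) {
            int end = Math.min(start + maxPerClient, sessionIds.size());
            // Copy the view so each batch is independent of the input list.
            batches.add(new ArrayList<>(sessionIds.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 600; i++) {
            ids.add(String.valueOf(i));
        }
        List<List<String>> batches = batch(ids, MAX_SESSIONS_PER_CLIENT);
        System.out.println(batches.size() + " clients needed");
    }
}
```

Each batch would then get its own client created from a fresh ServiceBusClientBuilder, as in the loop above, so that no two batches share a connection.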

You can tell us more about why this work around is not helpful for your scenarios.

Enhancement

For the SDK team: because our logs contain a lot of close procedures that hide the real errors, we are going to tune the logging afterwards, which is tracked by this issue: #20836. We will also review the error-handling logic for when this particular error occurs.
/cc @Azure/azsdk-sb-java

Author

marxxxx commented Jan 3, 2023

.NET should also be limited to 255 sessions, can you provide the corresponding .NET code snippet? I can double check that.

Sorry for the delay. I just verified that the current .NET Core SDK does not show this behavior.

You can use this sample code to replicate it:


using Azure.Messaging.ServiceBus;

const string ConnectionString = "<PROVIDE_YOUR_CONNECTION_STRING>";
const string QueueName = "<PROVIDE_YOUR_QUEUE_NAME>";

const int MaxSessions = 300; // above 255, works!

var client = new ServiceBusClient(ConnectionString);

for (int i = 0; i < MaxSessions; i++)
{
    try
    {
        var sessionId = i.ToString();

        _ = Task.Run(async () => await ReceiveFromSessionAsync(sessionId, client));
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
}

Console.WriteLine("Established connections. Press any key to exit.");
Console.ReadLine();

await client.DisposeAsync();

Console.WriteLine("Bye.");

static async Task ReceiveFromSessionAsync(string sessionId, ServiceBusClient client)
{
    try
    {
        var sessionReceiver = await client.AcceptSessionAsync(QueueName, sessionId, new ServiceBusSessionReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete });
        Console.WriteLine($"Session {sessionId} accepted.");

        await foreach (var item in sessionReceiver.ReceiveMessagesAsync())
        {
            var message = item.Body.ToString();

            Console.WriteLine($"Received message from session {sessionId}: {message}");
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Failed to receive messages from session {sessionId}: {ex}");
    }
}

Author

marxxxx commented Jan 3, 2023

You can tell us more about why this work around is not helpful for your scenarios.

We can currently live with this workaround, but I'm a bit doubtful about the behavior the SDK shows. When we receive no exception in our code once this limit is hit and all existing subscriptions simply stop working from that point on, the root cause is very hard to track down, which puts us at risk of service disruptions.

Also, if we hard-code this limit in our application and you decide to change it at a later stage, chances are high that our application will not be adjusted accordingly. We would then either face hard-to-track errors (if you lower the limit) or make suboptimal use of resources (if you raise the limit so that a connection could hold more than 255 sessions while we still apply the old limitation).
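
One way an application might soften the hard-coding concern is to read the limit from configuration and fall back to 255 only when nothing is set. The sketch below assumes a system property; the property name app.servicebus.maxSessionsPerClient and the class name are hypothetical and are not part of the SDK. It does not remove the need to follow changes on the service side, it only avoids a rebuild when the value changes.

```java
final class SessionLimitConfig {
    // Hypothetical property name chosen for this example.
    static final String PROPERTY = "app.servicebus.maxSessionsPerClient";
    // Fallback taken from the service error message quoted in this thread.
    static final int DEFAULT_LIMIT = 255;

    // Returns the configured limit, or the default if the property is unset or invalid.
    public static int maxSessionsPerClient() {
        // Integer.getInteger returns null when the property is missing or not a number.
        Integer configured = Integer.getInteger(PROPERTY);
        if (configured == null || configured <= 0) {
            return DEFAULT_LIMIT;
        }
        return configured;
    }

    public static void main(String[] args) {
        System.out.println("Max sessions per client: " + maxSessionsPerClient());
    }
}
```

The value could be overridden at start-up with `-Dapp.servicebus.maxSessionsPerClient=<n>` on the JVM command line.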

@liukun-msft
Contributor

Thanks! I will compare the differences with the .NET code and see if we can increase the limit.

@liukun-msft
Contributor

Hi @marxxxx,

The behavior is different because acceptSession() is implemented differently in Java and .NET.

The Java SDK uses a shared AMQP session to communicate with the same entity (i.e. queue or topic). When we call acceptSession(), it creates a separate AMQP link to receive messages, but all of these links are attached to the same AMQP session. In other words, messages with different session IDs are delivered on different AMQP links, but these links all belong to one AMQP session. Because an AMQP session can have at most 255 AMQP links, we receive the amqp:resource-limit-exceeded error when we accept more than 255 sessions.

In the .NET SDK, accepting a session creates both a new AMQP session and a new AMQP link. Thus, messages with different session IDs are delivered on different AMQP sessions and links. In this case the limit is 5000, because one AMQP connection can have at most 5000 AMQP links.

When we call acceptSession(), it makes sense to create a new AMQP session for receiving messages with the same session ID. I will discuss this with our team, but given the current implementation and code structure, making this change may take some time.
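
The difference between the two layouts can be illustrated with a small calculation of how many connections are needed for a given number of Service Bus sessions. This is a toy model rather than SDK code: the class name is hypothetical, the two limits are the figures quoted in this thread, and it ignores any other links (for example management links) that may also count against the limits.

```java
final class AmqpLinkModel {
    // Figures quoted in this thread; treat both as assumptions that may change.
    static final int MAX_LINKS_PER_SESSION = 255;
    static final int MAX_LINKS_PER_CONNECTION = 5000;

    // Java layout as described above: all links of a client share one AMQP session,
    // so each client (and its connection) is capped by the per-session link limit.
    public static int connectionsNeededSharedSession(int sessionCount) {
        return ceilDiv(sessionCount, MAX_LINKS_PER_SESSION);
    }

    // .NET layout as described above: one AMQP session and link per accepted session,
    // so only the per-connection link limit applies.
    public static int connectionsNeededSessionPerLink(int sessionCount) {
        return ceilDiv(sessionCount, MAX_LINKS_PER_CONNECTION);
    }

    // Integer division rounded up, for non-negative a and positive b.
    private static int ceilDiv(int a, int b) {
        return (a + b - 1) / b;
    }

    public static void main(String[] args) {
        for (int n : new int[] {257, 10000}) {
            System.out.printf("%d sessions: shared session -> %d connection(s), session per link -> %d connection(s)%n",
                n, connectionsNeededSharedSession(n), connectionsNeededSessionPerLink(n));
        }
    }
}
```

Under these assumptions, 257 sessions already need a second connection in the shared-session layout, while the session-per-link layout stays on one connection until the 5000-link limit is reached.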

Contributor

Hi @marxxxx, we deeply appreciate your input into this project. Regrettably, this issue has remained unresolved for over 2 years and inactive for 30 days, leading us to the decision to close it. We've implemented this policy to maintain the relevance of our issue queue and facilitate easier navigation for new contributors. If you still believe this topic requires attention, please feel free to create a new issue, referencing this one. Thank you for your understanding and ongoing support.

@github-actions github-actions bot closed this as not planned Won't fix, can't repro, duplicate, stale Nov 18, 2024
@github-actions github-actions bot locked and limited conversation to collaborators Nov 18, 2024