Autorecovery for server-named queues loops indefinitely when consumer listen this queue #1238

Closed
stukselbax opened this issue Aug 16, 2022 · 5 comments · Fixed by #1325

stukselbax commented Aug 16, 2022

When an automatically recovering connection has a consumer on a server-named, auto-delete queue, connection recovery never completes once it starts.

RabbitMQ.Client version: 6.4.0

Here is a minimal console application that reproduces the problem:

namespace RabbitMQTest
{
    using System;
    using System.Globalization;
    using System.Threading;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Logging;

    internal class Program
    {
        static void Main()
        {
            // Writes the client's internal log events (including recovery errors) to the console
            var consoleLogger = new RabbitMqConsoleEventListener();
            var resolver = new DefaultEndpointResolver(new[] { new AmqpTcpEndpoint("localhost") });
            var connectionFactory = new ConnectionFactory
            {
                UserName = "guest",
                Password = "guest",
                Port = 5672,
                DispatchConsumersAsync = true,
                AutomaticRecoveryEnabled = true,
                TopologyRecoveryEnabled = true,
                HostName = "localhost",
                EndpointResolverFactory = endpoints =>
                {
                    return resolver;
                },
                UseBackgroundThreadsForIO = true,
            };
            var connection = connectionFactory.CreateConnection();
            var channel = connection.CreateModel();

            // "" asks the server to generate the queue name
            var queueDeclareOk = channel.QueueDeclare("", durable: false, exclusive: true, autoDelete: true, null);

            Console.WriteLine(
                    "Declared queue {0}",
                    queueDeclareOk.QueueName);

            channel.BasicQos(0, 50, false);

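            // "" tells the server to use the last queue declared on this channel
            var consumerTag = channel.BasicConsume(
                "",                                                          // queue
                false,                                                       // autoAck
                Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture),  // consumerTag
                true,                                                        // noLocal
                true,                                                        // exclusive
                null,                                                        // arguments
                new AsyncDefaultBasicConsumer(channel));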

            while (true)
            {
                Thread.Sleep(2000);
                Console.WriteLine("Waiting for a message");
            }
        }
    }
}

While it runs, disconnect the network between the application and the RabbitMQ server. For example, if RabbitMQ runs in Docker, you can use the following commands:

docker network ls
# choose the network your rabbitmq container uses
docker network disconnect NETWORK_NAME RABBITMQ_CONTAINER
# wait for autorecovery process in console application starts
docker network connect NETWORK_NAME RABBITMQ_CONTAINER

After these steps, the connection, along with its consumers and topology, is expected to recover successfully.

Instead, recovery never completes because consumer recovery fails:

Informational: Performing automatic recovery
Error: Topology recovery exception
Error: Exception: RabbitMQ.Client.Exceptions.TopologyRecoveryException
Caught an exception while recovering consumer aa92187d6e204b399a45f2638132a4cf on queue : The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text='NOT_FOUND - no previously declared queue', classId=60, methodId=20


InnerException:
RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text='NOT_FOUND - no previously declared queue', classId=60, methodId=20
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at RabbitMQ.Client.Impl.RecordedConsumer.Recover(IModel channelToUse)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverConsumers(AutorecoveringModel modelToRecover, IModel channelToUse)
Error: Exception when recovering connection. Will try again after retry interval.
Error: Exception: RabbitMQ.Client.Exceptions.TopologyRecoveryException
Caught an exception while recovering consumer aa92187d6e204b399a45f2638132a4cf on queue : The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text='NOT_FOUND - no previously declared queue', classId=60, methodId=20

   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.HandleTopologyRecoveryException(TopologyRecoveryException e)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverConsumers(AutorecoveringModel modelToRecover, IModel channelToUse)
   at RabbitMQ.Client.Impl.AutorecoveringModel.AutomaticallyRecover(AutorecoveringConnection conn, Boolean recoverConsumers)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverModelsAndItsConsumers()
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.TryPerformAutomaticRecovery()
InnerException:
RabbitMQ.Client.Exceptions.OperationInterruptedException: The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=404, text='NOT_FOUND - no previously declared queue', classId=60, methodId=20
   at RabbitMQ.Client.Impl.SimpleBlockingRpcContinuation.GetReply(TimeSpan timeout)
   at RabbitMQ.Client.Impl.ModelBase.BasicConsume(String queue, Boolean autoAck, String consumerTag, Boolean noLocal, Boolean exclusive, IDictionary`2 arguments, IBasicConsumer consumer)
   at RabbitMQ.Client.Impl.RecordedConsumer.Recover(IModel channelToUse)
   at RabbitMQ.Client.Framing.Impl.AutorecoveringConnection.RecoverConsumers(AutorecoveringModel modelToRecover, IModel channelToUse)
Informational: Received request to BeginAutomaticRecovery, but already in Recovering state.

A possible root cause: RecordedConsumer records the queue name exactly as it was passed to BasicConsume, i.e. "". During queue recovery, the client tries to update the queue name of every recorded consumer that used the old name of a recovered server-named queue. However, RecordedQueue stores the server-generated name, so the consumer recorded with "" never matches and its queue name is not updated. Recovery then re-issues basic.consume with "", which fails with 404 NOT_FOUND because the server finds no previously declared queue on that channel.
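To make the suspected mechanism concrete, here is a minimal, self-contained C# sketch. TopologyRecorderSketch, RecordedConsumerSketch and their members are hypothetical simplifications, not RabbitMQ.Client types. The ResolveEmptyName switch contrasts recording "" verbatim (the 6.4.0 behavior described above) with substituting the last declared queue name, which is the idea behind the CurrentQueue change in the fix.

```csharp
using System.Collections.Generic;

// Hypothetical, simplified model of consumer recording in an auto-recovering
// client. NOT the RabbitMQ.Client implementation; illustration only.
public sealed class RecordedConsumerSketch
{
    public RecordedConsumerSketch(string consumerTag, string queue)
    {
        ConsumerTag = consumerTag;
        Queue = queue;
    }

    public string ConsumerTag { get; }

    // The queue name that would be passed to basic.consume during recovery.
    public string Queue { get; set; }
}

public sealed class TopologyRecorderSketch
{
    // false: record "" verbatim (the behavior reported in this issue).
    // true:  replace "" with the last declared queue name (the CurrentQueue idea).
    public bool ResolveEmptyName { get; set; }

    // Mirrors the AMQP 0-9-1 rule that an empty queue name refers to
    // the last queue declared on the channel.
    public string CurrentQueue { get; private set; } = string.Empty;

    public List<RecordedConsumerSketch> Consumers { get; } = new List<RecordedConsumerSketch>();

    // Called after queue.declare-ok returns the (possibly server-generated) name.
    public void RecordQueueDeclare(string declaredName)
    {
        CurrentQueue = declaredName;
    }

    // Called when the application invokes BasicConsume.
    public void RecordConsume(string consumerTag, string queue)
    {
        string name = (ResolveEmptyName && queue.Length == 0) ? CurrentQueue : queue;
        Consumers.Add(new RecordedConsumerSketch(consumerTag, name));
    }

    // Called during recovery after a server-named queue is redeclared
    // and the server has assigned it a new name.
    public void RecordQueueRenamed(string oldName, string newName)
    {
        foreach (RecordedConsumerSketch consumer in Consumers)
        {
            // A consumer recorded with "" never equals oldName, so it keeps "".
            if (consumer.Queue == oldName)
            {
                consumer.Queue = newName;
            }
        }

        // The redeclaration makes the new name the last declared queue.
        CurrentQueue = newName;
    }
}
```

With ResolveEmptyName set to false, a consumer recorded against "" still has Queue == "" after the rename, which corresponds to the 404 in the log above; with it set to true, the recorded consumer follows the queue to its new server-generated name.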

@stukselbax stukselbax changed the title Autorecovery for server-named queues loops indefinitely Autorecovery for server-named queues loops indefinitely when consumers listen this queue Aug 16, 2022
@stukselbax stukselbax changed the title Autorecovery for server-named queues loops indefinitely when consumers listen this queue Autorecovery for server-named queues loops indefinitely when consumer listen this queue Aug 16, 2022
@michaelklishin
Member

You have missed one crucial piece of information: what version of the client is used.

@michaelklishin
Member

If you understand what the issue is, you are welcome to submit a PR.

@lukebakken lukebakken self-assigned this Sep 13, 2022
@lukebakken lukebakken added this to the 6.4.1 milestone Sep 13, 2022
@lukebakken lukebakken modified the milestones: 6.4.1, 6.5.0 Feb 21, 2023
@lukebakken
Contributor

I'm going to see if there's a reasonably simple fix. The obvious workaround is to change your code to NOT use the empty string when consuming, and to pass the server-generated name from queueDeclareOk.QueueName instead:

var consumerTag = channel.BasicConsume(
    queueDeclareOk.QueueName,
    false,
    Guid.NewGuid().ToString("N", CultureInfo.InvariantCulture),
    true,
    true,
    null,
    new AsyncDefaultBasicConsumer(channel));

lukebakken added a commit that referenced this issue Mar 23, 2023
Fixes #1238

* Add failing test
* Fix `RecordedConsumer` to allow the empty string for a queue name
lukebakken added a commit that referenced this issue Mar 23, 2023
Fixes #1238

* Add failing test
* Fix `RecordedConsumer` to allow the empty string for a queue name

* Add `CurrentQueue` to `IModel` to keep track of the last declared queue name as defined in the AMQP 091 spec

* Fix `RecordedConsumer` to use `CurrentQueue` when passed in name is `string.Empty`

Fixup API Approval
lukebakken added a commit that referenced this issue Mar 23, 2023
Port of #1324 to main

Fixes #1238

* Add failing test
* Fix `RecordedConsumer` to allow the empty string for a queue name
* Add `CurrentQueue` to `IChannel` to keep track of the last declared queue name as defined in the AMQP 091 spec
* Fix `RecordedConsumer` to use `CurrentQueue` when passed in name is `string.Empty`
* Fixup API Approval
@lukebakken
Contributor

Fixed by #1324

@lukebakken
Contributor

@stukselbax the fix for this issue will ship in version 6.5.0.
