-
Notifications
You must be signed in to change notification settings - Fork 63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Using channel after CancelledError exception in basic_consume callback can cause connection to close #70
Comments
I'm getting this exact issue also. It would be good to have some clarity over this functionality. |
I had a look at the commit history to try and understand why that is the case. It looks as though one day he fixed it by cancelling tasks before closing the channel: 4ee42e2#diff-03ab1aa1a76f0fbf4c092ce774a54d08 However the next commit on the same day looks to revert that change as part of another change: ed55461#diff-03ab1aa1a76f0fbf4c092ce774a54d08 Based on the name of the commit message, I'd suspect he accidentally committed his old files and overrode his initial change. Is this correct? |
Let's see to the AMQP is totally asynchronous, and the receiving But I might be misunderstood your problem 🤔. If you have a solution (or any proposal) for this problem, don't keep silence. |
The issue for me(and the reason for writing) is that this happens:
because we tried to send something after channel has been closed. async def callback(message: IncomingMessage):
async with message.process():
await asyncio.sleep(0.1) and then we issue await queue.cancel(tag) and await channel.close() on one channel. Consider this example: import asyncio
import logging
import sys
from typing import Optional
import aio_pika
from aio_pika import IncomingMessage
logging.basicConfig(
format='%(relativeCreated)8.2f - %(name)20s - %(levelname)8s - %(message)s',
level=logging.DEBUG,
stream=sys.stdout
)
N = 10
RABBIT_URL = 'amqp://guest:guest@localhost'
QUEUE_NAMES = ['test_queue{}'.format(i) for i in range(N)]
def callback_wrapper(queue_name: str):
async def callback(message: IncomingMessage):
try:
async with message.process():
await asyncio.sleep(0.1)
finally:
print('--- Callback finished queue:', queue_name)
return callback
def close_callback(reason):
logging.warning('CONNECTION CLOSED %s', str(reason))
async def consumer(connection: aio_pika.Connection, i: int, queue_name: str):
channel: Optional[aio_pika.channel.Channel] = None
tag: Optional[str] = None
queue: Optional[aio_pika.Queue] = None
try:
channel = await connection.channel()
queue = await channel.declare_queue(queue_name)
await channel.set_qos(prefetch_count=3)
tag = await queue.consume(callback_wrapper(queue_name))
await asyncio.sleep(3600 if i > 0 else 2)
except:
logging.exception('Consumer exception on queue: %s', queue_name)
finally:
if queue and tag:
logging.info('Before canceling %s', queue_name)
await queue.cancel(tag)
logging.info('Queue %s consume canceled', queue_name)
if channel:
await channel.close()
logging.info('Channel closed (Queue: %s)', queue_name)
async def main():
connection: Optional[aio_pika.Connection] = None
try:
connection = await aio_pika.connect('amqp://guest:guest@localhost')
connection.add_close_callback(close_callback)
tasks = [
asyncio.create_task(consumer(connection, i, queue_name))
for i, queue_name in enumerate(QUEUE_NAMES)
]
await asyncio.gather(*tasks, return_exceptions=True)
finally:
await asyncio.sleep(2)
logging.info('After 2 seconds: Connection.is_closed == %s ', connection.is_closed)
if connection:
await connection.close()
logging.info('Connection closed')
if __name__ == '__main__':
asyncio.run(main()) Queue filler code for testing: import asyncio
from typing import Awaitable, Callable
import aio_pika
N = 10
RABBIT_URL = 'amqp://guest:guest@localhost'
QUEUE_NAMES = ['test_queue{}'.format(i) for i in range(N)]
async def purge(connection: aio_pika.Connection, queue_name: str):
channel: aio_pika.Channel = await connection.channel(publisher_confirms=False)
queue: aio_pika.Queue = await channel.declare_queue(queue_name)
await queue.purge()
async def push(connection: aio_pika.Connection, queue_name: str):
channel: aio_pika.Channel = await connection.channel(publisher_confirms=False)
await channel.declare_queue(queue_name)
for i in range(100000):
await channel.default_exchange.publish(aio_pika.Message(
body=f'Q:{queue_name} Message: {i}'.encode()),
routing_key=queue_name)
async def main(fill=True):
connection: aio_pika.Connection = await aio_pika.connect(RABBIT_URL)
action: Callable[[aio_pika.Connection, str], Awaitable] = push if fill else purge
tasks = [asyncio.create_task(action(connection, queue_name)) for queue_name in QUEUE_NAMES]
await asyncio.gather(*tasks, return_exceptions=True)
await connection.close()
if __name__ == '__main__':
asyncio.run(main(fill=True)) Output that I get:
Program is still running because of await asyncio.sleep(3600 if i > 0 else 2) but other channels do not receive anything because connection is already closed:
Than I tried to reverse with suppress(Exception):
await self._on_close(exc)
with suppress(Exception):
await self._cancel_tasks(exc) in aiormq.base.__closer
Than I send SIGINT, some KeyboardInterrupt, Cancellation Exceptions were thrown and program hanged, but that is not related to this issues and is related to mosquito/aio-pika#253 So my reasoning is that trying to write to closed channel should not effect connection or other channels in this drastic manner. Maybe reversing solves this issue completely or one can prevent (in the library code) sending anything after channel.close has been issued. |
Python 3.8.1
aio-pika 6.4.1
aiormq 3.2.0
Consider callback example (using aio_pika):
Or plain aiormq:
and this section:
aiormq/aiormq/base.py
Lines 139 to 143 in 63a8b0d
Those two examples give me:
in Connection.add_close_callback (Connection.closing.add_done_callback for aiormq) after await channel.close()
I understand that second example is far-fetched. It's not exactly necessary to send nack/reject in case of CancelledError. But I ran into something similar to first example and had been debugging for a long time what causes sending nacks after channel has been closed. I thought that I simply can't send anything after close, so I needed to wrap code in callback in some logic to prevent sending messages after close. And than I found that channel.close waits for all subtasks(including on_message callbacks?) but for some reason after it have already sent Channel.Close method via AMQP
So my question is following: Is closing channel first and than cancelling subtasks intentional?
One full example:
Output:
P.S. I added custom prints to __closer in aiormq:
P.S Connection.is_closed equals False in log 2 seconds after closing callback fired
The text was updated successfully, but these errors were encountered: