Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the ClientWebSocketResponse.receive stuck for a long time if network is suddenly unavaiable. #5860

Closed
wants to merge 1 commit into from

Conversation

Hanaasagi
Copy link
Member

@Hanaasagi Hanaasagi commented Jul 6, 2021

What do these changes do?

This PR fix the websocket client will get stuck when a the internect is unavaiable.

How to reproduce it

Run following code, and turn off the internet after seeing the Connected text in the console.

from asyncio import run
from aiohttp import ClientSession

async def main():
    async with ClientSession() as session:
        async with session.ws_connect('https://echo.websocket.org/',
                                      heartbeat=10) as ws:
            print('Connected', ws)
            print('Received', await ws.receive())

run(main())

In my local machine, it will block ablout 8 min and get a CLOSED message.

Explain Why

receive method has a timeout argument, and the default value is None. It means no timeout for reading from a connection. The websocket pong frame and user data frame uses the same timeout. So if aiohttp send a ping frame successfully, then we turn off the network. The receive coroutine will be block at the msg = await self._reader.read(). Because there are no data comes from the server.

async def receive(self, timeout: Optional[float] = None) -> WSMessage:
while True:
if self._waiting is not None:
raise RuntimeError("Concurrent call to receive() is not allowed")
if self._closed:
return WS_CLOSED_MESSAGE
elif self._closing:
await self.close()
return WS_CLOSED_MESSAGE
try:
self._waiting = self._loop.create_future()
try:
with async_timeout.timeout(
timeout or self._receive_timeout, loop=self._loop
):
msg = await self._reader.read()
self._reset_heartbeat()
finally:
waiter = self._waiting
self._waiting = None
set_result(waiter, True)
except (asyncio.CancelledError, asyncio.TimeoutError):
self._close_code = 1006
raise

Even if we call _pong_not_received, it doesn't cancel the receive coroutine.

def _pong_not_received(self) -> None:
if not self._closed:
self._closed = True
self._close_code = 1006
self._exception = asyncio.TimeoutError()
self._response.close()

The receive coroutine finished until a EofStream error from the TCP Stack.

What did this PR do

Add a simple flag(asyncio.Event) _is_heartbeat_failed in ClientWebSocketResponse. And when call ClientWebSocketResponse.receive, it both await self._reader.read() and this flag by asyntio.wait . So if the heartbeat is failed, the ClientWebSocketResponse.receive will be awake.

Are there changes in behavior for the user?

Related issue number

Fix #2309

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes
  • If you provide code modification, please add yourself to CONTRIBUTORS.txt
    • The format is <Name> <Surname>.
    • Please keep alphabetical order, the file is sorted by names.
  • Add a new news fragment into the CHANGES folder
    • name it <issue_id>.<type> for example (588.bugfix)
    • if you don't have an issue_id change it to the pr id after creating the pr
    • ensure type is one of the following:
      • .feature: Signifying a new feature.
      • .bugfix: Signifying a bug fix.
      • .doc: Signifying a documentation improvement.
      • .removal: Signifying a deprecation or removal of public API.
      • .misc: A ticket has been closed, but it is not of interest to users.
    • Make sure to use full sentences with correct case and punctuation, for example: "Fix issue with non-ascii contents in doctest text files."

@webknjaz
Copy link
Member

webknjaz commented Jul 6, 2021

Please fill out the PR template with the details on what this is supposed to do.

@Hanaasagi Hanaasagi changed the title Fix: fix the receive stuck for long time even if heartbeat is failed. [WIP]Fix: fix the receive stuck for long time even if heartbeat is failed. Jul 7, 2021
@codecov
Copy link

codecov bot commented Jul 7, 2021

Codecov Report

Attention: Patch coverage is 92.30769% with 1 line in your changes missing coverage. Please review.

Project coverage is 96.80%. Comparing base (7080a8b) to head (2c390a0).
Report is 3978 commits behind head on master.

Files with missing lines Patch % Lines
aiohttp/client_ws.py 92.30% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5860      +/-   ##
==========================================
+ Coverage   96.75%   96.80%   +0.05%     
==========================================
  Files          44       44              
  Lines        9851     9863      +12     
  Branches     1591     1594       +3     
==========================================
+ Hits         9531     9548      +17     
+ Misses        182      178       -4     
+ Partials      138      137       -1     
Flag Coverage Δ
unit 96.70% <92.30%> (+0.05%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

…if network is suddenly interrupted.

Related to aio-libs#2309; If we call `receive()` with a unlimited timeout,
it will get stuck for long time, because `_pong_not_received` has no way
to awake a block coroutine `receive()`. This PR add a `asyncio.Event` to
awake this coroutine.
@psf-chronographer psf-chronographer bot added the bot:chronographer:provided There is a change note present in this PR label Jul 8, 2021
@Hanaasagi Hanaasagi changed the title [WIP]Fix: fix the receive stuck for long time even if heartbeat is failed. Fix: fix the receive stuck for long time even if heartbeat is failed. Jul 8, 2021
@Hanaasagi Hanaasagi changed the title Fix: fix the receive stuck for long time even if heartbeat is failed. Fix the ClientWebSocketResponse.receive stuck for a long time if network is suddenly unavaiable. Jul 8, 2021
@Hanaasagi Hanaasagi marked this pull request as ready for review July 8, 2021 08:54
@Hanaasagi Hanaasagi requested a review from asvetlov as a code owner July 8, 2021 08:54
elif is_heartbeat_failed_task in done:
read_task.cancel()
assert self._exception is not None
raise self._exception
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that raise TimeoutError or just return a WSMessage(WSMsgType.ERROR, ..) message which is better.

@gil-cohen
Copy link

any update?

)
is_heartbeat_failed_task: asyncio.Task[bool] = asyncio.create_task(
self._is_heartbeat_failed.wait()
)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need two additional tasks in order to solve this bug? Is it at all necessary to check heartbeat status, or is it sufficient to check transport.is_closing()?

@Dreamsorcerer
Copy link
Member

The linked issue was closed, would be good if @bdraco or @Hanaasagi could confirm if this PR is no longer relevant.

@bdraco
Copy link
Member

bdraco commented Sep 10, 2024

The receive resolves immediately when we know the connection is dropped now

@bdraco
Copy link
Member

bdraco commented Sep 10, 2024

Very likely this issue is fixed. Please let us know if you still experience this behavior on 3.10.5 or later and we can reopen.

@bdraco bdraco closed this Sep 10, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bot:chronographer:provided There is a change note present in this PR
Projects
None yet
Development

Successfully merging this pull request may close these issues.

WebSocketResponse does not throw/close when connection is abruptly cut
6 participants