Skip to content

Commit

Permalink
Fix: fix the receive stuck for long time even if heartbeat is failed.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hanaasagi committed Jul 7, 2021
1 parent 5b23426 commit e8fe339
Showing 1 changed file with 23 additions and 1 deletion.
24 changes: 23 additions & 1 deletion aiohttp/client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ def __init__(
self._compress = compress
self._client_notakeover = client_notakeover

# A flag to indicate whether the latest heartbeat failed.
self._is_heartbeat_failed = asyncio.Event()
self._reset_heartbeat()

def _cancel_heartbeat(self) -> None:
Expand All @@ -91,6 +93,7 @@ def _cancel_heartbeat(self) -> None:

def _reset_heartbeat(self) -> None:
self._cancel_heartbeat()
self._is_heartbeat_failed.clear()

if self._heartbeat is not None:
self._heartbeat_cb = call_later(
Expand All @@ -116,6 +119,7 @@ def _pong_not_received(self) -> None:
self._close_code = WSCloseCode.ABNORMAL_CLOSURE
self._exception = asyncio.TimeoutError()
self._response.close()
self._is_heartbeat_failed.set()

@property
def closed(self) -> bool:
Expand Down Expand Up @@ -235,11 +239,29 @@ async def receive(self, timeout: Optional[float] = None) -> WSMessage:

try:
self._waiting = self._loop.create_future()

read_task = asyncio.create_task(self._reader.read())
is_heartbeat_failed_task = asyncio.create_task(
self._is_heartbeat_failed.wait()
)
try:
async with async_timeout.timeout(
timeout or self._timeout.ws_receive
):
msg = await self._reader.read()
# Check the heartbeat status when waiting data from server
done, pending = await asyncio.wait(
[read_task, is_heartbeat_failed_task],
return_when=asyncio.FIRST_COMPLETED,
)
# If server doesn't pong, but return data normally,
# supress the exception.
if read_task in done:
is_heartbeat_failed_task.cancel()
msg = read_task.result()
elif is_heartbeat_failed_task in done:
read_task.cancel()
raise self._exception

self._reset_heartbeat()
finally:
waiter = self._waiting
Expand Down

0 comments on commit e8fe339

Please sign in to comment.