Skip to content

Commit

Permalink
asyncio: avoid registering done_callback cleanup when there's nothing…
Browse files Browse the repository at this point in the history
… to cleanup

avoids closure on Future, which can take a long time to cleanup in starved event loops
  • Loading branch information
minrk committed Feb 16, 2024
1 parent ecf6db5 commit 96c1b8e
Showing 1 changed file with 31 additions and 19 deletions.
50 changes: 31 additions & 19 deletions zmq/_future.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import warnings
from asyncio import Future
from collections import deque
from functools import partial
from itertools import chain
from typing import (
Any,
Expand Down Expand Up @@ -460,20 +461,22 @@ def _call_later(self, delay, callback):
return self._get_loop().call_later(delay, callback)

@staticmethod
def _remove_finished_future(future, event_list):
def _remove_finished_future(future, event_list, event=None):
"""Make sure that futures are removed from the event list when they resolve
Avoids delaying cleanup until the next send/recv event,
which may never come.
"""
for f_idx, event in enumerate(event_list):
if event.future is future:
break
else:
return

# "future" instance is shared between sockets, but each socket has its own event list.
event_list.remove(event_list[f_idx])
if not event_list:
return
# only unconsumed events (e.g. cancelled calls)
# will be present when this happens
try:
event_list.remove(event)
except ValueError:
# usually this will have been removed by being consumed
return

def _add_recv_event(self, kind, kwargs=None, future=None):
"""Add a recv event, returning the corresponding Future"""
Expand All @@ -497,17 +500,23 @@ def _add_recv_event(self, kind, kwargs=None, future=None):

# we add it to the list of futures before we add the timeout as the
# timeout will remove the future from recv_futures to avoid leaks
self._recv_futures.append(_FutureEvent(f, kind, kwargs, msg=None, timer=timer))

# Don't let the Future sit in _recv_events after it's done
f.add_done_callback(
lambda f: self._remove_finished_future(f, self._recv_futures)
)
_future_event = _FutureEvent(f, kind, kwargs, msg=None, timer=timer)
self._recv_futures.append(_future_event)

if self._shadow_sock.get(EVENTS) & POLLIN:
# recv immediately, if we can
self._handle_recv()
if self._recv_futures:
if self._recv_futures and _future_event in self._recv_futures:
# Don't let the Future sit in _recv_events after it's done
# no need to register this if we've already been handled
# (i.e. immediately-resolved recv)
f.add_done_callback(
partial(
self._remove_finished_future,
event_list=self._recv_futures,
event=_future_event,
)
)
self._add_io_state(POLLIN)
return f

Expand Down Expand Up @@ -556,12 +565,15 @@ def _add_send_event(self, kind, msg=None, kwargs=None, future=None):

# we add it to the list of futures before we add the timeout as the
# timeout will remove the future from recv_futures to avoid leaks
self._send_futures.append(
_FutureEvent(f, kind, kwargs=kwargs, msg=msg, timer=timer)
)
_future_event = _FutureEvent(f, kind, kwargs=kwargs, msg=msg, timer=timer)
self._send_futures.append(_future_event)
# Don't let the Future sit in _send_futures after it's done
f.add_done_callback(
lambda f: self._remove_finished_future(f, self._send_futures)
partial(
self._remove_finished_future,
event_list=self._send_futures,
event=_future_event,
)
)

self._add_io_state(POLLOUT)
Expand Down

0 comments on commit 96c1b8e

Please sign in to comment.