Skip to content

Commit

Permalink
Improve scheduling of write tasks on a locked socket (#6052)
Browse files Browse the repository at this point in the history
* Improve scheduling of write tasks on a locked socket

* Add write lock
  • Loading branch information
philippjfr committed Dec 15, 2023
1 parent 47b65fd commit f639c2e
Showing 1 changed file with 55 additions and 34 deletions.
89 changes: 55 additions & 34 deletions panel/io/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
ModelChangedEvent
)

WRITE_TASKS = []
WRITE_LOCK = asyncio.Lock()

@dataclasses.dataclass
class Request:
headers : dict
Expand All @@ -67,6 +70,9 @@ def destroyed(self) -> bool:
def request(self):
return Request(headers={}, cookies={}, arguments={})

def _cleanup_task(task):
if task in WRITE_TASKS:
WRITE_TASKS.remove(task)

def _dispatch_events(doc: Document, events: List[DocumentChangedEvent]) -> None:
"""
Expand Down Expand Up @@ -119,6 +125,50 @@ def _cleanup_doc(doc, destroy=True):
# Destroy document
doc.destroy(None)

async def _run_write_futures(futures):
"""
Ensure that all write_message calls are awaited and handled.
"""
from tornado.websocket import WebSocketClosedError
async with WRITE_LOCK:
for future in futures:
try:
await future
except WebSocketClosedError:
logger.warning("Failed sending message as connection was closed")
except Exception as e:
logger.warning(f"Failed sending message due to following error: {e}")

def _dispatch_write_task(doc, func, *args, **kwargs):
"""
Schedules tasks that write messages to the socket.
"""
try:
task = asyncio.ensure_future(func(*args, **kwargs))
WRITE_TASKS.append(task)
task.add_done_callback(_cleanup_task)
except RuntimeError:
doc.add_next_tick_callback(partial(func, *args, **kwargs))

async def _dispatch_msgs(doc, msgs):
"""
Writes messages to a socket, ensuring that the write_lock is not
set, otherwise re-schedules the write task on the event loop.
"""
from tornado.websocket import WebSocketHandler
remaining = {}
for conn, msg in msgs.items():
socket = conn._socket
if hasattr(socket, 'write_lock') and socket.write_lock._block._value == 0:
remaining[conn] = msg
continue
if isinstance(conn._socket, WebSocketHandler):
futures = dispatch_tornado(conn, msg=msg)
else:
futures = dispatch_django(conn, msg=msg)
await _run_write_futures(futures)
_dispatch_write_task(doc, _dispatch_msgs, doc, remaining)

#---------------------------------------------------------------------
# Public API
#---------------------------------------------------------------------
Expand Down Expand Up @@ -225,21 +275,6 @@ def dispatch_django(conn, events=None, msg=None):
])
return futures

async def _dispatch_msgs(msgs):
from tornado.websocket import WebSocketClosedError, WebSocketHandler
for conn, msg in msgs.items():
if isinstance(conn._socket, WebSocketHandler):
futures = dispatch_tornado(conn, msg=msg)
else:
futures = dispatch_django(conn, msg=msg)
for future in futures:
try:
await future
except WebSocketClosedError:
logger.warning("Failed sending message as connection was closed")
except Exception as e:
logger.warning(f"Failed sending message due to following error: {e}")

@contextmanager
def unlocked() -> Iterator:
"""
Expand All @@ -258,7 +293,7 @@ def unlocked() -> Iterator:
monkeypatch_events(curdoc.callbacks._held_events)
return

from tornado.websocket import WebSocketClosedError, WebSocketHandler
from tornado.websocket import WebSocketHandler
connections = session._subscribed_connections

curdoc.hold()
Expand Down Expand Up @@ -290,24 +325,11 @@ def unlocked() -> Iterator:
else:
futures += dispatch_django(conn, dispatch_events)

# Ensure that all write_message calls are awaited and handled
async def handle_write_errors():
for future in futures:
try:
await future
except WebSocketClosedError:
logger.warning("Failed sending message as connection was closed")
except Exception as e:
logger.warning(f"Failed sending message due to following error: {e}")

if futures:
if state._unblocked(curdoc):
try:
asyncio.ensure_future(handle_write_errors())
except RuntimeError:
curdoc.add_next_tick_callback(handle_write_errors)
_dispatch_write_task(curdoc, _run_write_futures, futures)
else:
curdoc.add_next_tick_callback(handle_write_errors)
curdoc.add_next_tick_callback(partial(_run_write_futures, futures))

curdoc.callbacks._held_events = remaining_events
finally:
Expand All @@ -322,9 +344,8 @@ async def handle_write_errors():
if not remaining_events:
continue
# Create a protocol message for any events that cannot be immediately dispatched
msgs[conn] = conn.protocol.create('PATCH-DOC', remaining_events)
curdoc.add_next_tick_callback(partial(_dispatch_msgs, msgs))

msgs[conn] = conn.protocol.create('PATCH-DOC', remaining_events)
_dispatch_write_task(curdoc, _dispatch_msgs, curdoc, msgs)

@contextmanager
def immediate_dispatch(doc: Document | None = None):
Expand Down

0 comments on commit f639c2e

Please sign in to comment.