Skip to content

Commit

Permalink
Make OutStream._buffer thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
Bago Amirbekian committed Mar 25, 2022
1 parent d6744f9 commit 2de496a
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,8 @@ def __init__(
self._flush_pending = False
self._subprocess_flush_pending = False
self._io_loop = pub_thread.io_loop
self._new_buffer()
self._buffer_lock = threading.RLock()
self._buffer = StringIO()
self.echo = None
self._isatty = bool(isatty)

Expand Down Expand Up @@ -528,7 +529,8 @@ def write(self, string: str) -> int:

is_child = (not self._is_master_process())
# only touch the buffer in the IO thread to avoid races
self.pub_thread.schedule(lambda: self._buffer.write(string))
with self._buffer_lock:
self._buffer.write(string)
if is_child:
# mp.Pool cannot be trusted to flush promptly (or ever),
# and this helps.
Expand All @@ -553,17 +555,15 @@ def writable(self):
return True

def _flush_buffer(self):
"""clear the current buffer and return the current buffer data.
This should only be called in the IO thread.
"""
data = ''
if self._buffer is not None:
buf = self._buffer
self._new_buffer()
data = buf.getvalue()
buf.close()
"""clear the current buffer and return the current buffer data."""
buf = self._rotate_buffer()
data = buf.getvalue()
buf.close()
return data

def _new_buffer(self):
self._buffer = StringIO()
def _rotate_buffer(self):
"""Returns the current buffer and replaces it with an empty buffer."""
with self._buffer_lock:
old_buffer = self._buffer
self._buffer = StringIO()
return old_buffer

0 comments on commit 2de496a

Please sign in to comment.