Skip to content

Commit

Permalink
Fix possible deadlock with "complete()" and async sink (#906)
Browse files Browse the repository at this point in the history
  • Loading branch information
Delgan authored Sep 3, 2023
1 parent 2c585a1 commit 8c2044e
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

- Add a new ``context`` optional argument to ``logger.add()`` specifying ``multiprocessing`` context (like ``"spawn"`` or ``"fork"``) to be used internally instead of the default one (`#851 <https://github.com/Delgan/loguru/issues/851>`_).
- Add support for true colors on Windows using ANSI/VT console when available (`#934 <https://github.com/Delgan/loguru/issues/934>`_, thanks `@tunaflsh <https://github.com/tunaflsh>`_).
- Fix possible deadlock when calling ``logger.complete()`` with concurrent logging of an asynchronous sink (`#906 <https://github.com/Delgan/loguru/issues/906>`_).
- Fix file possibly rotating too early or too late when re-starting an application around midnight (`#894 <https://github.com/Delgan/loguru/issues/894>`_).
- Fix inverted ``"<hide>"`` and ``"<strike>"`` color tags (`#943 <https://github.com/Delgan/loguru/pull/943>`_, thanks `@tunaflsh <https://github.com/tunaflsh>`_).
- Fix possible untraceable errors raised when logging non-unpicklable ``Exception`` instances while using ``enqueue=True`` (`#329 <https://github.com/Delgan/loguru/issues/329>`_).
Expand Down
4 changes: 2 additions & 2 deletions loguru/_file_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ def stop(self):

self._terminate_file(is_rotating=False)

async def complete(self):
pass
def tasks_to_complete(self):
return []

def _create_path(self):
path = self._path.format_map({"time": FileDateFormatter()})
Expand Down
17 changes: 11 additions & 6 deletions loguru/_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
self._lock = create_handler_lock()
self._lock_acquired = threading.local()
self._queue = None
self._queue_lock = None
self._confirmation_event = None
self._confirmation_lock = None
self._owner_process_pid = None
Expand All @@ -88,6 +89,7 @@ def __init__(

if self._enqueue:
self._queue = self._multiprocessing_context.SimpleQueue()
self._queue_lock = create_handler_lock()
self._confirmation_event = self._multiprocessing_context.Event()
self._confirmation_lock = self._multiprocessing_context.Lock()
self._owner_process_pid = os.getpid()
Expand Down Expand Up @@ -223,12 +225,12 @@ def complete_queue(self):
self._confirmation_event.wait()
self._confirmation_event.clear()

async def complete_async(self):
def tasks_to_complete(self):
if self._enqueue and self._owner_process_pid != os.getpid():
return

with self._protected_lock():
await self._sink.complete()
return []
lock = self._queue_lock if self._enqueue else self._protected_lock()
with lock:
return self._sink.tasks_to_complete()

def update_format(self, level_id):
if not self._colorize or self._is_formatter_dynamic:
Expand Down Expand Up @@ -285,7 +287,7 @@ def _queued_writer(self):

# We need to use a lock to protect sink during fork.
# Particularly, writing to stderr may lead to deadlock in child process.
lock = create_handler_lock()
lock = self._queue_lock

while True:
try:
Expand Down Expand Up @@ -317,12 +319,15 @@ def __getstate__(self):
state["_sink"] = None
state["_thread"] = None
state["_owner_process"] = None
state["_queue_lock"] = None
return state

def __setstate__(self, state):
self.__dict__.update(state)
self._lock = create_handler_lock()
self._lock_acquired = threading.local()
if self._enqueue:
self._queue_lock = create_handler_lock()
if self._is_formatter_dynamic:
if self._colorize:
self._memoize_dynamic_format = memoize(prepare_colored_format)
Expand Down
10 changes: 4 additions & 6 deletions loguru/_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -1109,20 +1109,18 @@ def complete(self):
>>> process.join()
Message sent from the child
"""
tasks = []

with self._core.lock:
handlers = self._core.handlers.copy()
for handler in handlers.values():
handler.complete_queue()

logger = self
tasks.extend(handler.tasks_to_complete())

class AwaitableCompleter:
def __await__(self):
with logger._core.lock:
handlers = logger._core.handlers.copy()
for handler in handlers.values():
yield from handler.complete_async().__await__()
for task in tasks:
yield from task.__await__()

return AwaitableCompleter()

Expand Down
37 changes: 23 additions & 14 deletions loguru/_simple_sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ def stop(self):
if self._stoppable:
self._stream.stop()

async def complete(self):
if self._completable:
await self._stream.complete()
def tasks_to_complete(self):
if not self._completable:
return []
return [self._stream.complete()]


class StandardSink:
Expand Down Expand Up @@ -52,8 +53,8 @@ def write(self, message):
def stop(self):
self._handler.close()

async def complete(self):
pass
def tasks_to_complete(self):
return []


class AsyncSink:
Expand Down Expand Up @@ -86,14 +87,22 @@ def stop(self):
for task in self._tasks:
task.cancel()

async def complete(self):
def tasks_to_complete(self):
# To avoid errors due to "self._tasks" being mutated while iterated, the
# "tasks_to_complete()" method must be protected by the same lock as "write()" (which
# happens to be the handler lock). However, the tasks must not be awaited while the lock is
# acquired as this could lead to a deadlock. Therefore, we first need to collect the tasks
# to complete, then return them so that they can be awaited outside of the lock.
return [self._complete_task(task) for task in self._tasks]

async def _complete_task(self, task):
loop = get_running_loop()
for task in self._tasks:
if get_task_loop(task) is loop:
try:
await task
except Exception:
pass # Handled in "check_exception()"
if get_task_loop(task) is not loop:
return
try:
await task
except Exception:
pass # Handled in "check_exception()"

def __getstate__(self):
state = self.__dict__.copy()
Expand All @@ -115,5 +124,5 @@ def write(self, message):
def stop(self):
pass

async def complete(self):
pass
def tasks_to_complete(self):
return []
63 changes: 62 additions & 1 deletion tests/test_coroutine_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,67 @@ async def complete():
assert err == ""


def test_complete_and_sink_write_concurrency():
count = 1000
n = 0

async def sink(message):
nonlocal n
n += 1

async def some_task():
for _ in range(count):
logger.info("Message")
await asyncio.sleep(0)

async def another_task():
for _ in range(count):
await logger.complete()
await asyncio.sleep(0)

async def main():
logger.remove()
logger.add(sink, catch=False)

await asyncio.gather(some_task(), another_task())

asyncio.run(main())

assert n == count


def test_complete_and_contextualize_concurrency():
called = False

async def main():
logging_event = asyncio.Event()
contextualize_event = asyncio.Event()

async def sink(message):
nonlocal called
logging_event.set()
await contextualize_event.wait()
called = True

async def logging_task():
logger.info("Message")
await logger.complete()

async def contextualize_task():
with logger.contextualize():
contextualize_event.set()
await logging_event.wait()

logger.remove()
logger.add(sink, catch=False)

await asyncio.gather(logging_task(), contextualize_task())

asyncio.run(main())

assert called


async def async_subworker(logger_):
logger_.info("Child")
await logger_.complete()
Expand All @@ -588,7 +649,7 @@ async def write(self, message):
self.output += message


def test_complete_with_sub_processes(monkeypatch, capsys):
def test_complete_with_sub_processes(capsys):
spawn_context = multiprocessing.get_context("spawn")

loop = asyncio.new_event_loop()
Expand Down

0 comments on commit 8c2044e

Please sign in to comment.