diff --git a/nextline/plugin/plugins/session/session.py b/nextline/plugin/plugins/session/session.py index f09177b9..a2dd85bf 100644 --- a/nextline/plugin/plugins/session/session.py +++ b/nextline/plugin/plugins/session/session.py @@ -1,7 +1,6 @@ import asyncio import contextlib import multiprocessing as mp -import time from collections.abc import AsyncIterator, Callable from functools import partial from logging import getLogger @@ -13,7 +12,7 @@ from nextline import spawned from nextline.plugin.spec import Context, hookimpl from nextline.spawned import Command, QueueIn, QueueOut, RunResult -from nextline.utils import run_in_process +from nextline.utils import Timer, run_in_process pickling_support.install() @@ -63,20 +62,25 @@ async def relay_events(context: Context, queue: QueueOut) -> AsyncIterator[None] '''Call the hook `on_event_in_process()` on events emitted in the spawned process.''' logger = getLogger(__name__) + in_finally = False + timer = Timer(timeout=1) # seconds + async def _monitor() -> None: while (event := await asyncio.to_thread(queue.get)) is not None: logger.debug(f'event: {event!r}') await context.hook.ahook.on_event_in_process(context=context, event=event) + if in_finally: + timer.restart() task = asyncio.create_task(_monitor()) try: yield finally: - up_to = 0.05 - start = time.process_time() + in_finally = True + timer.restart() while not queue.empty(): - await asyncio.sleep(0) - if time.process_time() - start > up_to: + await asyncio.sleep(0) # let _monitor() run + if timer.is_timeout(): logger.warning(f'Timeout. the queue is not empty: {queue!r}') break await asyncio.to_thread(queue.put, None) # type: ignore diff --git a/nextline/spawned/__init__.py b/nextline/spawned/__init__.py index 048a164a..5b31e3f6 100644 --- a/nextline/spawned/__init__.py +++ b/nextline/spawned/__init__.py @@ -25,6 +25,8 @@ import traceback +from nextline.utils import wait_until_queue_empty + from .commands import Command, PdbCommand from .runner import run from .types import QueueIn, QueueOut, RunArg, RunResult, Statement @@ -45,7 +47,9 @@ def main(run_arg: RunArg) -> RunResult: assert _queue_in assert _queue_out try: - return run(run_arg, _queue_in, _queue_out) + ret = run(run_arg, _queue_in, _queue_out) + wait_until_queue_empty(queue=_queue_out) + return ret except BaseException: traceback.print_exc() raise diff --git a/nextline/utils/__init__.py b/nextline/utils/__init__.py index 47bed802..14daecd0 100644 --- a/nextline/utils/__init__.py +++ b/nextline/utils/__init__.py @@ -13,11 +13,13 @@ 'profile_func', 'PubSub', 'PubSubItem', + 'wait_until_queue_empty', 'ExitedProcess', 'RunningProcess', 'run_in_process', 'ExcThread', 'ThreadTaskIdComposer', + 'Timer', ] from .done_callback import TaskDoneCallback, ThreadDoneCallback, ThreadTaskDoneCallback @@ -27,6 +29,8 @@ from .peek import peek_stderr, peek_stdout, peek_textio from .profile import profile_func from .pubsub import PubSub, PubSubItem +from .queue import wait_until_queue_empty from .run import ExitedProcess, RunningProcess, run_in_process from .thread_exception import ExcThread from .thread_task_id import ThreadTaskIdComposer +from .timer import Timer diff --git a/nextline/utils/queue.py b/nextline/utils/queue.py new file mode 100644 index 00000000..0affd87f --- /dev/null +++ b/nextline/utils/queue.py @@ -0,0 +1,31 @@ +import time +from queue import Queue +from typing import Optional + +from .timer import Timer + + +def wait_until_queue_empty( + queue: Queue, timeout: Optional[float] = None, interval: float = 0.001 +) -> None: + '''Wait until the queue is empty. + + Parameters + ---------- + queue : + The queue to wait for. + timeout : optional + The timeout in seconds. Wait indefinitely if None. + interval : float, optional + The interval in seconds to check the queue. + + ''' + if timeout is None: + while not queue.empty(): + time.sleep(interval) + else: + timer = Timer(timeout) + while not queue.empty(): + if timer.is_timeout(): + raise TimeoutError(f'Timeout. the queue is not empty: {queue!r}') + time.sleep(interval) diff --git a/nextline/utils/timer.py b/nextline/utils/timer.py new file mode 100644 index 00000000..509ea11b --- /dev/null +++ b/nextline/utils/timer.py @@ -0,0 +1,49 @@ +import time + + +class Timer: + '''A timer. + + Parameters + ---------- + timeout : + The timeout in seconds. + + + Examples + -------- + >>> timer = Timer(timeout=0.01) + >>> timer.elapsed() < 0.01 + True + + >>> timer.is_timeout() + False + + >>> timer.restart() + + >>> timer.is_timeout() + False + + >>> time.sleep(0.01) + + >>> timer.is_timeout() + True + + + ''' + + def __init__(self, timeout: float) -> None: + self._timeout = timeout + self._start = time.perf_counter() + + def restart(self) -> None: + '''Restart the timer.''' + self._start = time.perf_counter() + + def elapsed(self) -> float: + '''Return the time passed in seconds.''' + return time.perf_counter() - self._start + + def is_timeout(self) -> bool: + '''Return True if the timeout is reached.''' + return self.elapsed() > self._timeout