Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Let the spawned process exit after the output queue is empty #28

Merged
merged 5 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions nextline/plugin/plugins/session/session.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import contextlib
import multiprocessing as mp
import time
from collections.abc import AsyncIterator, Callable
from functools import partial
from logging import getLogger
Expand All @@ -13,7 +12,7 @@
from nextline import spawned
from nextline.plugin.spec import Context, hookimpl
from nextline.spawned import Command, QueueIn, QueueOut, RunResult
from nextline.utils import run_in_process
from nextline.utils import Timer, run_in_process

pickling_support.install()

Expand Down Expand Up @@ -63,20 +62,25 @@
'''Call the hook `on_event_in_process()` on events emitted in the spawned process.'''
logger = getLogger(__name__)

in_finally = False
timer = Timer(timeout=1) # seconds

async def _monitor() -> None:
while (event := await asyncio.to_thread(queue.get)) is not None:
logger.debug(f'event: {event!r}')
await context.hook.ahook.on_event_in_process(context=context, event=event)
if in_finally:
timer.restart()

Check warning on line 73 in nextline/plugin/plugins/session/session.py

View check run for this annotation

Codecov / codecov/patch

nextline/plugin/plugins/session/session.py#L73

Added line #L73 was not covered by tests

task = asyncio.create_task(_monitor())
try:
yield
finally:
up_to = 0.05
start = time.process_time()
in_finally = True
timer.restart()
while not queue.empty():
await asyncio.sleep(0)
if time.process_time() - start > up_to:
await asyncio.sleep(0) # let _monitor() run

Check warning on line 82 in nextline/plugin/plugins/session/session.py

View check run for this annotation

Codecov / codecov/patch

nextline/plugin/plugins/session/session.py#L82

Added line #L82 was not covered by tests
if timer.is_timeout():
logger.warning(f'Timeout. the queue is not empty: {queue!r}')
break
await asyncio.to_thread(queue.put, None) # type: ignore
Expand Down
6 changes: 5 additions & 1 deletion nextline/spawned/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import traceback

from nextline.utils import wait_until_queue_empty

from .commands import Command, PdbCommand
from .runner import run
from .types import QueueIn, QueueOut, RunArg, RunResult, Statement
Expand All @@ -45,7 +47,9 @@
assert _queue_in
assert _queue_out
try:
return run(run_arg, _queue_in, _queue_out)
ret = run(run_arg, _queue_in, _queue_out)
wait_until_queue_empty(queue=_queue_out)
return ret

Check warning on line 52 in nextline/spawned/__init__.py

View check run for this annotation

Codecov / codecov/patch

nextline/spawned/__init__.py#L51-L52

Added lines #L51 - L52 were not covered by tests
except BaseException:
traceback.print_exc()
raise
4 changes: 4 additions & 0 deletions nextline/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
'profile_func',
'PubSub',
'PubSubItem',
'wait_until_queue_empty',
'ExitedProcess',
'RunningProcess',
'run_in_process',
'ExcThread',
'ThreadTaskIdComposer',
'Timer',
]

from .done_callback import TaskDoneCallback, ThreadDoneCallback, ThreadTaskDoneCallback
Expand All @@ -27,6 +29,8 @@
from .peek import peek_stderr, peek_stdout, peek_textio
from .profile import profile_func
from .pubsub import PubSub, PubSubItem
from .queue import wait_until_queue_empty
from .run import ExitedProcess, RunningProcess, run_in_process
from .thread_exception import ExcThread
from .thread_task_id import ThreadTaskIdComposer
from .timer import Timer
31 changes: 31 additions & 0 deletions nextline/utils/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import time
from queue import Queue
from typing import Optional

from .timer import Timer


def wait_until_queue_empty(
queue: Queue, timeout: Optional[float] = None, interval: float = 0.001
) -> None:
'''Wait until the queue is empty.

Parameters
----------
queue :
The queue to wait for.
timeout : optional
The timeout in seconds. Wait indefinitely if None.
interval : float, optional
The interval in seconds to check the queue.

'''
if timeout is None:
while not queue.empty():
time.sleep(interval)
else:
timer = Timer(timeout)

Check warning on line 27 in nextline/utils/queue.py

View check run for this annotation

Codecov / codecov/patch

nextline/utils/queue.py#L27

Added line #L27 was not covered by tests
while not queue.empty():
if timer.is_timeout():
raise TimeoutError(f'Timeout. the queue is not empty: {queue!r}')
time.sleep(interval)

Check warning on line 31 in nextline/utils/queue.py

View check run for this annotation

Codecov / codecov/patch

nextline/utils/queue.py#L30-L31

Added lines #L30 - L31 were not covered by tests
49 changes: 49 additions & 0 deletions nextline/utils/timer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import time


class Timer:
'''A timer.

Parameters
----------
timeout :
The timeout in seconds.


Examples
--------
>>> timer = Timer(timeout=0.01)
>>> timer.elapsed() < 0.01
True

>>> timer.is_timeout()
False

>>> timer.restart()

>>> timer.is_timeout()
False

>>> time.sleep(0.01)

>>> timer.is_timeout()
True


'''

def __init__(self, timeout: float) -> None:
self._timeout = timeout
self._start = time.perf_counter()

def restart(self) -> None:
'''Restart the timer.'''
self._start = time.perf_counter()

def elapsed(self) -> float:
'''Return the time passed in seconds.'''
return time.perf_counter() - self._start

def is_timeout(self) -> bool:
'''Return True if the timeout is reached.'''
return self.elapsed() > self._timeout
Loading