Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace subscriptions with hooks in Continuous #70

Merged
merged 3 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 29 additions & 41 deletions nextline/continuous.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,75 +3,63 @@
The non-interactive mode needs to be properly reimplemented in the trace
function.
'''
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, AsyncIterator, Set
from typing import TYPE_CHECKING, Any, AsyncIterator

from nextline.events import OnStartPrompt
from nextline.plugin.spec import Context, hookimpl
from nextline.utils.pubsub import PubSubItem

if TYPE_CHECKING:
from nextline import Nextline


class Continue:
def __init__(self, pubsub_enabled: PubSubItem[bool]) -> None:
self._pubsub_enabled = pubsub_enabled

@hookimpl
async def on_start_prompt(self, context: Context, event: OnStartPrompt) -> None:
await context.nextline.send_pdb_command(
command='continue',
prompt_no=event.prompt_no,
trace_no=event.trace_no,
)

@hookimpl
async def on_finished(self, context: Context) -> None:
context.nextline.unregister(plugin=self)
await self._pubsub_enabled.publish(False)


class Continuous:
def __init__(self, nextline: Nextline):
def __init__(self, nextline: 'Nextline'):
self._nextline = nextline

self._pubsub_enabled = PubSubItem[bool]()
self._tasks: Set[asyncio.Task] = set()

async def start(self) -> None:
await self._pubsub_enabled.publish(False)
self._task = asyncio.create_task(self._monitor_state())

async def close(self) -> None:
await self._task
if self._tasks:
await asyncio.gather(*self._tasks)
await self._pubsub_enabled.aclose()

async def __aenter__(self) -> 'Continuous':
await self.start()
return self

async def __aexit__(self, exc_type, exc_value, traceback): # type: ignore
del exc_type, exc_value, traceback
async def __aexit__(self, *_: Any, **__: Any) -> None:
await self.close()

async def _monitor_state(self) -> None:
async for state in self._nextline.subscribe_state():
if state == 'initialized' and self._tasks:
_, pending = await asyncio.wait(
self._tasks, timeout=0.001, return_when=asyncio.FIRST_COMPLETED
)
self._tasks.clear()
self._tasks.update(pending)

async def run_and_continue(self) -> None:
started = asyncio.Event()
task = asyncio.create_task(self._run_and_continue(started))
self._tasks.add(task)
await started.wait()
await self._pubsub_enabled.publish(True)
self._nextline.register(plugin=Continue(pubsub_enabled=self._pubsub_enabled))
await self._nextline.run()

async def run_continue_and_wait(self, started: asyncio.Event) -> None:
await self._run_and_continue(started)

async def _run_and_continue(self, started: asyncio.Event) -> None:
await self._pubsub_enabled.publish(True)
try:
async with self._nextline.run_session():
# started.set() # TODO: too early. need to investigate why
async for prompt in self._nextline.prompts():
await self._nextline.send_pdb_command(
command='continue',
prompt_no=prompt.prompt_no,
trace_no=prompt.trace_no,
)
started.set()
started.set() # ensure started is set even if no prompt is received
finally:
await self._pubsub_enabled.publish(False)
self._nextline.register(plugin=Continue(pubsub_enabled=self._pubsub_enabled))
async with self._nextline.run_session():
started.set()

@property
def enabled(self) -> bool:
Expand Down
24 changes: 22 additions & 2 deletions nextline/imp.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
if TYPE_CHECKING:
from .main import Nextline

Plugin = object


class Imp:
'''The interface to the finite state machine and the plugin hook.'''
Expand All @@ -27,8 +29,26 @@ def __init__(self, nextline: 'Nextline', init_options: InitOptions) -> None:
def __repr__(self) -> str:
return f'<{self.__class__.__name__} {self._machine!r}>'

def register(self, plugin: Any) -> str | None:
return self._hook.register(plugin)
def register(self, plugin: Plugin) -> str | None:
if (name := self._hook.register(plugin)) is not None:
msg = f'Plugin {name!r} registered to {self._hook.project_name!r} project'
self._logger.info(msg)
else:
msg = f'Plugin {plugin!r} failed to register to {self._hook.project_name!r} project'
self._logger.error(msg)
return name

def unregister(
self, plugin: Plugin | None = None, name: str | None = None
) -> Any | None:
if (p := self._hook.unregister(plugin=plugin, name=name)) is not None:
msg = f'Plugin {p!r} unregistered from {self._hook.project_name!r} project'
self._logger.info(msg)
else:
f = plugin if plugin is not None else name
msg = f'Plugin {f!r} failed to unregister from {self._hook.project_name!r} project'
self._logger.error(msg)
return p

@property
def state(self) -> str:
Expand Down
9 changes: 7 additions & 2 deletions nextline/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Any, Optional

from .continuous import Continuous
from .imp import Imp
from .imp import Imp, Plugin
from .spawned import PdbCommand
from .types import (
InitOptions,
Expand Down Expand Up @@ -76,9 +76,14 @@ def __repr__(self) -> str:
# e.g., "<Nextline 'running'>"
return f'<{self.__class__.__name__} {self.state!r}>'

def register(self, plugin: Any) -> str | None:
def register(self, plugin: Plugin) -> str | None:
return self._imp.register(plugin)

def unregister(
self, plugin: Plugin | None = None, name: str | None = None
) -> Any | None:
return self._imp.unregister(plugin=plugin, name=name)

async def start(self) -> None:
if self._started:
return
Expand Down