Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Maintenance for Test Loop #153

Merged
merged 12 commits into from
Oct 8, 2024
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2019 - 2020 Max Fischer
Copyright (c) 2019 - 2024 Max Fischer

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
2 changes: 1 addition & 1 deletion asyncstdlib/asynctools.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class _BorrowedAsyncIterator(AsyncGenerator[T, S]):
__slots__ = "__wrapped__", "__anext__", "asend", "athrow", "_wrapper"

# Type checker does not understand `__slot__` definitions
__anext__: Callable[[Any], Awaitable[T]]
__anext__: Callable[[Any], Coroutine[Any, Any, T]]
asend: Any
athrow: Any

Expand Down
19 changes: 8 additions & 11 deletions asyncstdlib/contextlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,18 +328,15 @@ def push(self, exit: SE) -> SE:
If a context manager must also be entered, use :py:meth:`~.enter_context`
instead.
"""
try:
aexit: Callable[..., Awaitable[Union[None, bool]]]
if hasattr(exit, "__aexit__"):
aexit = exit.__aexit__ # type: ignore
except AttributeError:
try:
aexit = awaitify(
exit.__exit__, # type: ignore
)
except AttributeError:
assert callable(
exit
), f"Expected (async) context manager or callable, got {exit}"
aexit = awaitify(exit)
elif hasattr(exit, "__exit__"):
aexit = awaitify(exit.__exit__) # type: ignore
elif callable(exit):
aexit = awaitify(exit) # type: ignore
else:
raise TypeError(f"Expected (async) context manager or callable, got {exit}")
self._exit_callbacks.append(aexit) # pyright: ignore[reportUnknownArgumentType]
return exit

Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

project = "asyncstdlib"
author = "Max Fischer"
copyright = f"2019 {author}"
copyright = f"2019-2024 {author}"

# The short X.Y version
version = __version__
Expand Down
5 changes: 3 additions & 2 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ The missing ``async`` toolbox
:caption: Development & Maintenance
:hidden:

source/contributing
source/publishing
source/devel/contributing
source/devel/testloop
source/devel/publishing

The ``asyncstdlib`` library re-implements functions and classes of the Python
standard library to make them compatible with ``async`` callables, iterables
Expand Down
2 changes: 1 addition & 1 deletion docs/source/api/contextlib.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Context Managers
An :term:`abstract base class` for asynchronous context managers

This class can be used to check whether some object is an
asynchronous context manager. If a class may inherit from
asynchronous context manager. A class may inherit from
``AbstractContextManager``, in which case it must implement
an ``__aenter__`` method; the default ``__aenter__`` returns
the asynchronous context manager itself.
Expand Down
File renamed without changes.
File renamed without changes.
34 changes: 34 additions & 0 deletions docs/source/devel/testloop.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
===============
Test Event Loop
===============

.. py:module:: unittests.utility
:synopsis: testing utilities

To facilitate event loop agnostic features, :py:mod:`asyncstdlib` includes
its own custom event loop implementation for testing.
This is provided as a simple decorator that is compatible with :py:mod:`pytest`,
as well as a number of `async` commands specific to the event loop.

Event Loops
===========

The test event loop is available via a decorator that should be directly applied
to an ``async def`` test case.

.. autofunction:: sync(test_case: (...) -> (await) None) -> (...) -> None

Async commands
==============

.. autoclass:: Schedule(*await Any)

.. py:class:: Switch(skip: int, /)
:no-index:

.. py:class:: Switch(min: int, max: int, /)
:no-index:

.. autoclass:: Switch()

.. autoclass:: Lock
5 changes: 2 additions & 3 deletions unittests/test_builtins.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import asyncstdlib as a

from .utility import sync, asyncify, awaitify, inside_loop
from .utility import sync, asyncify, awaitify


def hide_coroutine(corofunc):
Expand Down Expand Up @@ -83,8 +83,7 @@ async def __aiter__(self):
yield 1
finally:
nonlocal closed
if await inside_loop():
closed = True
closed = True

zip_iter = a.zip(asyncify(range(-5, 0)), SomeIterable())
async for va, vb in zip_iter:
Expand Down
10 changes: 5 additions & 5 deletions unittests/test_functools.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import asyncstdlib as a
from asyncstdlib.functools import CachedProperty

from .utility import Lock, Schedule, Switch, asyncify, multi_sync, sync
from .utility import Lock, Schedule, Switch, asyncify, sync


@sync
Expand Down Expand Up @@ -44,7 +44,7 @@ async def bar(self):
Foo().bar


@multi_sync
@sync
async def test_cache_property_order():
class Value:
def __init__(self, value):
Expand All @@ -66,7 +66,7 @@ async def check_increment(to):
assert (await val.cached) == 1337 # last value fetched


@multi_sync
@sync
async def test_cache_property_lock_order():
class Value:
def __init__(self, value):
Expand All @@ -87,7 +87,7 @@ async def check_cached(to, expected):
assert (await val.cached) == 5 # first value fetched


@multi_sync
@sync
async def test_cache_property_lock_deletion():
class Value:
def __init__(self, value):
Expand Down Expand Up @@ -300,7 +300,7 @@ async def pingpong(arg):


@pytest.mark.parametrize("size", [16, None])
@multi_sync
@sync
async def test_lru_cache_concurrent(size):
current = 0

Expand Down
12 changes: 5 additions & 7 deletions unittests/test_itertools.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import asyncstdlib as a

from .utility import sync, asyncify, awaitify, multi_sync, Schedule, Switch, Lock
from .utility import sync, asyncify, awaitify, Schedule, Switch, Lock


@sync
Expand Down Expand Up @@ -314,16 +314,15 @@ async def test_tee():
assert await a.list(iterator) == iterable


@multi_sync
@sync
async def test_tee_concurrent_locked():
"""Test that properly uses a lock for synchronisation"""
items = [1, 2, 3, -5, 12, 78, -1, 111]

async def iter_values():
for item in items:
# switch to other tasks a few times to guarantees another runs
for _ in range(5):
await Switch()
await Switch(5)
yield item

async def test_peer(peer_tee):
Expand All @@ -345,7 +344,7 @@ async def test_peer(peer_tee):
platform.python_implementation() != "CPython",
reason="async generators only protect against concurrent access on CPython",
)
@multi_sync
@sync
async def test_tee_concurrent_unlocked():
"""Test that tee does not prevent concurrency without a lock"""
items = list(range(12))
Expand All @@ -354,8 +353,7 @@ async def test_tee_concurrent_unlocked():
async def iter_values():
for item in items:
# switch to other tasks a few times to guarantee another runs
for _ in range(5):
await Switch()
await Switch(5)
yield item

async def test_peer(peer_tee):
Expand Down
108 changes: 58 additions & 50 deletions unittests/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,65 +11,48 @@
)
from functools import wraps
from collections import deque
from random import randint


T = TypeVar("T")


async def asyncify(iterable: Iterable[T]) -> AsyncIterator[T]:
"""Convert an iterable to async iterable"""
"""
Convert an iterable into an async iterable

This is intended to sequence literals like lists to `async` iterators
in order to force usage of `async` code paths. There is no functional
or other advantage otherwise.
"""
for value in iterable:
yield value


def awaitify(call: Callable[..., T]) -> Callable[..., Awaitable[T]]:
async def await_wrapper(*args, **kwargs):
return call(*args, **kwargs)

return await_wrapper


class PingPong:
"""Signal to the event loop which gets returned unchanged"""

def __await__(self):
return (yield self)


async def inside_loop():
"""Test whether there is an active event loop available"""
signal = PingPong()
return await signal is signal


def sync(test_case: Callable[..., Coroutine[T, Any, Any]]) -> Callable[..., T]:
"""
Mark an ``async def`` test case to be run synchronously
Convert a callable (`foo()`) into an async callable (`await foo()`)

This emulates a primitive "event loop" which only responds
to the :py:class:`PingPong` by sending it back.
This is intended to convert `lambda` expressions to `async` functions
in order to force usage of `async` code paths. There is no functional
or other advantage otherwise.
"""

@wraps(test_case)
def run_sync(*args: Any, **kwargs: Any) -> T:
coro = test_case(*args, **kwargs)
try:
event = None
while True:
event = coro.send(event)
if not isinstance(event, PingPong): # pragma: no cover
raise RuntimeError(
f"test case {test_case} yielded an unexpected event {event}"
)
except StopIteration as e:
result = e.args[0] if e.args else None
return result
async def await_wrapper(*args: Any, **kwargs: Any) -> T:
return call(*args, **kwargs)

return run_sync
return await_wrapper


class Schedule:
"""Signal to the event loop to adopt and run a new coroutine"""
r"""
Signal to the event loop to adopt and run new coroutines

:param coros: The coroutines to start running

In order to communicate with the event loop and start the coroutines,
the :py:class:`Schedule` must be `await`\ ed.
"""

def __init__(self, *coros: Coroutine[Any, Any, Any]):
self.coros = coros
Expand All @@ -79,13 +62,35 @@ def __await__(self):


class Switch:
"""Signal to the event loop to run another coroutine"""
"""
Signal to the event loop to run another coroutine

Pauses the coroutine but immediately continues after
all other runnable coroutines of the event loop.
This is similar to the common ``sleep(0)`` function
of regular event loop frameworks.

If a single argument is given, this specifies how many
turns should be skipped. The default corresponds to `0`.
If two arguments are given, this is interpreted as an
inclusive interval to randomly select the skip count.
"""

def __init__(self, skip: int = 0, limit: int = 0, /) -> None:
if limit <= 0:
self._idle_count = skip
else:
self._idle_count = randint(skip, limit)

def __await__(self):
yield self
for _ in range(self._idle_count):
yield self


class Lock:
"""Simple lock for exclusive access"""

def __init__(self):
self._owned = False
self._waiting: list[object] = []
Expand All @@ -95,24 +100,29 @@ async def __aenter__(self):
# wait until it is our turn to take the lock
token = object()
self._waiting.append(token)
# a spin-lock should be fine since tests are short anyways
while self._owned or self._waiting[0] is not token:
await Switch()
# take the lock and remove our wait claim
self._owned = True
# we will take the lock now, remove our wait claim
self._waiting.pop(0)
self._owned = True

async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any):
self._owned = False


def multi_sync(test_case: Callable[..., Coroutine[T, Any, Any]]) -> Callable[..., T]:
def sync(test_case: Callable[..., Coroutine[None, Any, Any]], /) -> Callable[..., None]:
"""
Mark an ``async def`` test case to be run synchronously with children

This emulates a primitive "event loop" which only responds
to the :py:class:`PingPong`, :py:class:`Schedule`, :py:class:`Switch`
and :py:class:`Lock`.
This provides a primitive "event loop" which only responds
to :py:class:`Schedule`, :py:class:`Switch` and :py:class:`Lock`.

It should be applied as a decorator on an ``async def`` function, which
is then turned into a synchronous callable that will run the ``async def``
function and all tasks it spawns.
Other decorators, most prominently :py:func:`pytest.mark.parametrize`,
can be applied around it.
"""

@wraps(test_case)
Expand All @@ -127,9 +137,7 @@ def run_sync(*args: Any, **kwargs: Any):
result = e.args[0] if e.args else None
assert result is None, f"got '{result!r}' expected 'None'"
else:
if isinstance(event, PingPong):
run_queue.appendleft((coro, event))
elif isinstance(event, Schedule):
if isinstance(event, Schedule):
run_queue.extend((new_coro, None) for new_coro in event.coros)
run_queue.append((coro, event))
elif isinstance(event, Switch):
Expand Down
Loading