Skip to content

Commit

Permalink
Sync cached_property getter access
Browse files Browse the repository at this point in the history
Add an option to pass in a lock to the cached_property decorator,
so the accessor is run just once even when multiple tasks are
concurrently trying to await on the value.
  • Loading branch information
mjpieters committed Jul 23, 2024
1 parent 806e85e commit fa39a6b
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 85 deletions.
259 changes: 196 additions & 63 deletions asyncstdlib/functools.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
from asyncio import iscoroutinefunction
from typing import (
Callable,
Awaitable,
Union,
Any,
Generic,
AsyncContextManager,
Awaitable,
Callable,
Coroutine,
Generator,
Generic,
Optional,
Coroutine,
Type,
Union,
cast,
overload,
)

from ._typing import T, AC, AnyIterable
from ._core import ScopedIter, awaitify as _awaitify, Sentinel
from .builtins import anext
from ._utility import public_module

from ._core import ScopedIter, Sentinel
from ._core import awaitify as _awaitify
from ._lrucache import (
lru_cache,
CacheInfo,
CacheParameters,
LRUAsyncCallable,
LRUAsyncBoundCallable,
LRUAsyncCallable,
lru_cache,
)
from ._typing import AC, AnyIterable, R, T
from .builtins import anext
from .contextlib import nullcontext

__all__ = [
"cache",
Expand All @@ -32,6 +36,7 @@
"LRUAsyncBoundCallable",
"reduce",
"cached_property",
"CachedProperty",
]


Expand All @@ -45,44 +50,172 @@ def cache(user_function: AC) -> LRUAsyncCallable[AC]:
return lru_cache(maxsize=None)(user_function)


class AwaitableValue(Generic[R]):
    """Helper to provide an arbitrary value in ``await``

    Awaiting an instance evaluates to the wrapped ``value``. The object can be
    awaited any number of times.
    """

    __slots__ = ("value",)

    def __init__(self, value: R):
        self.value = value

    # noinspection PyUnreachableCode
    def __await__(self) -> Generator[None, None, R]:
        # The unreachable ``yield`` makes this a generator function, so the
        # ``return`` value becomes the result of the ``await`` expression.
        return self.value
        yield  # type: ignore  # pragma: no cover

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.value!r})"


class _FutureCachedValue(Generic[R, T]):
    """A placeholder object to control concurrent access to a cached awaitable value.

    When given a lock to coordinate access, only the first task to await on a
    cached property triggers the underlying coroutine. Once a value has been
    produced, all tasks are unblocked and given the same, single value.

    :param get_attribute: coroutine function that computes the value for
        ``instance``; it is also expected to store the result on the instance
    :param instance: the object whose ``__dict__`` holds this placeholder
    :param name: the key in ``instance.__dict__`` under which the value is cached
    :param lock: async context manager entered around the call to ``get_attribute``
    """

    __slots__ = ("_get_attribute", "_instance", "_name", "_lock")

    def __init__(
        self,
        get_attribute: Callable[[T], Coroutine[Any, Any, R]],
        instance: T,
        name: str,
        lock: AsyncContextManager[Any],
    ):
        self._get_attribute = get_attribute
        self._instance = instance
        self._name = name
        self._lock = lock

    def __await__(self) -> Generator[None, None, R]:
        return self._await_impl().__await__()

    def _current_entry(self) -> Awaitable[R]:
        """Return whatever is currently cached for the attribute

        The result is ``self`` while no value has been produced yet.
        """
        try:
            return cast(Awaitable[R], self._instance.__dict__[self._name])
        except KeyError:
            # something deleted the cached value or future cached value
            # placeholder. Restart the fetch by delegating to the regular
            # attribute lookup on the instance.
            return cast(Awaitable[R], getattr(self._instance, self._name))

    async def _await_impl(self) -> R:
        stored = self._current_entry()
        if stored is not self:
            # another task produced a value, or the fetch was restarted
            return await stored

        # attempt to get the lock
        async with self._lock:
            # check again for a cached value; it may have been produced
            # while this task was waiting for the lock
            stored = self._current_entry()
            if stored is not self:
                return await stored

            # the instance attribute is still this placeholder, and we hold
            # the lock. Start the getter to store the value on the instance
            # and return the value.
            return await self._get_attribute(self._instance)

    def __repr__(self) -> str:
        return (
            f"<{type(self).__name__} for '{type(self._instance).__name__}."
            f"{self._name}' at {id(self):#x}>"
        )


class CachedProperty(Generic[T, R]):
    """Descriptor that caches the result of an async getter on the instance

    On first attribute access, a placeholder awaitable is stored in the
    instance ``__dict__`` under the attribute name and returned. Awaiting the
    placeholder runs the getter; its result is then stored in the instance
    ``__dict__`` wrapped in an :py:class:`AwaitableValue`.

    :param getter: callable taking the instance and returning an awaitable
    :param lock_type: called without arguments on each ``__get__`` on an
        instance to create the async context manager handed to the placeholder
    """

    def __init__(
        self,
        getter: Callable[[T], Awaitable[R]],
        lock_type: Type[AsyncContextManager[Any]] = nullcontext,
    ):
        self.func = getter
        # set by ``__set_name__`` when the descriptor is assigned in a class body
        self.attrname: Optional[str] = None
        self.__doc__ = getter.__doc__
        self._lock_type = lock_type

    def __set_name__(self, owner: Any, name: str) -> None:
        if self.attrname is None:
            self.attrname = name
        elif name != self.attrname:
            raise TypeError(
                "Cannot assign the same cached_property to two different names "
                f"({self.attrname!r} and {name!r})."
            )

    @overload
    def __get__(self, instance: None, owner: Type[Any]) -> "CachedProperty[T, R]": ...

    @overload
    def __get__(
        self, instance: T, owner: Optional[Type[Any]] = None
    ) -> Awaitable[R]: ...

    def __get__(
        self, instance: Optional[T], owner: Optional[Type[Any]] = None
    ) -> Union["CachedProperty[T, R]", Awaitable[R]]:
        """Return the descriptor itself on class access, else a placeholder

        :raises TypeError: if ``__set_name__`` was never called, or if
            ``instance`` has no ``__dict__`` to cache the value in
        """
        if instance is None:
            return self

        name = self.attrname
        if name is None:
            raise TypeError(
                "Cannot use cached_property instance without calling __set_name__ on it."
            )

        # check for write access first; not all objects have __dict__
        # (e.g. class defines slots)
        try:
            cache = instance.__dict__
        except AttributeError:
            msg = (
                f"No '__dict__' attribute on {type(instance).__name__!r} "
                f"instance to cache {name!r} property."
            )
            raise TypeError(msg) from None

        # store a placeholder for other tasks to access the future cached value
        # on this instance. It takes care of coordinating between different
        # tasks awaiting on the placeholder until the cached value has been
        # produced.
        wrapper = _FutureCachedValue(
            self._get_attribute, instance, name, self._lock_type()
        )
        cache[name] = wrapper
        return wrapper

    async def _get_attribute(self, instance: T) -> R:
        """Run the getter for ``instance``, cache the result and return it"""
        value = await self.func(instance)
        name = self.attrname
        assert name is not None  # enforced in __get__
        instance.__dict__[name] = AwaitableValue(value)
        return value


@overload
def cached_property(
lock_type_or_getter: Type[AsyncContextManager[Any]],
) -> Callable[[Callable[[T], Awaitable[R]]], CachedProperty[T, R]]: ...


@public_module(__name__, "cached_property")
class CachedProperty(Generic[T]):
@overload
def cached_property(
lock_type_or_getter: Callable[[T], Awaitable[R]],
) -> CachedProperty[T, R]: ...


def cached_property(
lock_type_or_getter: Union[
Type[AsyncContextManager[Any]], Callable[[T], Awaitable[R]]
],
) -> Union[
Callable[[Callable[[T], Awaitable[R]]], CachedProperty[T, R]],
CachedProperty[T, R],
]:
"""
Transform a method into an attribute whose value is cached
Expand All @@ -108,7 +241,7 @@ def __init__(self, url):
async def data(self):
return await asynclib.get(self.url)
resource = Resource(1, 3)
resource = Resource("http://example.com")
print(await resource.data) # needs some time...
print(await resource.data) # finishes instantly
del resource.data
Expand All @@ -117,51 +250,51 @@ async def data(self):
Unlike a :py:class:`property`, this type does not support
:py:meth:`~property.setter` or :py:meth:`~property.deleter`.
If the attribute is accessed by multiple tasks before a cached value has
been produced, the getter can be run more than once. The final cached value
is determined by the last getter coroutine to return. To enforce that the
getter is executed at most once, provide a ``lock`` type - e.g. the
:py:class:`asyncio.Lock` class in an :py:mod:`asyncio` application - and
access is automatically synchronised.
.. code-block:: python3
from asyncio import Lock, gather
class Resource:
def __init__(self, url):
self.url = url
@a.cached_property(Lock)
async def data(self):
return await asynclib.get(self.url)
resource = Resource("http://example.com")
print(*(await gather(resource.data, resource.data)))
.. note::
Instances on which a value is to be cached must have a
``__dict__`` attribute that is a mutable mapping.
"""
if isinstance(lock_type_or_getter, type) and issubclass(
lock_type_or_getter, AsyncContextManager
):

def __init__(self, getter: Callable[[Any], Awaitable[T]]):
self.__wrapped__ = getter
self._name = getter.__name__
self.__doc__ = getter.__doc__

def __set_name__(self, owner: Any, name: str) -> None:
# Check whether we can store anything on the instance
# Note that this is a failsafe, and might fail ugly.
# People who are clever enough to avoid this heuristic
# should also be clever enough to know the why and what.
if not any("__dict__" in dir(cls) for cls in owner.__mro__):
raise TypeError(
"'cached_property' requires '__dict__' "
f"on {owner.__name__!r} to store {name}"
def decorator(
coroutine: Callable[[T], Awaitable[R]],
) -> CachedProperty[T, R]:
return CachedProperty(
coroutine,
lock_type=cast(Type[AsyncContextManager[Any]], lock_type_or_getter),
)
self._name = name

@overload
def __get__(self, instance: None, owner: type) -> "CachedProperty[T]": ...

@overload
def __get__(self, instance: object, owner: Optional[type]) -> Awaitable[T]: ...

def __get__(
self, instance: Optional[object], owner: Optional[type]
) -> Union["CachedProperty[T]", Awaitable[T]]:
if instance is None:
return self
# __get__ may be called multiple times before it is first awaited to completion
# provide a placeholder that acts just like the final value does
return _RepeatableCoroutine(self._get_attribute, instance)

async def _get_attribute(self, instance: object) -> T:
value = await self.__wrapped__(instance)
instance.__dict__[self._name] = AwaitableValue(value)
return value
return decorator

if not iscoroutinefunction(lock_type_or_getter):
raise ValueError("cached_property can only be used with a coroutine function")

cached_property = CachedProperty
return CachedProperty(lock_type_or_getter)


__REDUCE_SENTINEL = Sentinel("<no default>")
Expand Down
35 changes: 27 additions & 8 deletions asyncstdlib/functools.pyi
Original file line number Diff line number Diff line change
@@ -1,23 +1,42 @@
from typing import Any, Awaitable, Callable, Generic, overload

from ._typing import T, T1, T2, AC, AnyIterable
from typing import Any, AsyncContextManager, Awaitable, Callable, Generic, overload

from ._lrucache import (
LRUAsyncCallable as LRUAsyncCallable,
LRUAsyncBoundCallable as LRUAsyncBoundCallable,
)
from ._lrucache import (
LRUAsyncCallable as LRUAsyncCallable,
)
from ._lrucache import (
lru_cache as lru_cache,
)
from ._typing import AC, T1, T2, AnyIterable, R, T

def cache(user_function: AC) -> LRUAsyncCallable[AC]: ...

class cached_property(Generic[T]):
def __init__(self, getter: Callable[[Any], Awaitable[T]]) -> None: ...
class CachedProperty(Generic[T, R]):
def __init__(
self,
getter: Callable[[T], Awaitable[R]],
lock_type: type[AsyncContextManager[Any]] = ...,
) -> None: ...
def __set_name__(self, owner: Any, name: str) -> None: ...
@overload
def __get__(self, instance: None, owner: type) -> "cached_property[T]": ...
def __get__(self, instance: None, owner: type[Any]) -> "CachedProperty[T, R]": ...
@overload
def __get__(self, instance: object, owner: type | None) -> Awaitable[T]: ...
def __get__(self, instance: T, owner: type | None) -> Awaitable[R]: ...
# __set__ is not defined at runtime, but you are allowed to replace the cached value
def __set__(self, instance: T, value: R) -> None: ... # type: ignore[misc] # pyright: ignore[reportGeneralTypeIssues]
# __del__ is not defined at runtime, but you are allowed to delete the cached value
def __del__(self, instance: T) -> None: ...

# Bare decorator form (``@cached_property``): the argument is the getter
# itself and the descriptor is returned directly.
@overload
def cached_property(
lock_type_or_getter: Callable[[T], Awaitable[R]],
) -> CachedProperty[T, R]: ...
# Parametrised form (``@cached_property(LockType)``): the argument is an async
# context manager type and a decorator for the getter is returned.
@overload
def cached_property(
lock_type_or_getter: type[AsyncContextManager[Any]],
) -> Callable[[Callable[[T], Awaitable[R]]], CachedProperty[T, R]]: ...
@overload
async def reduce(
function: Callable[[T1, T2], T1], iterable: AnyIterable[T2], initial: T1
Expand Down
2 changes: 1 addition & 1 deletion docs/source/api/builtins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Iterator transforming

.. versionadded:: 3.10.0

The ``strict`` parameter.
The ``lock`` decorator argument.

.. autofunction:: map(function: (T, ...) → (await) R, iterable: (async) iter T, ...)
:async-for: :R
Expand Down
Loading

0 comments on commit fa39a6b

Please sign in to comment.