Skip to content

Commit

Permalink
Split aiomisc-pytest as a separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
mosquito committed Feb 15, 2023
1 parent 038cfe7 commit 7171929
Show file tree
Hide file tree
Showing 37 changed files with 817 additions and 1,835 deletions.
3 changes: 2 additions & 1 deletion aiomisc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from .circuit_breaker import CircuitBreaker, CircuitBroken, cutout
from .context import Context, get_context
from .counters import Statistic, get_statistics
from .entrypoint import entrypoint, run
from .entrypoint import Entrypoint, entrypoint, run
from .iterator_wrapper import IteratorWrapper
from .periodic import PeriodicCallback
from .plugins import plugins
Expand Down Expand Up @@ -33,6 +33,7 @@
"CircuitBreaker",
"CircuitBroken",
"Context",
"Entrypoint",
"IteratorWrapper",
"IteratorWrapperSeparate",
"PeriodicCallback",
Expand Down
29 changes: 20 additions & 9 deletions aiomisc/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import socket
from contextvars import ContextVar
from typing import Optional
from typing import Any, Generic, Optional, TypeVar


log = logging.getLogger(__name__)
Expand Down Expand Up @@ -58,18 +58,29 @@ def sock_set_reuseport(sock: socket.socket, reuse_port: bool) -> None:
)


EVENT_LOOP: ContextVar = ContextVar("EVENT_LOOP")
T = TypeVar("T", bound=Any)


def get_current_loop() -> asyncio.AbstractEventLoop:
loop: Optional[asyncio.AbstractEventLoop] = EVENT_LOOP.get(None)
if loop is None:
raise RuntimeError("no current event loop is set")
return loop
class StrictContextVar(Generic[T]):
def __init__(self, name: str, exc: Exception):
self.exc: Exception = exc
self.context_var: ContextVar = ContextVar(name)

def get(self) -> T:
value: Optional[Any] = self.context_var.get()
if value is None:
raise self.exc
return value

def set_current_loop(loop: asyncio.AbstractEventLoop) -> None:
EVENT_LOOP.set(loop)
def set(self, value: T) -> None:
self.context_var.set(value)


EVENT_LOOP: StrictContextVar[asyncio.AbstractEventLoop] = StrictContextVar(
"EVENT_LOOP", RuntimeError("no current event loop is set"),
)
get_current_loop = EVENT_LOOP.get
set_current_loop = EVENT_LOOP.set


__all__ = (
Expand Down
61 changes: 48 additions & 13 deletions aiomisc/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
import os
from concurrent.futures import Executor
from typing import (
Any, Callable, Coroutine, MutableSet, Optional, TypeVar, Union,
Any, Callable, Coroutine, Iterable, MutableSet, Optional, TypeVar, Union,
final,
)
from weakref import WeakSet

import aiomisc_log
from aiomisc_log import LogLevel

from .compat import event_loop_policy, set_current_loop
from .compat import StrictContextVar, event_loop_policy, set_current_loop
from .context import Context, get_context
from .log import LogFormat, basic_config
from .service import Service
Expand Down Expand Up @@ -39,6 +40,7 @@ def _get_env_convert(name: str, converter: Callable[..., T], default: T) -> T:
return converter(value)


@final
class Entrypoint:
DEFAULT_LOG_LEVEL: str = os.getenv(
"AIOMISC_LOG_LEVEL", LogLevel.default(),
Expand All @@ -64,11 +66,20 @@ class Entrypoint:
"AIOMISC_POOL_SIZE", int, None,
)

CURRENT: StrictContextVar["Entrypoint"] = StrictContextVar(
"CURRENT_ENTRYPOINT",
RuntimeError("no current event loop is set"),
)

PRE_START = Signal()
POST_STOP = Signal()
POST_START = Signal()
PRE_STOP = Signal()

@classmethod
def get_current_entrypoint(cls) -> "Entrypoint":
return cls.CURRENT.get()

async def _start(self) -> None:
if self.log_config:
basic_config(
Expand All @@ -81,20 +92,18 @@ async def _start(self) -> None:
)

set_current_loop(self.loop)
self.CURRENT.set(self)

for signal in (
self.pre_start, self.post_stop,
self.pre_stop, self.post_start,
):
signal.freeze()

await self.pre_start.call(entrypoint=self, services=self.services)

await asyncio.gather(
*[self._start_service(svc) for svc in self.services],
)

await self.post_start.call(entrypoint=self, services=self.services)
services = self.services
await self.pre_start.call(entrypoint=self, services=services)
await asyncio.gather(*[self._start_service(svc) for svc in services])
await self.post_start.call(entrypoint=self, services=services)

def __init__(
self, *services: Service,
Expand All @@ -110,9 +119,9 @@ def __init__(
debug: bool = DEFAULT_AIOMISC_DEBUG,
):

"""
""" Creates a new Entrypoint
:param debug: set debug to event loop
:param debug: set debug to event-loop
:param loop: loop
:param services: Service instances which will be starting.
:param pool_size: thread pool size
Expand All @@ -139,12 +148,13 @@ def __init__(
self.log_level = log_level
self.policy = policy
self.pool_size = pool_size
self.services = services
self.__services = set(services)
self.shutting_down = False
self.pre_start = self.PRE_START.copy()
self.post_start = self.POST_START.copy()
self.pre_stop = self.PRE_STOP.copy()
self.post_stop = self.POST_STOP.copy()
self._lock = asyncio.Lock()

if self.log_config:
aiomisc_log.basic_config(
Expand All @@ -155,6 +165,10 @@ def __init__(
if self._loop is not None:
set_current_loop(self._loop)

@property
def services(self) -> Iterable[Service]:
return tuple(self.__services)

async def closing(self) -> None:
# Lazy initialization because event loop might be not exists
if self._closing is None:
Expand Down Expand Up @@ -260,6 +274,27 @@ async def _start_service(

return None

async def start_services(self, *svc: Service) -> None:
await self.pre_start.call(entrypoint=self, services=svc)
try:
await asyncio.gather(*[self._start_service(s) for s in svc])
finally:
await self.post_start.call(entrypoint=self, services=svc)
for s in svc:
self.__services.add(s)

async def stop_services(
self, *svc: Service, exc: Optional[Exception] = None,
) -> None:
await self.pre_stop.call(entrypoint=self, services=svc)

try:
await asyncio.gather(*[s.stop(exc) for s in svc])
finally:
for s in svc:
self.__services.remove(s)
await self.post_stop.call(entrypoint=self, services=svc)

async def _cancel_background_tasks(self) -> None:
tasks = asyncio_all_tasks(self._loop)
current_task = asyncio_current_task(self.loop)
Expand All @@ -270,7 +305,7 @@ async def graceful_shutdown(self, exception: Exception) -> None:
self._closing.set()

tasks = []
for svc in self.services:
for svc in self.__services:
try:
coro = svc.stop(exception)
except TypeError as e:
Expand Down
4 changes: 2 additions & 2 deletions aiomisc/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
# BY: poem-plugins "git" plugin
# NEVER EDIT THIS FILE MANUALLY

version_info = (16, 3, 5)
__version__ = "16.3.5"
version_info = (16, 3, 6)
__version__ = "16.3.6"
15 changes: 10 additions & 5 deletions aiomisc_log/enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@
def check_journal_stream() -> bool:
return False

try:
import rich

RICH_INSTALLED = bool(rich)
except ImportError:
RICH_INSTALLED = False


@unique
class LogFormat(IntEnum):
Expand All @@ -35,12 +42,10 @@ def default(cls) -> str:
if not os.isatty(sys.stderr.fileno()):
return cls.plain.name

try:
import rich # noqa

if RICH_INSTALLED:
return cls.rich.name
except ImportError:
return cls.color.name

return cls.color.name


class LogLevel(IntEnum):
Expand Down
Empty file removed aiomisc_pytest/__init__.py
Empty file.
Loading

0 comments on commit 7171929

Please sign in to comment.