Skip to content

Commit

Permalink
Merge pull request #201 from aiokitchen/featuire/thread-pool-rework
Browse files Browse the repository at this point in the history
Rework thread pool
  • Loading branch information
mosquito authored Mar 11, 2024
2 parents b4acf57 + 5cd7ec6 commit 5d20793
Show file tree
Hide file tree
Showing 19 changed files with 594 additions and 368 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ jobs:

matrix:
python:
- '3.7'
- '3.8'
- '3.9'
- '3.10'
- '3.11'
- '3.12'
steps:
- uses: actions/checkout@v2
- name: Setup python${{ matrix.python }}
Expand Down Expand Up @@ -95,8 +95,8 @@ jobs:

matrix:
python:
- '3.9'
- '3.10'
- '3.11'
steps:
- uses: actions/checkout@v2
- name: Setup python${{ matrix.python }}
Expand Down Expand Up @@ -147,7 +147,7 @@ jobs:
- name: Setup python3.10
uses: actions/setup-python@v2
with:
python-version: "3.10"
python-version: "3.11"
- run: python -m pip install poetry
- run: poetry install --with=uvloop -vvvv
- run: poetry run python -m pip install msgspec~=0.9.1
Expand Down
7 changes: 7 additions & 0 deletions aiomisc/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ def time_ns() -> int:
from typing_extensions import final # type: ignore


try:
from typing import TypeAlias
except ImportError:
from typing_extensions import TypeAlias


if sys.version_info >= (3, 10):
from typing import ParamSpec
else:
Expand Down Expand Up @@ -118,6 +124,7 @@ def sock_set_reuseport(sock: socket.socket, reuse_port: bool) -> None:
"EventLoopMixin",
"ParamSpec",
"Protocol",
"TypeAlias",
"entry_pont_iterator",
"event_loop_policy",
"final",
Expand Down
9 changes: 4 additions & 5 deletions aiomisc/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -409,11 +409,10 @@ def _on_interrupt_callback(self, _: Any) -> None:
self.loop.call_soon_threadsafe(self._on_interrupt, loop)

def _on_interrupt(self, loop: asyncio.AbstractEventLoop) -> None:
async def shutdown() -> None:
log.warning("Interrupt signal received, shutting down...")
await self._stop(RuntimeError("Interrupt signal received"))

task = loop.create_task(shutdown())
log.warning("Interrupt signal received, shutting down...")
task = loop.create_task(
self._stop(RuntimeError("Interrupt signal received")),
)
handle = loop.call_later(self.shutdown_timeout, task.cancel)

def on_shutdown_finish(task: asyncio.Future) -> None:
Expand Down
49 changes: 39 additions & 10 deletions aiomisc/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
from functools import partial, total_ordering
from pathlib import Path
from typing import (
IO, Any, AnyStr, Awaitable, Callable, Generator, Generic, List, Optional,
TextIO, TypeVar, Union,
IO, Any, AnyStr, Awaitable, Callable, Generator, Generic, List, Literal,
Optional, TextIO, TypeVar, Union, overload,
)

from .compat import EventLoopMixin
from .compat import EventLoopMixin, TypeAlias


T = TypeVar("T", bound=Any)
FilePath: TypeAlias = Union[str, Path]


def proxy_method_async(
Expand Down Expand Up @@ -232,7 +233,7 @@ async def writelines(self, lines: List[AnyStr]) -> None:
await self.__execute_in_thread(self.fp.writelines, lines)


AsyncFileIOBase = AsyncFileIO
AsyncFileIOBase: TypeAlias = AsyncFileIO


class AsyncBinaryIO(AsyncFileIO[bytes]):
Expand Down Expand Up @@ -265,8 +266,8 @@ def buffer(self) -> AsyncBinaryIO:


# Aliases
AsyncBytesFileIO = AsyncBinaryIO
AsyncTextFileIO = AsyncTextIO
AsyncBytesFileIO: TypeAlias = AsyncBinaryIO
AsyncTextFileIO: TypeAlias = AsyncTextIO


class AsyncGzipBinaryIO(AsyncBytesFileIO):
Expand Down Expand Up @@ -312,14 +313,42 @@ class Compression(Enum):
LZMA = (AsyncLzmaBinaryIO, AsyncLzmaTextIO)


AsyncFileType = Union[AsyncFileIO[AnyStr], AsyncTextIO, AsyncBinaryIO]
AsyncFileType: TypeAlias = Union[
AsyncFileIO[AnyStr], AsyncTextIO, AsyncBinaryIO,
]

# Deprecated excluded from __all__
AsyncFileT = AsyncFileType
BinaryModes: TypeAlias = Literal[
"rb", "wb", "ab", "xb", "rb+", "wb+", "ab+", "xb+", "br", "br+",
"bw+", "ba+", "bx+",
]

TextModes: TypeAlias = Literal["r", "w", "a", "x", "r+", "w+", "a+", "x+"]


@overload
def async_open(
fname: FilePath,
mode: BinaryModes,
compression: Compression = Compression.NONE,
encoding: str = sys.getdefaultencoding(),
*args: Any, **kwargs: Any,
) -> AsyncBinaryIO:
...


@overload
def async_open(
fname: FilePath,
mode: TextModes,
compression: Compression = Compression.NONE,
encoding: str = sys.getdefaultencoding(),
*args: Any, **kwargs: Any,
) -> AsyncTextIO:
...


def async_open(
fname: Union[str, Path], mode: str = "r",
fname: FilePath, mode: str = "r",
compression: Compression = Compression.NONE,
encoding: str = sys.getdefaultencoding(),
*args: Any, **kwargs: Any,
Expand Down
6 changes: 3 additions & 3 deletions aiomisc/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def __init__(self, maxsize: int = 10, recycle: Optional[int] = None):
def __create_task(self, coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
task = self.loop.create_task(coro)
self._tasks.add(task)
task.add_done_callback(self._tasks.remove)
task.add_done_callback(self._tasks.discard)
return task

async def __recycler(self) -> NoReturn:
Expand Down Expand Up @@ -126,7 +126,7 @@ def __recycle_instance(self, instance: Any) -> None:
self._recycle_times.pop(instance)

if instance in self._used:
self._used.remove(instance)
self._used.discard(instance)

self._recycle_bin.put_nowait(instance)

Expand Down Expand Up @@ -162,7 +162,7 @@ async def __acquire(self) -> T:
return instance

async def __release(self, instance: Any) -> None:
self._used.remove(instance)
self._used.discard(instance)

if self._recycle and self._recycle_times[instance] < self.loop.time():
self.__recycle_instance(instance)
Expand Down
10 changes: 7 additions & 3 deletions aiomisc/service/aiohttp.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import socket
import ssl
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union

from aiohttp.web import Application, AppRunner, BaseRunner, SockSite # noqa

from aiomisc.service.tls import PathOrStr, get_ssl_context
from aiomisc.service.tls import PathOrStr, SSLOptions

from ..utils import bind_socket
from .base import Service
Expand Down Expand Up @@ -118,7 +119,10 @@ def __init__(
shutdown_timeout=shutdown_timeout, **kwds,
)

self.__ssl_options = cert, key, ca, verify, require_client_cert
self.__ssl_options = SSLOptions(
cert, key, ca, verify, require_client_cert,
ssl.Purpose.CLIENT_AUTH,
)

async def create_site(self) -> SockSite:
assert self.runner and self.socket
Expand All @@ -127,6 +131,6 @@ async def create_site(self) -> SockSite:
self.runner, self.socket,
shutdown_timeout=self.shutdown_timeout,
ssl_context=await self.loop.run_in_executor(
None, get_ssl_context, *self.__ssl_options,
None, self.__ssl_options.create_context,
),
)
2 changes: 1 addition & 1 deletion aiomisc/service/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def __init__(self, **kwargs: Any):
def create_task(self, coro: CoroutineType) -> asyncio.Task:
task: asyncio.Task = self.loop.create_task(coro)
self.tasks.add(task)
task.add_done_callback(self.tasks.remove)
task.add_done_callback(self.tasks.discard)
return task

async def stop(self, exc: Optional[Exception] = None) -> None:
Expand Down
4 changes: 2 additions & 2 deletions aiomisc/service/raven.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ def _async_send(

task = self._loop.create_task(coro)
self._tasks.add(task)
task.add_done_callback(self._tasks.remove)
task.add_done_callback(self._tasks.discard)

async def _close(self) -> None:
await asyncio.gather(
Expand All @@ -234,7 +234,7 @@ def __init__(
for _ in range(workers):
worker: asyncio.Task = self._loop.create_task(self._worker())
self._workers.add(worker)
worker.add_done_callback(self._workers.remove)
worker.add_done_callback(self._workers.discard)

async def _worker(self) -> None:
while True:
Expand Down
73 changes: 47 additions & 26 deletions aiomisc/service/tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import socket
import ssl
from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import partial
from pathlib import Path
from typing import Any, Optional, Tuple, Union
Expand All @@ -16,30 +17,49 @@
log = logging.getLogger(__name__)


def get_ssl_context(
cert: str, key: str, ca: Optional[str], verify: bool,
require_client_cert: bool, purpose: ssl.Purpose,
) -> ssl.SSLContext:
cert, key, ca = map(str, (cert, key, ca))
@dataclass(frozen=True)
class SSLOptionsBase:
cert: Optional[Path]
key: Optional[Path]
ca: Optional[Path]
verify: bool
require_client_cert: bool
purpose: ssl.Purpose

context = ssl.create_default_context(purpose=purpose, cafile=ca)

if ca and not Path(ca).exists():
raise FileNotFoundError("CA file doesn't exists")

if require_client_cert:
context.verify_mode = ssl.VerifyMode.CERT_REQUIRED
class SSLOptions(SSLOptionsBase):
def __init__(
self, cert: Optional[PathOrStr], key: Optional[PathOrStr],
ca: Optional[PathOrStr], verify: bool, require_client_cert: bool,
purpose: ssl.Purpose,
) -> None:
super().__init__(
cert=Path(cert) if cert else None,
key=Path(key) if key else None,
ca=Path(ca) if ca else None,
verify=verify,
require_client_cert=require_client_cert,
purpose=purpose,
)

if key:
context.load_cert_chain(
cert,
key,
def create_context(self) -> ssl.SSLContext:
context = ssl.create_default_context(
purpose=self.purpose, cafile=self.ca,
)

if not verify:
context.check_hostname = False
if self.ca and not self.ca.exists():
raise FileNotFoundError("CA file doesn't exists")

if self.require_client_cert:
context.verify_mode = ssl.VerifyMode.CERT_REQUIRED

return context
if self.key and self.cert:
context.load_cert_chain(self.cert, self.key)

if not self.verify:
context.check_hostname = False

return context


class TLSServer(SimpleServer):
Expand All @@ -53,7 +73,7 @@ def __init__(
**kwargs: Any,
):

self.__ssl_options = (
self.__ssl_options = SSLOptions(
cert, key, ca, verify, require_client_cert,
ssl.Purpose.CLIENT_AUTH,
)
Expand Down Expand Up @@ -100,7 +120,7 @@ async def handle_client(

async def start(self) -> None:
ssl_context = await self.loop.run_in_executor(
None, get_ssl_context, *self.__ssl_options,
None, self.__ssl_options.create_context,
)

self.socket = self.make_socket()
Expand Down Expand Up @@ -148,7 +168,7 @@ def __init__(
ca: Optional[PathOrStr] = None, verify: bool = True,
**kwargs: Any,
) -> None:
self.__ssl_options = (
self.__ssl_options = SSLOptions(
cert, key, ca, verify, False, ssl.Purpose.SERVER_AUTH,
)

Expand All @@ -160,11 +180,12 @@ async def connect(self) -> Tuple[
last_error: Optional[Exception] = None
for _ in range(self.connect_attempts):
try:
ssl_context: ssl.SSLContext = await self.loop.run_in_executor(
None, self.__ssl_options.create_context,
)
reader, writer = await asyncio.open_connection(
self.address, self.port,
ssl=await self.loop.run_in_executor(
None, get_ssl_context, *self.__ssl_options,
),
ssl=ssl_context,
)
log.info(
"Connected to %s://%s:%d",
Expand All @@ -190,7 +211,7 @@ def __init__(
ca: Optional[PathOrStr] = None, verify: bool = True,
**kwargs: Any,
):
self.__ssl_options = (
self.__ssl_options = SSLOptions(
cert, key, ca, verify, False, ssl.Purpose.SERVER_AUTH,
)

Expand All @@ -205,7 +226,7 @@ async def connect(self) -> Tuple[
reader, writer = await asyncio.open_connection(
self.address, self.port,
ssl=await self.loop.run_in_executor(
None, get_ssl_context, *self.__ssl_options,
None, self.__ssl_options.create_context,
),
)
log.info(
Expand Down
2 changes: 1 addition & 1 deletion aiomisc/service/udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async def start(self) -> None:
raise RuntimeError

self._transport, self._protocol = (
await self.loop.create_datagram_endpoint( # type: ignore
await self.loop.create_datagram_endpoint(
lambda: UDPServer.UDPSimpleProtocol(
self.handle_datagram,
self.create_task,
Expand Down
2 changes: 1 addition & 1 deletion aiomisc/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def disconnect(self, receiver: ReceiverType) -> None:
receiver,
)

self._receivers.remove(receiver) # type: ignore
self._receivers.discard(receiver) # type: ignore

async def call(self, *args: Any, **kwargs: Any) -> None:
for receiver in self._receivers:
Expand Down
Loading

0 comments on commit 5d20793

Please sign in to comment.