From 0c49e79cc22e3fdd5166c12bb248232c4a49d879 Mon Sep 17 00:00:00 2001 From: Vincent Alsteen Date: Wed, 11 Oct 2023 08:14:35 +0200 Subject: [PATCH] fix pooltimeout --- httpcore/_async/connection_pool.py | 22 ++++++++++++++++++---- httpcore/_sync/connection_pool.py | 22 ++++++++++++++++++---- httpcore/_synchronization.py | 30 +++++++++++++++++++++++++++++- 3 files changed, 65 insertions(+), 9 deletions(-) diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index ddc0510e6..b73efeb71 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -5,9 +5,14 @@ from .._backends.auto import AutoBackend from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend -from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol +from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol from .._models import Origin, Request, Response -from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation +from .._synchronization import ( + AsyncEvent, + AsyncLock, + AsyncShieldCancellation, + sync_current_time, +) from .connection import AsyncHTTPConnection from .interfaces import AsyncConnectionInterface, AsyncRequestInterface @@ -220,6 +225,13 @@ async def handle_async_request(self, request: Request) -> Response: ) status = RequestStatus(request) + timeouts = request.extensions.get("timeout", {}) + timeout = timeouts.get("pool", None) + + if timeout is not None: + deadline = sync_current_time() + timeout + else: + deadline = float("inf") async with self._pool_lock: self._requests.append(status) @@ -227,8 +239,6 @@ async def handle_async_request(self, request: Request) -> Response: await self._attempt_to_acquire_connection(status) while True: - timeouts = request.extensions.get("timeout", {}) - timeout = timeouts.get("pool", None) try: connection = await status.wait_for_connection(timeout=timeout) except BaseException as exc: @@ -263,6 +273,10 @@ async def handle_async_request(self, request: Request) -> Response: else: break + timeout = deadline - sync_current_time() + if timeout < 0: + raise PoolTimeout + # When we return the response, we wrap the stream in a special class # that handles notifying the connection pool once the response # has been released. diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index dbcaff1fc..412d1f51e 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -5,9 +5,14 @@ from .._backends.sync import SyncBackend from .._backends.base import SOCKET_OPTION, NetworkBackend -from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol +from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol from .._models import Origin, Request, Response -from .._synchronization import Event, Lock, ShieldCancellation +from .._synchronization import ( + Event, + Lock, + ShieldCancellation, + sync_current_time, +) from .connection import HTTPConnection from .interfaces import ConnectionInterface, RequestInterface @@ -220,6 +225,13 @@ def handle_request(self, request: Request) -> Response: ) status = RequestStatus(request) + timeouts = request.extensions.get("timeout", {}) + timeout = timeouts.get("pool", None) + + if timeout is not None: + deadline = sync_current_time() + timeout + else: + deadline = float("inf") with self._pool_lock: self._requests.append(status) @@ -227,8 +239,6 @@ def handle_request(self, request: Request) -> Response: self._attempt_to_acquire_connection(status) while True: - timeouts = request.extensions.get("timeout", {}) - timeout = timeouts.get("pool", None) try: connection = status.wait_for_connection(timeout=timeout) except BaseException as exc: @@ -263,6 +273,10 @@ def handle_request(self, request: Request) -> Response: else: break + timeout = deadline - sync_current_time() + if timeout < 0: + raise PoolTimeout + # When we return the response, we wrap the stream in a special class # that handles notifying the connection pool once the response # has been released. diff --git a/httpcore/_synchronization.py b/httpcore/_synchronization.py index 58e06160a..c16d3fab0 100644 --- a/httpcore/_synchronization.py +++ b/httpcore/_synchronization.py @@ -1,6 +1,7 @@ import threading +import time from types import TracebackType -from typing import Optional, Type +from typing import Callable, Optional, Type, Union from ._exceptions import ExceptionMapping, PoolTimeout, map_exceptions @@ -291,3 +292,30 @@ def __exit__( traceback: Optional[TracebackType] = None, ) -> None: pass + + +_current_time_implementation: Union[None, Callable[[], float]] = None + + +def async_current_time() -> float: + global _current_time_implementation + + if not _current_time_implementation: + if current_async_library() == "trio": + if trio is None: # pragma: nocover + raise RuntimeError( + "Running with trio requires installation of 'httpcore[trio]'." + ) + _current_time_implementation = trio.current_time + else: + if anyio is None: # pragma: nocover + raise RuntimeError( + "Running with asyncio requires installation of 'httpcore[asyncio]'." + ) + _current_time_implementation = anyio.current_time + + return _current_time_implementation() + + +def sync_current_time() -> float: + return time.monotonic()