Skip to content

Commit

Permalink
fix pooltimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
valsteen committed Oct 11, 2023
1 parent fc3e650 commit 0c49e79
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 9 deletions.
22 changes: 18 additions & 4 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@

from .._backends.auto import AutoBackend
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation
from .._synchronization import (
AsyncEvent,
AsyncLock,
AsyncShieldCancellation,
sync_current_time,
)
from .connection import AsyncHTTPConnection
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface

Expand Down Expand Up @@ -220,15 +225,20 @@ async def handle_async_request(self, request: Request) -> Response:
)

status = RequestStatus(request)
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("pool", None)

if timeout is not None:
deadline = sync_current_time() + timeout
else:
deadline = float("inf")

async with self._pool_lock:
self._requests.append(status)
await self._close_expired_connections()
await self._attempt_to_acquire_connection(status)

while True:
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("pool", None)
try:
connection = await status.wait_for_connection(timeout=timeout)
except BaseException as exc:
Expand Down Expand Up @@ -263,6 +273,10 @@ async def handle_async_request(self, request: Request) -> Response:
else:
break

timeout = deadline - sync_current_time()
if timeout < 0:
raise PoolTimeout

# When we return the response, we wrap the stream in a special class
# that handles notifying the connection pool once the response
# has been released.
Expand Down
22 changes: 18 additions & 4 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@

from .._backends.sync import SyncBackend
from .._backends.base import SOCKET_OPTION, NetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._exceptions import ConnectionNotAvailable, PoolTimeout, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import Event, Lock, ShieldCancellation
from .._synchronization import (
Event,
Lock,
ShieldCancellation,
sync_current_time,
)
from .connection import HTTPConnection
from .interfaces import ConnectionInterface, RequestInterface

Expand Down Expand Up @@ -220,15 +225,20 @@ def handle_request(self, request: Request) -> Response:
)

status = RequestStatus(request)
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("pool", None)

if timeout is not None:
deadline = sync_current_time() + timeout
else:
deadline = float("inf")

with self._pool_lock:
self._requests.append(status)
self._close_expired_connections()
self._attempt_to_acquire_connection(status)

while True:
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("pool", None)
try:
connection = status.wait_for_connection(timeout=timeout)
except BaseException as exc:
Expand Down Expand Up @@ -263,6 +273,10 @@ def handle_request(self, request: Request) -> Response:
else:
break

timeout = deadline - sync_current_time()
if timeout < 0:
raise PoolTimeout

# When we return the response, we wrap the stream in a special class
# that handles notifying the connection pool once the response
# has been released.
Expand Down
30 changes: 29 additions & 1 deletion httpcore/_synchronization.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import threading
import time
from types import TracebackType
from typing import Optional, Type
from typing import Callable, Optional, Type, Union

from ._exceptions import ExceptionMapping, PoolTimeout, map_exceptions

Expand Down Expand Up @@ -291,3 +292,30 @@ def __exit__(
traceback: Optional[TracebackType] = None,
) -> None:
pass


_current_time_implementation: Union[None, Callable[[], float]] = None


def async_current_time() -> float:
global _current_time_implementation

if not _current_time_implementation:
if current_async_library() == "trio":
if trio is None: # pragma: nocover
raise RuntimeError(
"Running with trio requires installation of 'httpcore[trio]'."
)
_current_time_implementation = trio.current_time
else:
if anyio is None: # pragma: nocover
raise RuntimeError(
"Running with asyncio requires installation of 'httpcore[asyncio]'."
)
_current_time_implementation = anyio.current_time

return _current_time_implementation()


def sync_current_time() -> float:
return time.monotonic()

0 comments on commit 0c49e79

Please sign in to comment.