Skip to content

Commit

Permalink
Lookin sharp
Browse files Browse the repository at this point in the history
  • Loading branch information
tomchristie committed Feb 7, 2024
1 parent d246c91 commit 20f088d
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 38 deletions.
61 changes: 42 additions & 19 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation
from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock
from .connection import AsyncHTTPConnection
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface

Expand Down Expand Up @@ -106,14 +106,21 @@ def __init__(
self._local_address = local_address
self._uds = uds

self._connections: List[AsyncConnectionInterface] = []
self._requests: List[AsyncPoolRequest] = []
self._pool_lock = AsyncLock()
self._network_backend = (
AutoBackend() if network_backend is None else network_backend
)
self._socket_options = socket_options

# The mutable state on a connection pool is the queue of incoming requests,
# and the set of connections that are servicing those requests.
self._connections: List[AsyncConnectionInterface] = []
self._requests: List[AsyncPoolRequest] = []

# We only mutate the state of the connection pool within an 'optional_thread_lock'
# context. This holds a threading lock unless we're running in async mode,
# in which case it is a no-op.
self._optional_thread_lock = AsyncThreadLock()

def create_connection(self, origin: Origin) -> AsyncConnectionInterface:
return AsyncHTTPConnection(
origin=origin,
Expand Down Expand Up @@ -165,31 +172,48 @@ async def handle_async_request(self, request: Request) -> Response:
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("pool", None)

pool_request = AsyncPoolRequest(request)
self._requests.append(pool_request)
with self._optional_thread_lock:
# Add the incoming request to our request queue.
pool_request = AsyncPoolRequest(request)
self._requests.append(pool_request)

try:
while True:
with AsyncShieldCancellation():
with self._optional_thread_lock:
# Assign incoming requests to available connections,
# closing or creating new connections as required.
closing = self._assign_requests_to_connections()
await self._close_connections(closing)

# Wait until this request has an assigned connection.
connection = await pool_request.wait_for_connection(timeout=timeout)

try:
# Send the request on the assigned connection.
response = await connection.handle_async_request(
pool_request.request
)
except ConnectionNotAvailable:
# In some cases a connection may initially be available to
# handle a request, but then become unavailable.
#
# In this case we clear the connection and try again.
pool_request.clear_connection()
else:
break

except BaseException as exc:
with AsyncShieldCancellation():
with self._optional_thread_lock:
# For any exception or cancellation we remove the request from
# the queue, and then re-assign requests to connections.
self._requests.remove(pool_request)
closing = self._assign_requests_to_connections()

await self._close_connections(closing)
raise exc from None

# Return the response. Note that in this case we still have to manage
# the point at which the response is closed.
assert isinstance(response.stream, AsyncIterable)
return Response(
status=response.status,
Expand Down Expand Up @@ -274,22 +298,20 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
return closing_connections

async def _close_connections(self, closing: List[AsyncConnectionInterface]) -> None:
"""
Close connections which have been removed from the pool.
"""
for connection in closing:
await connection.aclose()
# Close connections which have been removed from the pool.
with AsyncShieldCancellation():
for connection in closing:
await connection.aclose()

async def aclose(self) -> None:
closing_connections = list(self._connections)
self._connections = []
# Explicitly close the connection pool.
# Clears all existing requests and connections.
with self._optional_thread_lock:
closing_connections = list(self._connections)
self._connections = []
await self._close_connections(closing_connections)

async def __aenter__(self) -> "AsyncConnectionPool":
# Acquiring the pool lock here ensures that we have the
# correct dependencies installed as early as possible.
async with self._pool_lock:
pass
return self

async def __aexit__(
Expand Down Expand Up @@ -328,6 +350,7 @@ async def aclose(self) -> None:
if hasattr(self._stream, "aclose"):
await self._stream.aclose()

with self._pool._optional_thread_lock:
self._pool._requests.remove(self._pool_request)
closing = self._pool._assign_requests_to_connections()

Expand Down
61 changes: 42 additions & 19 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .._backends.base import SOCKET_OPTION, NetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import Event, Lock, ShieldCancellation
from .._synchronization import Event, ShieldCancellation, ThreadLock
from .connection import HTTPConnection
from .interfaces import ConnectionInterface, RequestInterface

Expand Down Expand Up @@ -106,14 +106,21 @@ def __init__(
self._local_address = local_address
self._uds = uds

self._connections: List[ConnectionInterface] = []
self._requests: List[PoolRequest] = []
self._pool_lock = Lock()
self._network_backend = (
SyncBackend() if network_backend is None else network_backend
)
self._socket_options = socket_options

# The mutable state on a connection pool is the queue of incoming requests,
# and the set of connections that are servicing those requests.
self._connections: List[ConnectionInterface] = []
self._requests: List[PoolRequest] = []

# We only mutate the state of the connection pool within an 'optional_thread_lock'
# context. This holds a threading lock unless we're running in async mode,
# in which case it is a no-op.
self._optional_thread_lock = ThreadLock()

def create_connection(self, origin: Origin) -> ConnectionInterface:
return HTTPConnection(
origin=origin,
Expand Down Expand Up @@ -165,31 +172,48 @@ def handle_request(self, request: Request) -> Response:
timeouts = request.extensions.get("timeout", {})
timeout = timeouts.get("pool", None)

pool_request = PoolRequest(request)
self._requests.append(pool_request)
with self._optional_thread_lock:
# Add the incoming request to our request queue.
pool_request = PoolRequest(request)
self._requests.append(pool_request)

try:
while True:
with ShieldCancellation():
with self._optional_thread_lock:
# Assign incoming requests to available connections,
# closing or creating new connections as required.
closing = self._assign_requests_to_connections()
self._close_connections(closing)

# Wait until this request has an assigned connection.
connection = pool_request.wait_for_connection(timeout=timeout)

try:
# Send the request on the assigned connection.
response = connection.handle_request(
pool_request.request
)
except ConnectionNotAvailable:
# In some cases a connection may initially be available to
# handle a request, but then become unavailable.
#
# In this case we clear the connection and try again.
pool_request.clear_connection()
else:
break

except BaseException as exc:
with ShieldCancellation():
with self._optional_thread_lock:
# For any exception or cancellation we remove the request from
# the queue, and then re-assign requests to connections.
self._requests.remove(pool_request)
closing = self._assign_requests_to_connections()

self._close_connections(closing)
raise exc from None

# Return the response. Note that in this case we still have to manage
# the point at which the response is closed.
assert isinstance(response.stream, Iterable)
return Response(
status=response.status,
Expand Down Expand Up @@ -274,22 +298,20 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
return closing_connections

def _close_connections(self, closing: List[ConnectionInterface]) -> None:
"""
Close connections which have been removed from the pool.
"""
for connection in closing:
connection.close()
# Close connections which have been removed from the pool.
with ShieldCancellation():
for connection in closing:
connection.close()

def close(self) -> None:
closing_connections = list(self._connections)
self._connections = []
# Explicitly close the connection pool.
# Clears all existing requests and connections.
with self._optional_thread_lock:
closing_connections = list(self._connections)
self._connections = []
self._close_connections(closing_connections)

def __enter__(self) -> "ConnectionPool":
# Acquiring the pool lock here ensures that we have the
# correct dependencies installed as early as possible.
with self._pool_lock:
pass
return self

def __exit__(
Expand Down Expand Up @@ -328,6 +350,7 @@ def close(self) -> None:
if hasattr(self._stream, "close"):
self._stream.close()

with self._pool._optional_thread_lock:
self._pool._requests.remove(self._pool_request)
closing = self._pool._assign_requests_to_connections()

Expand Down
58 changes: 58 additions & 0 deletions httpcore/_synchronization.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ def current_async_library() -> str:


class AsyncLock:
"""
This is a standard lock.
In the sync case `Lock` provides thread locking.
In the async case `AsyncLock` provides async locking.
"""

def __init__(self) -> None:
self._backend = ""

Expand Down Expand Up @@ -82,6 +89,26 @@ async def __aexit__(
self._anyio_lock.release()


class AsyncThreadLock:
"""
This is a threading-only lock for no-I/O contexts.
In the sync case `ThreadLock` provides thread locking.
In the async case `AsyncThreadLock` is a no-op.
"""

def __enter__(self) -> "AsyncThreadLock":
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_value: Optional[BaseException] = None,
traceback: Optional[TracebackType] = None,
) -> None:
pass


class AsyncEvent:
def __init__(self) -> None:
self._backend = ""
Expand Down Expand Up @@ -202,6 +229,13 @@ def __exit__(


class Lock:
"""
This is a standard lock.
In the sync case `Lock` provides thread locking.
In the async case `AsyncLock` provides async locking.
"""

def __init__(self) -> None:
self._lock = threading.Lock()

Expand All @@ -218,6 +252,30 @@ def __exit__(
self._lock.release()


class ThreadLock:
"""
This is a threading-only lock for no-I/O contexts.
In the sync case `ThreadLock` provides thread locking.
In the async case `AsyncThreadLock` is a no-op.
"""

def __init__(self) -> None:
self._lock = threading.Lock()

def __enter__(self) -> "ThreadLock":
self._lock.acquire()
return self

def __exit__(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_value: Optional[BaseException] = None,
traceback: Optional[TracebackType] = None,
) -> None:
self._lock.release()


class Event:
def __init__(self) -> None:
self._event = threading.Event()
Expand Down

0 comments on commit 20f088d

Please sign in to comment.