Skip to content

Commit

Permalink
Connection pool work
Browse files Browse the repository at this point in the history
  • Loading branch information
tomchristie committed Feb 6, 2024
1 parent 6e3453e commit 3791d5e
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 82 deletions.
74 changes: 37 additions & 37 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import AsyncEvent, AsyncLock
from .._synchronization import AsyncEvent, AsyncLock, AsyncShieldCancellation
from .connection import AsyncHTTPConnection
from .interfaces import AsyncConnectionInterface, AsyncRequestInterface

Expand Down Expand Up @@ -193,9 +193,10 @@ async def handle_async_request(self, request: Request) -> Response:
pool_request = AsyncPoolRequest(request)
try:
while True:
async with self._pool_lock:
self._requests.append(pool_request)
closing = self._assign_requests_to_connections()
with AsyncShieldCancellation():
async with self._pool_lock:
self._requests.append(pool_request)
closing = self._assign_requests_to_connections()
await self._close_connections(closing)
connection = await pool_request.wait_for_connection(timeout=timeout)

Expand All @@ -209,9 +210,10 @@ async def handle_async_request(self, request: Request) -> Response:
break

except BaseException as exc:
async with self._pool_lock:
self._requests.remove(pool_request)
closing = self._assign_requests_to_connections()
with AsyncShieldCancellation():
async with self._pool_lock:
self._requests.remove(pool_request)
closing = self._assign_requests_to_connections()
await self._close_connections(closing)
raise exc from None

Expand All @@ -225,17 +227,6 @@ async def handle_async_request(self, request: Request) -> Response:
extensions=response.extensions,
)

async def _request_closed(self, request: AsyncPoolRequest) -> None:
"""
Once a request completes we remove it from the pool,
and determine if we can now assign any queued requests
to a connection.
"""
async with self._pool_lock:
self._requests.remove(request)
closing = self._assign_requests_to_connections()
await self._close_connections(closing)

def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
"""
Manage the state of the connection pool, assigning incoming
Expand All @@ -248,12 +239,20 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
"""
closing_connections = []

# Close any expired connections.
for connection in list(self._connections):
if connection.has_expired():
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle() and len(self._connections) > self._max_connections
):
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)

# Assign queued requests to connections.
queued_requests = [
Expand All @@ -266,6 +265,9 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]
if avilable_connections:
# log: "reusing existing connection"
connection = avilable_connections[0]
Expand All @@ -275,12 +277,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
else:
idle_connections = [
connection
for connection in self._connections
if connection.is_idle()
]
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
Expand All @@ -300,10 +297,9 @@ async def _close_connections(self, closing: List[AsyncConnectionInterface]) -> N
await connection.aclose()

async def aclose(self) -> None:
closing = list(self._connections)
self._requests = []
closing_connections = list(self._connections)
self._connections = []
await self._close_connections(closing)
await self._close_connections(closing_connections)

async def __aenter__(self) -> "AsyncConnectionPool":
# Acquiring the pool lock here ensures that we have the
Expand Down Expand Up @@ -331,19 +327,23 @@ def __init__(
self._stream = stream
self._pool_request = pool_request
self._pool = pool
self._closed = False
assert self._pool_request in self._pool._requests

async def __aiter__(self) -> AsyncIterator[bytes]:
try:
async for part in self._stream:
yield part
except BaseException:
async with self._pool._pool_lock:
self._pool._requests.remove(self._pool_request)
closing = self._pool._assign_requests_to_connections()
await self._pool._close_connections(closing)
await self.aclose()

async def aclose(self) -> None:
async with self._pool._pool_lock:
self._pool._requests.remove(self._pool_request)
closing = self._pool._assign_requests_to_connections()
await self._pool._close_connections(closing)
if not self._closed:
self._closed = True
with AsyncShieldCancellation():
if hasattr(self._stream, "aclose"):
await self._stream.aclose()
async with self._pool._pool_lock:
self._pool._requests.remove(self._pool_request)
closing = self._pool._assign_requests_to_connections()
await self._pool._close_connections(closing)
74 changes: 37 additions & 37 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from .._backends.base import SOCKET_OPTION, NetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._models import Origin, Request, Response
from .._synchronization import Event, Lock
from .._synchronization import Event, Lock, ShieldCancellation
from .connection import HTTPConnection
from .interfaces import ConnectionInterface, RequestInterface

Expand Down Expand Up @@ -193,9 +193,10 @@ def handle_request(self, request: Request) -> Response:
pool_request = PoolRequest(request)
try:
while True:
with self._pool_lock:
self._requests.append(pool_request)
closing = self._assign_requests_to_connections()
with ShieldCancellation():
with self._pool_lock:
self._requests.append(pool_request)
closing = self._assign_requests_to_connections()
self._close_connections(closing)
connection = pool_request.wait_for_connection(timeout=timeout)

Expand All @@ -209,9 +210,10 @@ def handle_request(self, request: Request) -> Response:
break

except BaseException as exc:
with self._pool_lock:
self._requests.remove(pool_request)
closing = self._assign_requests_to_connections()
with ShieldCancellation():
with self._pool_lock:
self._requests.remove(pool_request)
closing = self._assign_requests_to_connections()
self._close_connections(closing)
raise exc from None

Expand All @@ -225,17 +227,6 @@ def handle_request(self, request: Request) -> Response:
extensions=response.extensions,
)

def _request_closed(self, request: PoolRequest) -> None:
"""
Once a request completes we remove it from the pool,
and determine if we can now assign any queued requests
to a connection.
"""
with self._pool_lock:
self._requests.remove(request)
closing = self._assign_requests_to_connections()
self._close_connections(closing)

def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
"""
Manage the state of the connection pool, assigning incoming
Expand All @@ -248,12 +239,20 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
"""
closing_connections = []

# Close any expired connections.
for connection in list(self._connections):
if connection.has_expired():
if connection.is_closed():
# log: "removing closed connection"
self._connections.remove(connection)
elif connection.has_expired():
# log: "closing expired connection"
self._connections.remove(connection)
closing_connections.append(connection)
elif (
connection.is_idle() and len(self._connections) > self._max_connections
):
# log: "closing idle connection"
self._connections.remove(connection)
closing_connections.append(connection)

# Assign queued requests to connections.
queued_requests = [
Expand All @@ -266,6 +265,9 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]
idle_connections = [
connection for connection in self._connections if connection.is_idle()
]
if avilable_connections:
# log: "reusing existing connection"
connection = avilable_connections[0]
Expand All @@ -275,12 +277,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
else:
idle_connections = [
connection
for connection in self._connections
if connection.is_idle()
]
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
Expand All @@ -300,10 +297,9 @@ def _close_connections(self, closing: List[ConnectionInterface]) -> None:
connection.close()

def close(self) -> None:
closing = list(self._connections)
self._requests = []
closing_connections = list(self._connections)
self._connections = []
self._close_connections(closing)
self._close_connections(closing_connections)

def __enter__(self) -> "ConnectionPool":
# Acquiring the pool lock here ensures that we have the
Expand Down Expand Up @@ -331,19 +327,23 @@ def __init__(
self._stream = stream
self._pool_request = pool_request
self._pool = pool
self._closed = False
assert self._pool_request in self._pool._requests

def __iter__(self) -> Iterator[bytes]:
try:
for part in self._stream:
yield part
except BaseException:
with self._pool._pool_lock:
self._pool._requests.remove(self._pool_request)
closing = self._pool._assign_requests_to_connections()
self._pool._close_connections(closing)
self.close()

def close(self) -> None:
with self._pool._pool_lock:
self._pool._requests.remove(self._pool_request)
closing = self._pool._assign_requests_to_connections()
self._pool._close_connections(closing)
if not self._closed:
self._closed = True
with ShieldCancellation():
if hasattr(self._stream, "close"):
self._stream.close()
with self._pool._pool_lock:
self._pool._requests.remove(self._pool_request)
closing = self._pool._assign_requests_to_connections()
self._pool._close_connections(closing)
13 changes: 9 additions & 4 deletions tests/_async/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,17 @@ async def test_connection_pool_with_keepalive():
async with pool.stream("GET", "http://example.com/") as response:
info = [repr(c) for c in pool.connections]
assert info == [
"<AsyncHTTPConnection ['http://example.com:80', HTTP/1.1, ACTIVE, Request Count: 1]>",
"<AsyncHTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 2]>",
"<AsyncHTTPConnection ['http://example.com:80', HTTP/1.1, ACTIVE, Request Count: 1]>",
]
await response.aread()

assert response.status == 200
assert response.content == b"Hello, world!"
info = [repr(c) for c in pool.connections]
assert info == [
"<AsyncHTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>",
"<AsyncHTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 2]>",
"<AsyncHTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>",
]


Expand Down Expand Up @@ -205,11 +205,16 @@ async def test_connection_pool_with_http2_goaway():
http2=True,
)

def debug(*args, **kwargs):
print(*args, **kwargs)

async with httpcore.AsyncConnectionPool(
network_backend=network_backend,
) as pool:
# Sending an intial request, which once complete will return to the pool, IDLE.
response = await pool.request("GET", "https://example.com/")
response = await pool.request(
"GET", "https://example.com/", exensions={"trace": debug}
)
assert response.status == 200
assert response.content == b"Hello, world!"

Expand All @@ -225,8 +230,8 @@ async def test_connection_pool_with_http2_goaway():

info = [repr(c) for c in pool.connections]
assert info == [
"<AsyncHTTPConnection ['https://example.com:443', HTTP/2, IDLE, Request Count: 1]>",
"<AsyncHTTPConnection ['https://example.com:443', HTTP/2, CLOSED, Request Count: 1]>",
"<AsyncHTTPConnection ['https://example.com:443', HTTP/2, IDLE, Request Count: 1]>",
]


Expand Down
13 changes: 9 additions & 4 deletions tests/_sync/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,17 @@ def test_connection_pool_with_keepalive():
with pool.stream("GET", "http://example.com/") as response:
info = [repr(c) for c in pool.connections]
assert info == [
"<HTTPConnection ['http://example.com:80', HTTP/1.1, ACTIVE, Request Count: 1]>",
"<HTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 2]>",
"<HTTPConnection ['http://example.com:80', HTTP/1.1, ACTIVE, Request Count: 1]>",
]
response.read()

assert response.status == 200
assert response.content == b"Hello, world!"
info = [repr(c) for c in pool.connections]
assert info == [
"<HTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>",
"<HTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 2]>",
"<HTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>",
]


Expand Down Expand Up @@ -205,11 +205,16 @@ def test_connection_pool_with_http2_goaway():
http2=True,
)

def debug(*args, **kwargs):
print(*args, **kwargs)

with httpcore.ConnectionPool(
network_backend=network_backend,
) as pool:
# Sending an intial request, which once complete will return to the pool, IDLE.
response = pool.request("GET", "https://example.com/")
response = pool.request(
"GET", "https://example.com/", exensions={"trace": debug}
)
assert response.status == 200
assert response.content == b"Hello, world!"

Expand All @@ -225,8 +230,8 @@ def test_connection_pool_with_http2_goaway():

info = [repr(c) for c in pool.connections]
assert info == [
"<HTTPConnection ['https://example.com:443', HTTP/2, IDLE, Request Count: 1]>",
"<HTTPConnection ['https://example.com:443', HTTP/2, CLOSED, Request Count: 1]>",
"<HTTPConnection ['https://example.com:443', HTTP/2, IDLE, Request Count: 1]>",
]


Expand Down

0 comments on commit 3791d5e

Please sign in to comment.