-
-
Notifications
You must be signed in to change notification settings - Fork 758
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve ProxyHeadersMiddleware #1611
Changes from all commits
25a21c3
be3fc3f
6947d19
58b629e
77dffc7
b3ee098
7c81716
d26d14c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,7 +8,8 @@ | |
|
||
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers#Proxies | ||
""" | ||
from typing import TYPE_CHECKING, List, Optional, Tuple, Union, cast | ||
import ipaddress | ||
from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union, cast | ||
|
||
if TYPE_CHECKING: | ||
from asgiref.typing import ( | ||
|
@@ -21,30 +22,62 @@ | |
) | ||
|
||
|
||
def _parse_raw_hosts(value: str) -> List[str]: | ||
return [item.strip() for item in value.split(",")] | ||
|
||
|
||
class _TrustedHosts: | ||
def __init__(self, trusted_hosts: Union[List[str], str]) -> None: | ||
self.trusted_networks: Set[ipaddress.IPv4Network] = set() | ||
self.trusted_literals: Set[str] = set() | ||
self.always_trust = trusted_hosts == "*" | ||
|
||
if not self.always_trust: | ||
if isinstance(trusted_hosts, str): | ||
trusted_hosts = _parse_raw_hosts(trusted_hosts) | ||
for host in trusted_hosts: | ||
try: | ||
# Try parsing the trusted host as an IPv4Network | ||
# to allow checking a whole range. | ||
# https://github.com/encode/uvicorn/issues/1068 | ||
self.trusted_networks.add(ipaddress.IPv4Network(host)) | ||
except ValueError: | ||
self.trusted_literals.add(host) | ||
Comment on lines
+44
to
+45
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we have a test case for this? |
||
|
||
def __contains__(self, item: Optional[str]) -> bool: | ||
if self.always_trust: | ||
return True | ||
|
||
try: | ||
ip = ipaddress.IPv4Address(item) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason we're using |
||
return any(ip in net for net in self.trusted_networks) | ||
except ValueError: | ||
return item in self.trusted_literals | ||
|
||
def get_trusted_client_host(self, x_forwarded_for: str) -> Optional[str]: | ||
x_forwarded_for_hosts = _parse_raw_hosts(x_forwarded_for) | ||
if self.always_trust: | ||
return x_forwarded_for_hosts[0] | ||
|
||
host = None | ||
for host in reversed(x_forwarded_for_hosts): | ||
if host not in self: | ||
return host | ||
# The request came from a client on the proxy itself. Trust it. | ||
# See https://github.com/encode/uvicorn/issues/1068#issuecomment-855371576 | ||
if host in self: | ||
return x_forwarded_for_hosts[0] | ||
return host | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be reached? What's the scenario? |
||
|
||
|
||
class ProxyHeadersMiddleware: | ||
def __init__( | ||
self, | ||
app: "ASGI3Application", | ||
trusted_hosts: Union[List[str], str] = "127.0.0.1", | ||
) -> None: | ||
self.app = app | ||
if isinstance(trusted_hosts, str): | ||
self.trusted_hosts = {item.strip() for item in trusted_hosts.split(",")} | ||
else: | ||
self.trusted_hosts = set(trusted_hosts) | ||
self.always_trust = "*" in self.trusted_hosts | ||
|
||
def get_trusted_client_host( | ||
self, x_forwarded_for_hosts: List[str] | ||
) -> Optional[str]: | ||
if self.always_trust: | ||
return x_forwarded_for_hosts[0] | ||
|
||
for host in reversed(x_forwarded_for_hosts): | ||
if host not in self.trusted_hosts: | ||
return host | ||
|
||
return None | ||
self.trusted_hosts = _TrustedHosts(trusted_hosts) | ||
|
||
async def __call__( | ||
self, scope: "Scope", receive: "ASGIReceiveCallable", send: "ASGISendCallable" | ||
|
@@ -54,7 +87,7 @@ async def __call__( | |
client_addr: Optional[Tuple[str, int]] = scope.get("client") | ||
client_host = client_addr[0] if client_addr else None | ||
|
||
if self.always_trust or client_host in self.trusted_hosts: | ||
if client_host in self.trusted_hosts: | ||
headers = dict(scope["headers"]) | ||
|
||
if b"x-forwarded-proto" in headers: | ||
|
@@ -68,11 +101,13 @@ async def __call__( | |
# X-Forwarded-For header. We've lost the connecting client's port | ||
# information by now, so only include the host. | ||
x_forwarded_for = headers[b"x-forwarded-for"].decode("latin1") | ||
x_forwarded_for_hosts = [ | ||
item.strip() for item in x_forwarded_for.split(",") | ||
] | ||
host = self.get_trusted_client_host(x_forwarded_for_hosts) | ||
port = 0 | ||
scope["client"] = (host, port) # type: ignore[arg-type] | ||
host = self.trusted_hosts.get_trusted_client_host(x_forwarded_for) | ||
|
||
# Host is None or an empty string | ||
# if the x-forwarded-for header is empty. | ||
# See https://github.com/encode/uvicorn/issues/1068 | ||
if host: | ||
port = 0 | ||
scope["client"] = (host, port) # type: ignore[arg-type] | ||
|
||
return await self.app(scope, receive, send) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason for using
IPv4Network
instead ofipaddress.ip_network
which is version agnostic?