Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add search for a trusted host in ProxyHeadersMiddleware #591

Merged
merged 4 commits into from
Mar 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 57 additions & 12 deletions tests/middleware/test_proxy_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,79 @@ async def app(scope, receive, send):
await response(scope, receive, send)


app = ProxyHeadersMiddleware(app, trusted_hosts="*")


@pytest.mark.asyncio
async def test_proxy_headers():
async with httpx.AsyncClient(app=app, base_url="http://testserver") as client:
@pytest.mark.parametrize(
("trusted_hosts", "response_text"),
[
# always trust
("*", "Remote: https://1.2.3.4:0"),
# trusted proxy
("127.0.0.1", "Remote: https://1.2.3.4:0"),
(["127.0.0.1"], "Remote: https://1.2.3.4:0"),
# trusted proxy list
(["127.0.0.1", "10.0.0.1"], "Remote: https://1.2.3.4:0"),
("127.0.0.1, 10.0.0.1", "Remote: https://1.2.3.4:0"),
# request from untrusted proxy
("192.168.0.1", "Remote: http://127.0.0.1:123"),
],
)
async def test_proxy_headers_trusted_hosts(trusted_hosts, response_text):
app_with_middleware = ProxyHeadersMiddleware(app, trusted_hosts=trusted_hosts)
async with httpx.AsyncClient(
app=app_with_middleware, base_url="http://testserver"
) as client:
headers = {"X-Forwarded-Proto": "https", "X-Forwarded-For": "1.2.3.4"}
response = await client.get("/", headers=headers)

assert response.status_code == 200
assert response.text == "Remote: https://1.2.3.4:0"
assert response.text == response_text


@pytest.mark.asyncio
async def test_proxy_headers_no_port():
async with httpx.AsyncClient(app=app, base_url="http://testserver") as client:
headers = {"X-Forwarded-Proto": "https", "X-Forwarded-For": "1.2.3.4"}
@pytest.mark.parametrize(
("trusted_hosts", "response_text"),
[
# always trust
("*", "Remote: https://1.2.3.4:0"),
# all proxies are trusted
(
["127.0.0.1", "10.0.2.1", "192.168.0.2"],
"Remote: https://1.2.3.4:0",
),
# order doesn't matter
(
["10.0.2.1", "192.168.0.2", "127.0.0.1"],
"Remote: https://1.2.3.4:0",
),
# should set first untrusted as remote address
(["192.168.0.2", "127.0.0.1"], "Remote: https://10.0.2.1:0"),
],
)
async def test_proxy_headers_multiple_proxies(trusted_hosts, response_text):
app_with_middleware = ProxyHeadersMiddleware(app, trusted_hosts=trusted_hosts)
async with httpx.AsyncClient(
app=app_with_middleware, base_url="http://testserver"
) as client:
headers = {
"X-Forwarded-Proto": "https",
"X-Forwarded-For": "1.2.3.4, 10.0.2.1, 192.168.0.2",
}
response = await client.get("/", headers=headers)

assert response.status_code == 200
assert response.text == "Remote: https://1.2.3.4:0"
assert response.text == response_text


@pytest.mark.asyncio
async def test_proxy_headers_invalid_x_forwarded_for():
async with httpx.AsyncClient(app=app, base_url="http://testserver") as client:
app_with_middleware = ProxyHeadersMiddleware(app, trusted_hosts="*")
async with httpx.AsyncClient(
app=app_with_middleware, base_url="http://testserver"
) as client:
headers = httpx.Headers(
{
"X-Forwarded-Proto": "https",
"X-Forwarded-For": "\xf0\xfd\xfd\xfd, 1.2.3.4",
"X-Forwarded-For": "1.2.3.4, \xf0\xfd\xfd\xfd",
},
encoding="latin-1",
)
Expand Down
20 changes: 17 additions & 3 deletions uvicorn/middleware/proxy_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,28 @@

https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers#Proxies
"""
from typing import List


class ProxyHeadersMiddleware:
def __init__(self, app, trusted_hosts="127.0.0.1"):
self.app = app
if isinstance(trusted_hosts, str):
self.trusted_hosts = [item.strip() for item in trusted_hosts.split(",")]
self.trusted_hosts = {item.strip() for item in trusted_hosts.split(",")}
else:
self.trusted_hosts = trusted_hosts
self.trusted_hosts = set(trusted_hosts)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For speedup str not in collection operations

self.always_trust = "*" in self.trusted_hosts

def get_trusted_client_host(
self, x_forwarded_for_hosts
): # type: (List[str]) -> str
if self.always_trust:
return x_forwarded_for_hosts[0]

for host in reversed(x_forwarded_for_hosts):
Copy link
Contributor Author

@b0g3r b0g3r Apr 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just a security feature: we should return only the last non-trusted host.

Trivial case:

  • Proxy-server A
  • Proxy-server B
  • Client C
  • We trust both of proxies (trusted_hosts = A, B) or trust every proxy (trusted_hosts = *)

So in this scenario, a header will be X-Forwarded-For: C, B, A and we can securely extract C as a trusted client address.

Unfortunately, IRL we have a situation when we have a chain of proxies, and we can trust only a few of them, e.g:

  • Proxy-server A
  • Proxy-server B
  • Client C
  • We trust only A (trusted_hosts = A)

In this case, non-trusted proxy-server B can replace the real client address, so we should return the first non-trusted host and it will be B. Because the chain is actually reverted, we should check it also with reversed.

tl;dr: In a simple case, we can trust all of the proxies with trusted_hosts = * and it will return real client address (the first in X-Forwarded-For)

if host not in self.trusted_hosts:
return host
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need a fall through case so there's no way we can end up returning None here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I fail to see how it would be possible

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would happen if all the IPs in the chain were explicitly listed in trusted_hosts - but this would only occur if trusted_hosts were incorrectly set and/or the first reverse proxy in the chain failed to set x-forwarded-for to the client's IP (not our fault).

In this case, is there any other justifiable return value than None? I can't think of one


async def __call__(self, scope, receive, send):
if scope["type"] in ("http", "websocket"):
client_addr = scope.get("client")
Expand All @@ -38,7 +49,10 @@ async def __call__(self, scope, receive, send):
# X-Forwarded-For header. We've lost the connecting client's port
# information by now, so only include the host.
x_forwarded_for = headers[b"x-forwarded-for"].decode("latin1")
host = x_forwarded_for.split(",")[-1].strip()
x_forwarded_for_hosts = [
item.strip() for item in x_forwarded_for.split(",")
]
host = self.get_trusted_client_host(x_forwarded_for_hosts)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if the x-forwarded-for format is IP1:PORT1,IP2:PORT2,which is common in the L7 load balance? the port here is always set zero which is not reasonable.

port = 0
scope["client"] = (host, port)

Expand Down