Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #5107: Support multiple respositories on same host with different credentials. #5463

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 33 additions & 34 deletions src/poetry/utils/authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,16 @@ def request(self, method: str, url: str, **kwargs: Any) -> requests.Response:
def get_credentials_for_url(self, url: str) -> tuple[str | None, str | None]:
parsed_url = urllib.parse.urlsplit(url)

netloc = parsed_url.netloc

credentials = self._credentials.get(netloc, (None, None))
credentials = self._credentials.get(url, (None, None))

if credentials == (None, None):
if "@" not in netloc:
credentials = self._get_credentials_for_netloc(netloc)
if "@" not in parsed_url.netloc:
credentials = self._get_credentials_for_url(url)
else:
# Split from the right because that's how urllib.parse.urlsplit()
# behaves if more than one @ is present (which can be checked using
# the password attribute of urlsplit()'s return value).
auth, netloc = netloc.rsplit("@", 1)
auth, netloc = parsed_url.netloc.rsplit("@", 1)
# Split from the left because that's how urllib.parse.urlsplit()
# behaves if more than one : is present (which again can be checked
# using the password attribute of the return value)
Expand All @@ -139,7 +137,7 @@ def get_credentials_for_url(self, url: str) -> tuple[str | None, str | None]:
if credentials[0] is not None or credentials[1] is not None:
credentials = (credentials[0] or "", credentials[1] or "")

self._credentials[netloc] = credentials
self._credentials[url] = credentials

return credentials[0], credentials[1]

Expand All @@ -149,30 +147,34 @@ def get_pypi_token(self, name: str) -> str:
def get_http_auth(self, name: str) -> dict[str, str] | None:
return self._get_http_auth(name, None)

def _get_http_auth(self, name: str, netloc: str | None) -> dict[str, str] | None:
def _get_http_auth(self, name: str, url: str | None) -> dict[str, str] | None:
if name == "pypi":
url = "https://upload.pypi.org/legacy/"
repository_url = "https://upload.pypi.org/legacy/"
else:
url = self._config.get(f"repositories.{name}.url")
if not url:
repository_url = self._config.get(f"repositories.{name}.url")
if not repository_url:
return None

parsed_url = urllib.parse.urlsplit(url)
parsed_repository_url = urllib.parse.urlsplit(repository_url)
parsed_package_url = urllib.parse.urlsplit(url)

if netloc is None or netloc == parsed_url.netloc:
if url is None or (
parsed_repository_url.netloc == parsed_package_url.netloc
and parsed_package_url.path.startswith(parsed_repository_url.path)
):
auth = self._password_manager.get_http_auth(name)

if auth is None or auth["password"] is None:
username = auth["username"] if auth else None
auth = self._get_credentials_for_netloc_from_keyring(
url, parsed_url.netloc, username
auth = self._get_credentials_for_url_from_keyring(
repository_url, username
)

return auth

def _get_credentials_for_netloc(self, netloc: str) -> tuple[str | None, str | None]:
for repository_name, _ in self._get_repository_netlocs():
auth = self._get_http_auth(repository_name, netloc)
def _get_credentials_for_url(self, url: str) -> tuple[str | None, str | None]:
for repository_name, _ in self._get_repository_urls():
auth = self._get_http_auth(repository_name, url)

if auth is None:
continue
Expand All @@ -182,23 +184,19 @@ def _get_credentials_for_netloc(self, netloc: str) -> tuple[str | None, str | No
return None, None

def get_certs_for_url(self, url: str) -> dict[str, Path | None]:
parsed_url = urllib.parse.urlsplit(url)

netloc = parsed_url.netloc

return self._certs.setdefault(
netloc,
self._get_certs_for_netloc_from_config(netloc),
url,
self._get_certs_for_url_from_config(url),
)

def _get_repository_netlocs(self) -> Iterator[tuple[str, str]]:
def _get_repository_urls(self) -> Iterator[tuple[str, str]]:
for repository_name in self._config.get("repositories", []):
url = self._config.get(f"repositories.{repository_name}.url")
parsed_url = urllib.parse.urlsplit(url)
yield repository_name, parsed_url.netloc
yield repository_name, self._config.get(
f"repositories.{repository_name}.url"
)

def _get_credentials_for_netloc_from_keyring(
self, url: str, netloc: str, username: str | None
def _get_credentials_for_url_from_keyring(
self, url: str, username: str | None
) -> dict[str, str] | None:
import keyring

Expand All @@ -209,7 +207,8 @@ def _get_credentials_for_netloc_from_keyring(
"password": cred.password,
}

cred = keyring.get_credential(netloc, username)
parsed_url = urllib.parse.urlsplit(url)
cred = keyring.get_credential(parsed_url.netloc, username)
if cred is not None:
return {
"username": cred.username,
Expand All @@ -224,11 +223,11 @@ def _get_credentials_for_netloc_from_keyring(

return None

def _get_certs_for_netloc_from_config(self, netloc: str) -> dict[str, Path | None]:
def _get_certs_for_url_from_config(self, url: str) -> dict[str, Path | None]:
certs = {"cert": None, "verify": None}

for repository_name, repository_netloc in self._get_repository_netlocs():
if netloc == repository_netloc:
for repository_name, repository_url in self._get_repository_urls():
if url == repository_url:
certs["cert"] = get_client_cert(self._config, repository_name)
certs["verify"] = get_cert(self._config, repository_name)
break
Expand Down
142 changes: 101 additions & 41 deletions tests/utils/test_authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import uuid

from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any

Expand Down Expand Up @@ -68,7 +67,59 @@ def test_authenticator_uses_credentials_from_config_if_not_provided(
)

authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo.bar/files/foo-0.1.0.tar.gz")
authenticator.request("get", "https://foo.bar/simple/foo-0.1.0.tar.gz")

request = http.last_request()

assert request.headers["Authorization"] == "Basic YmFyOmJheg=="


def test_authenticator_uses_credentials_from_config_matched_by_url_path(
config: Config, mock_remote: None, http: type[httpretty.httpretty]
):
config.merge(
{
"repositories": {
"foo-alpha": {"url": "https://foo.bar/alpha/files/simple/"},
"foo-beta": {"url": "https://foo.bar/beta/files/simple/"},
},
"http-basic": {
"foo-alpha": {"username": "bar", "password": "alpha"},
"foo-beta": {"username": "baz", "password": "beta"},
},
}
)

authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo.bar/alpha/files/simple/foo-0.1.0.tar.gz")

request = http.last_request()

assert request.headers["Authorization"] == "Basic YmFyOmFscGhh"

# Make request on second repository with the same netloc but different credentials
authenticator.request("get", "https://foo.bar/beta/files/simple/foo-0.1.0.tar.gz")

request = http.last_request()

assert request.headers["Authorization"] == "Basic YmF6OmJldGE="


def test_authenticator_uses_credentials_from_config_with_at_sign_in_path(
config: Config, mock_remote: None, http: type[httpretty.httpretty]
):
config.merge(
{
"repositories": {
"foo": {"url": "https://foo.bar/beta/files/simple/"},
},
"http-basic": {
"foo": {"username": "bar", "password": "baz"},
},
}
)
authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo.bar/beta/files/simple/f@@-0.1.0.tar.gz")

request = http.last_request()

Expand All @@ -89,7 +140,7 @@ def test_authenticator_uses_username_only_credentials(
)

authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo001@foo.bar/files/foo-0.1.0.tar.gz")
authenticator.request("get", "https://foo001@foo.bar/files/fo@o-0.1.0.tar.gz")

request = http.last_request()

Expand Down Expand Up @@ -128,7 +179,7 @@ def test_authenticator_uses_empty_strings_as_default_password(
)

authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo.bar/files/foo-0.1.0.tar.gz")
authenticator.request("get", "https://foo.bar/simple/foo-0.1.0.tar.gz")

request = http.last_request()

Expand All @@ -146,7 +197,7 @@ def test_authenticator_uses_empty_strings_as_default_username(
)

authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo.bar/files/foo-0.1.0.tar.gz")
authenticator.request("get", "https://foo.bar/simple/foo-0.1.0.tar.gz")

request = http.last_request()

Expand All @@ -171,14 +222,14 @@ def test_authenticator_falls_back_to_keyring_url(
)

authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo.bar/files/foo-0.1.0.tar.gz")
authenticator.request("get", "https://foo.bar/simple/foo-0.1.0.tar.gz")

request = http.last_request()

assert request.headers["Authorization"] == "Basic OmJhcg=="


def test_authenticator_falls_back_to_keyring_netloc(
def test_authenticator_falls_back_to_keyring_url_matched_by_path(
config: Config,
mock_remote: None,
http: type[httpretty.httpretty],
Expand All @@ -187,19 +238,32 @@ def test_authenticator_falls_back_to_keyring_netloc(
):
config.merge(
{
"repositories": {"foo": {"url": "https://foo.bar/simple/"}},
"repositories": {
"foo-alpha": {"url": "https://foo.bar/alpha/files/simple/"},
"foo-beta": {"url": "https://foo.bar/beta/files/simple/"},
}
}
)

dummy_keyring.set_password("foo.bar", None, SimpleCredential(None, "bar"))
dummy_keyring.set_password(
"https://foo.bar/alpha/files/simple/", None, SimpleCredential(None, "bar")
)
dummy_keyring.set_password(
"https://foo.bar/beta/files/simple/", None, SimpleCredential(None, "baz")
)

authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo.bar/files/foo-0.1.0.tar.gz")

authenticator.request("get", "https://foo.bar/alpha/files/simple/foo-0.1.0.tar.gz")
request = http.last_request()

assert request.headers["Authorization"] == "Basic OmJhcg=="

authenticator.request("get", "https://foo.bar/beta/files/simple/foo-0.1.0.tar.gz")
request = http.last_request()

assert request.headers["Authorization"] == "Basic OmJheg=="


@pytest.mark.filterwarnings("ignore::pytest.PytestUnhandledThreadExceptionWarning")
def test_authenticator_request_retries_on_exception(
Expand Down Expand Up @@ -302,51 +366,47 @@ def test_authenticator_uses_env_provided_credentials(
config.merge({"repositories": {"foo": {"url": "https://foo.bar/simple/"}}})

authenticator = Authenticator(config, NullIO())
authenticator.request("get", "https://foo.bar/files/foo-0.1.0.tar.gz")
authenticator.request("get", "https://foo.bar/simple/foo-0.1.0.tar.gz")

request = http.last_request()

assert request.headers["Authorization"] == "Basic YmFyOmJheg=="


@pytest.mark.parametrize(
"cert,client_cert",
[
(None, None),
(None, "path/to/provided/client-cert"),
("/path/to/provided/cert", None),
("/path/to/provided/cert", "path/to/provided/client-cert"),
],
)
def test_authenticator_uses_certs_from_config_if_not_provided(
@pytest.fixture
def environment_repository_credentials_multiple_repositories(
monkeypatch: MonkeyPatch,
) -> None:
monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_ALPHA_USERNAME", "bar")
monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_ALPHA_PASSWORD", "alpha")
monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_BETA_USERNAME", "baz")
monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_BETA_PASSWORD", "beta")


def test_authenticator_uses_env_provided_credentials_matched_by_url_path(
config: Config,
environ: None,
mock_remote: type[httpretty.httpretty],
http: type[httpretty.httpretty],
mocker: MockerFixture,
cert: str | None,
client_cert: str | None,
environment_repository_credentials_multiple_repositories: None,
):
configured_cert = "/path/to/cert"
configured_client_cert = "/path/to/client-cert"
config.merge(
{
"repositories": {"foo": {"url": "https://foo.bar/simple/"}},
"http-basic": {"foo": {"username": "bar", "password": "baz"}},
"certificates": {
"foo": {"cert": configured_cert, "client-cert": configured_client_cert}
},
"repositories": {
"foo-alpha": {"url": "https://foo.bar/alpha/files/simple/"},
"foo-beta": {"url": "https://foo.bar/beta/files/simple/"},
}
}
)

authenticator = Authenticator(config, NullIO())
session_send = mocker.patch.object(authenticator.session, "send")
authenticator.request(
"get",
"https://foo.bar/files/foo-0.1.0.tar.gz",
verify=cert,
cert=client_cert,
)
kwargs = session_send.call_args[1]

assert Path(kwargs["verify"]) == Path(cert or configured_cert)
assert Path(kwargs["cert"]) == Path(client_cert or configured_client_cert)
authenticator.request("get", "https://foo.bar/alpha/files/simple/foo-0.1.0.tar.gz")
request = http.last_request()

assert request.headers["Authorization"] == "Basic YmFyOmFscGhh"

authenticator.request("get", "https://foo.bar/beta/files/simple/foo-0.1.0.tar.gz")
request = http.last_request()

assert request.headers["Authorization"] == "Basic YmF6OmJldGE="