Skip to content

Commit

Permalink
authenticator: allow multiple repos w/ same netloc
Browse files Browse the repository at this point in the history
Co-authored-by: Agni Sairent <agniczech@gmail.com>
  • Loading branch information
abn and Agni Sairent committed May 6, 2022
1 parent 5c1bf62 commit 965dc0d
Show file tree
Hide file tree
Showing 7 changed files with 388 additions and 114 deletions.
20 changes: 20 additions & 0 deletions src/poetry/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,20 @@ def _all(config: dict, parent_key: str = "") -> dict:
def raw(self) -> dict[str, Any]:
return self._config

@staticmethod
def _get_environment_repositories() -> dict[str, dict[str, str]]:
repositories = {}
pattern = re.compile(r"POETRY_REPOSITORIES_(?P<name>[A-Z_]+)_URL")

for env_key in os.environ.keys():
match = pattern.match(env_key)
if match:
repositories[match.group("name").lower().replace("_", "-")] = {
"url": os.environ[env_key]
}

return repositories

def get(self, setting_name: str, default: Any = None) -> Any:
"""
Retrieve a setting value.
Expand All @@ -112,6 +126,12 @@ def get(self, setting_name: str, default: Any = None) -> Any:
# Looking in the environment if the setting
# is set via a POETRY_* environment variable
if self._use_environment:
if setting_name == "repositories":
# repositories setting is special for now
repositories = self._get_environment_repositories()
if repositories:
return repositories

env = "POETRY_" + "_".join(k.upper().replace("-", "_") for k in keys)
env_value = os.getenv(env)
if env_value is not None:
Expand Down
6 changes: 3 additions & 3 deletions src/poetry/publishing/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ def publish(
logger.debug(
f"Found authentication information for {repository_name}."
)
username = auth["username"]
password = auth["password"]
username = auth.username
password = auth.password

resolved_client_cert = client_cert or get_client_cert(
self._poetry.config, repository_name
)
# Requesting missing credentials but only if there is not a client cert defined.
if not resolved_client_cert:
if not resolved_client_cert and hasattr(self._io, "ask"):
if username is None:
username = self._io.ask("Username:")

Expand Down
262 changes: 158 additions & 104 deletions src/poetry/utils/authenticator.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from __future__ import annotations

import contextlib
import dataclasses
import functools
import logging
import time
import urllib.parse

from os.path import commonprefix
from typing import TYPE_CHECKING
from typing import Any
from typing import Iterator

import requests
import requests.auth
Expand All @@ -20,6 +22,7 @@
from poetry.locations import REPOSITORY_CACHE_DIR
from poetry.utils.helpers import get_cert
from poetry.utils.helpers import get_client_cert
from poetry.utils.password_manager import HTTPAuthCredential
from poetry.utils.password_manager import PasswordManager


Expand All @@ -34,6 +37,50 @@
logger = logging.getLogger(__name__)


@dataclasses.dataclass
class AuthenticatorRepositoryConfig:
name: str
url: str
netloc: str = dataclasses.field(init=False)
path: str = dataclasses.field(init=False)

def __post_init__(self) -> None:
parsed_url = urllib.parse.urlsplit(self.url)
self.netloc = parsed_url.netloc
self.path = parsed_url.path

def certs(self, config: Config) -> dict[str, Path | None]:
return {
"cert": get_client_cert(config, self.name),
"verify": get_cert(config, self.name),
}

@property
def http_credential_keys(self) -> list[str]:
return [self.url, self.netloc, self.name]

def get_http_credentials(
self, password_manager: PasswordManager, username: str | None = None
) -> HTTPAuthCredential:
# try with the repository name via the password manager
credential = HTTPAuthCredential(
**(password_manager.get_http_auth(self.name) or {})
)

if credential.password is None:
# fallback to url and netloc based keyring entries
credential = password_manager.keyring.get_credential(
self.url, self.netloc, username=credential.username
)

if credential.password is not None:
return HTTPAuthCredential(
username=credential.username, password=credential.password
)

return credential


class Authenticator:
def __init__(
self,
Expand All @@ -46,8 +93,11 @@ def __init__(
self._io = io
self._session: requests.Session | None = None
self._sessions_for_netloc: dict[str, requests.Session] = {}
self._credentials: dict[str, tuple[str, str]] = {}
self._credentials: dict[str, HTTPAuthCredential] = {}
self._certs: dict[str, dict[str, Path | None]] = {}
self._configured_repositories: dict[
str, AuthenticatorRepositoryConfig
] | None = None
self._password_manager = PasswordManager(self._config)
self._cache_control = (
FileCache(
Expand Down Expand Up @@ -101,11 +151,11 @@ def delete_cache(self, url: str) -> None:

def authenticated_url(self, url: str) -> str:
parsed = urllib.parse.urlparse(url)
username, password = self.get_credentials_for_url(url)
credential = self.get_credentials_for_url(url)

if username is not None and password is not None:
username = urllib.parse.quote(username, safe="")
password = urllib.parse.quote(password, safe="")
if credential.username is not None and credential.password is not None:
username = urllib.parse.quote(credential.username, safe="")
password = urllib.parse.quote(credential.password, safe="")

return (
f"{parsed.scheme}://{username}:{password}@{parsed.netloc}{parsed.path}"
Expand All @@ -117,10 +167,12 @@ def request(
self, method: str, url: str, raise_for_status: bool = True, **kwargs: Any
) -> requests.Response:
request = requests.Request(method, url)
username, password = self.get_credentials_for_url(url)
credential = self.get_credentials_for_url(url)

if username is not None and password is not None:
request = requests.auth.HTTPBasicAuth(username, password)(request)
if credential.username is not None or credential.password is not None:
request = requests.auth.HTTPBasicAuth(
credential.username or "", credential.password or ""
)(request)

session = self.get_session(url=url)
prepared_request = session.prepare_request(request)
Expand Down Expand Up @@ -180,18 +232,37 @@ def get(self, url: str, **kwargs: Any) -> requests.Response:
def post(self, url: str, **kwargs: Any) -> requests.Response:
return self.request("post", url, **kwargs)

def get_credentials_for_url(self, url: str) -> tuple[str | None, str | None]:
parsed_url = urllib.parse.urlsplit(url)

netloc = parsed_url.netloc
def _get_credentials_for_url(self, url: str) -> HTTPAuthCredential:
repository = self.get_repository_config_for_url(url)

credentials: tuple[str | None, str | None] = self._credentials.get(
netloc, (None, None)
credential = (
repository.get_http_credentials(password_manager=self._password_manager)
if repository is not None
else HTTPAuthCredential()
)

if credentials == (None, None):
if credential.password is None:
parsed_url = urllib.parse.urlsplit(url)
netloc = parsed_url.netloc
credential = self._password_manager.keyring.get_credential(
url, netloc, username=credential.username
)

return HTTPAuthCredential(
username=credential.username, password=credential.password
)

return credential

def get_credentials_for_url(self, url: str) -> HTTPAuthCredential:
parsed_url = urllib.parse.urlsplit(url)
netloc = parsed_url.netloc

if url not in self._credentials:
if "@" not in netloc:
credentials = self._get_credentials_for_netloc(netloc)
# no credentials were provided in the url, try finding the
# best repository configuration
self._credentials[url] = self._get_credentials_for_url(url)
else:
# Split from the right because that's how urllib.parse.urlsplit()
# behaves if more than one @ is present (which can be checked using
Expand All @@ -201,110 +272,93 @@ def get_credentials_for_url(self, url: str) -> tuple[str | None, str | None]:
# behaves if more than one : is present (which again can be checked
# using the password attribute of the return value)
user, password = auth.split(":", 1) if ":" in auth else (auth, "")
credentials = (
self._credentials[url] = HTTPAuthCredential(
urllib.parse.unquote(user),
urllib.parse.unquote(password),
)

if any(credential is not None for credential in credentials):
credentials = (credentials[0] or "", credentials[1] or "")
self._credentials[netloc] = credentials

return credentials
return self._credentials[url]

def get_pypi_token(self, name: str) -> str | None:
return self._password_manager.get_pypi_token(name)

def get_http_auth(self, name: str) -> dict[str, str | None] | None:
return self._get_http_auth(name, None)

def _get_http_auth(
self, name: str, netloc: str | None
) -> dict[str, str | None] | None:
def get_http_auth(
self, name: str, username: str | None = None
) -> HTTPAuthCredential | None:
if name == "pypi":
url = "https://upload.pypi.org/legacy/"
repository = AuthenticatorRepositoryConfig(
name, "https://upload.pypi.org/legacy/"
)
else:
url = self._config.get(f"repositories.{name}.url")
if not url:
if name not in self.configured_repositories:
return None
repository = self.configured_repositories[name]

parsed_url = urllib.parse.urlsplit(url)

if netloc is None or netloc == parsed_url.netloc:
auth = self._password_manager.get_http_auth(name)
auth = auth or {}

if auth.get("password") is None:
username = auth.get("username")
auth = self._get_credentials_for_netloc_from_keyring(
url, parsed_url.netloc, username
)

return auth

return None

def _get_credentials_for_netloc(self, netloc: str) -> tuple[str | None, str | None]:
for repository_name, _ in self._get_repository_netlocs():
auth = self._get_http_auth(repository_name, netloc)

if auth is None:
continue
return self._credentials.setdefault(
repository.url,
repository.get_http_credentials(
password_manager=self._password_manager, username=username
),
)

return auth.get("username"), auth.get("password")
@property
def configured_repositories(self) -> dict[str, AuthenticatorRepositoryConfig]:
if self._configured_repositories is None:
self._configured_repositories = {}
for repository_name in self._config.get("repositories", []):
url = self._config.get(f"repositories.{repository_name}.url")
self._configured_repositories[
repository_name
] = AuthenticatorRepositoryConfig(repository_name, url)

return None, None
return self._configured_repositories

def get_certs_for_url(self, url: str) -> dict[str, Path | None]:
return self._certs.setdefault(
url,
self._get_certs_for_url(url),
)

@functools.lru_cache(maxsize=None)
def get_repository_config_for_url(
self, url: str
) -> AuthenticatorRepositoryConfig | None:
parsed_url = urllib.parse.urlsplit(url)
candidates_netloc_only = []
candidates_path_match = []

for repository in self.configured_repositories.values():

if repository.netloc == parsed_url.netloc:
if parsed_url.path.startswith(repository.path) or commonprefix(
(parsed_url.path, repository.path)
):
candidates_path_match.append(repository)
continue
candidates_netloc_only.append(repository)

if candidates_path_match:
candidates = candidates_path_match
elif candidates_netloc_only:
candidates = candidates_netloc_only
else:
return None

netloc = parsed_url.netloc
if len(candidates) > 1:
logger.debug(
"Multiple source configurations found for %s - %s",
parsed_url.netloc,
", ".join(map(lambda c: c.name, candidates)),
)
# prefer the more specific path
candidates.sort(
key=lambda c: len(commonprefix([parsed_url.path, c.path])), reverse=True
)

return self._certs.setdefault(
netloc,
self._get_certs_for_netloc_from_config(netloc),
)
return candidates[0]

def _get_repository_netlocs(self) -> Iterator[tuple[str, str]]:
for repository_name in self._config.get("repositories", []):
url = self._config.get(f"repositories.{repository_name}.url")
parsed_url = urllib.parse.urlsplit(url)
yield repository_name, parsed_url.netloc

def _get_credentials_for_netloc_from_keyring(
self, url: str, netloc: str, username: str | None
) -> dict[str, str | None] | None:
import keyring

cred = keyring.get_credential(url, username)
if cred is not None:
return {
"username": cred.username,
"password": cred.password,
}

cred = keyring.get_credential(netloc, username)
if cred is not None:
return {
"username": cred.username,
"password": cred.password,
}

if username:
return {
"username": username,
"password": None,
}

return None

def _get_certs_for_netloc_from_config(self, netloc: str) -> dict[str, Path | None]:
certs: dict[str, Path | None] = {"cert": None, "verify": None}

for repository_name, repository_netloc in self._get_repository_netlocs():
if netloc == repository_netloc:
certs["cert"] = get_client_cert(self._config, repository_name)
certs["verify"] = get_cert(self._config, repository_name)
break

return certs
def _get_certs_for_url(self, url: str) -> dict[str, Path | None]:
selected = self.get_repository_config_for_url(url)
if selected:
return selected.certs(config=self._config)
return {"cert": None, "verify": None}
Loading

0 comments on commit 965dc0d

Please sign in to comment.