Skip to content

Commit

Permalink
Merge pull request #1194 from takluyver/oidc
Browse files Browse the repository at this point in the history
Initial implementation of uploading with trusted publishing authentication
  • Loading branch information
sigmavirus24 authored Dec 11, 2024
2 parents 66b25ae + ec859fb commit 28e60bb
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 34 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ dependencies = [
"rfc3986 >= 1.4.0",
"rich >= 12.0.0",
"packaging",
"id",
]
dynamic = ["version"]

Expand Down
83 changes: 80 additions & 3 deletions twine/auth.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
import functools
import getpass
import json
import logging
from typing import TYPE_CHECKING, Callable, Optional, Type, cast
from urllib.parse import urlparse

from id import AmbientCredentialError # type: ignore
from id import detect_credential

# keyring has an indirect dependency on PyCA cryptography, which has no
# pre-built wheels for ppc64le and s390x, see #1158.
if TYPE_CHECKING:
import keyring
from keyring.errors import NoKeyringError
else:
try:
import keyring
from keyring.errors import NoKeyringError
except ModuleNotFoundError: # pragma: no cover
keyring = None
NoKeyringError = None

from twine import exceptions
from twine import utils
Expand All @@ -28,7 +36,11 @@ def __init__(


class Resolver:
def __init__(self, config: utils.RepositoryConfig, input: CredentialInput) -> None:
def __init__(
self,
config: utils.RepositoryConfig,
input: CredentialInput,
) -> None:
self.config = config
self.input = input

Expand Down Expand Up @@ -57,9 +69,65 @@ def password(self) -> Optional[str]:
self.input.password,
self.config,
key="password",
prompt_strategy=self.password_from_keyring_or_prompt,
prompt_strategy=self.password_from_keyring_or_trusted_publishing_or_prompt,
)

def make_trusted_publishing_token(self) -> Optional[str]:
# Trusted publishing (OpenID Connect): get one token from the CI
# system, and exchange that for a PyPI token.
repository_domain = cast(str, urlparse(self.system).netloc)
session = utils.make_requests_session()

# Indices are expected to support `https://{domain}/_/oidc/audience`,
# which tells OIDC exchange clients which audience to use.
audience_url = f"https://{repository_domain}/_/oidc/audience"
resp = session.get(audience_url, timeout=5)
resp.raise_for_status()
audience = cast(str, resp.json()["audience"])

try:
oidc_token = detect_credential(audience)
except AmbientCredentialError as e:
# If we get here, we're on a supported CI platform for trusted
# publishing, and we have not been given any token, so we can error.
raise exceptions.TrustedPublishingFailure(
"Unable to retrieve an OIDC token from the CI platform for "
f"trusted publishing {e}"
)

if oidc_token is None:
logger.debug("This environment is not supported for trusted publishing")
return None # Fall back to prompting for a token (if possible)

logger.debug("Got OIDC token for audience %s", audience)

token_exchange_url = f"https://{repository_domain}/_/oidc/mint-token"

mint_token_resp = session.post(
token_exchange_url,
json={"token": oidc_token},
timeout=5, # S113 wants a timeout
)
try:
mint_token_payload = mint_token_resp.json()
except json.JSONDecodeError:
raise exceptions.TrustedPublishingFailure(
"The token-minting request returned invalid JSON"
)

if not mint_token_resp.ok:
reasons = "\n".join(
f'* `{error["code"]}`: {error["description"]}'
for error in mint_token_payload["errors"]
)
raise exceptions.TrustedPublishingFailure(
"The token request failed; the index server gave the following "
f"reasons:\n\n{reasons}"
)

logger.debug("Minted upload token for trusted publishing")
return cast(str, mint_token_payload["token"])

@property
def system(self) -> Optional[str]:
return self.config["repository"]
Expand Down Expand Up @@ -90,6 +158,8 @@ def get_password_from_keyring(self) -> Optional[str]:
username = cast(str, self.username)
logger.info("Querying keyring for password")
return cast(str, keyring.get_password(system, username))
except NoKeyringError:
logger.info("No keyring backend found")
except Exception as exc:
logger.warning("Error getting password from keyring", exc_info=exc)
return None
Expand All @@ -102,12 +172,19 @@ def username_from_keyring_or_prompt(self) -> str:

return self.prompt("username", input)

def password_from_keyring_or_prompt(self) -> str:
def password_from_keyring_or_trusted_publishing_or_prompt(self) -> str:
password = self.get_password_from_keyring()
if password:
logger.info("password set from keyring")
return password

if self.is_pypi() and self.username == "__token__":
logger.debug(
"Trying to use trusted publishing (no token was explicitly provided)"
)
if (token := self.make_trusted_publishing_token()) is not None:
return token

# Prompt for API token when required.
what = "API token" if self.is_pypi() else "password"

Expand Down
1 change: 1 addition & 0 deletions twine/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def list_dependencies_and_versions() -> List[Tuple[str, str]]:
"requests",
"requests-toolbelt",
"urllib3",
"id",
]
if sys.version_info < (3, 10):
deps.append("importlib-metadata")
Expand Down
6 changes: 6 additions & 0 deletions twine/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ class NonInteractive(TwineException):
pass


class TrustedPublishingFailure(TwineException):
"""Raised if we expected to use trusted publishing but couldn't."""

pass


class InvalidPyPIUploadURL(TwineException):
"""Repository configuration tries to use PyPI with an incorrect URL.
Expand Down
34 changes: 3 additions & 31 deletions twine/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Any, Dict, List, Optional, Set, Tuple, cast
from typing import Any, Dict, List, Optional, Set, Tuple

import requests
import requests_toolbelt
import rich.progress
import urllib3
from requests import adapters
from requests_toolbelt.utils import user_agent
from rich import print

import twine
from twine import package as package_file
from twine.utils import make_requests_session

KEYWORDS_TO_NOT_FLATTEN = {"gpg_signature", "attestations", "content"}

Expand All @@ -47,7 +44,7 @@ def __init__(
) -> None:
self.url = repository_url

self.session = requests.session()
self.session = make_requests_session()
# requests.Session.auth should be Union[None, Tuple[str, str], ...]
# But username or password could be None
# See TODO for utils.RepositoryConfig
Expand All @@ -57,35 +54,10 @@ def __init__(
logger.info(f"username: {username if username else '<empty>'}")
logger.info(f"password: <{'hidden' if password else 'empty'}>")

self.session.headers["User-Agent"] = self._make_user_agent_string()
for scheme in ("http://", "https://"):
self.session.mount(scheme, self._make_adapter_with_retries())

# Working around https://github.com/python/typing/issues/182
self._releases_json_data: Dict[str, Dict[str, Any]] = {}
self.disable_progress_bar = disable_progress_bar

@staticmethod
def _make_adapter_with_retries() -> adapters.HTTPAdapter:
retry = urllib3.Retry(
allowed_methods=["GET"],
connect=5,
total=10,
status_forcelist=[500, 501, 502, 503],
)

return adapters.HTTPAdapter(max_retries=retry)

@staticmethod
def _make_user_agent_string() -> str:
user_agent_string = (
user_agent.UserAgentBuilder("twine", twine.__version__)
.include_implementation()
.build()
)

return cast(str, user_agent_string)

def close(self) -> None:
self.session.close()

Expand Down
26 changes: 26 additions & 0 deletions twine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@

import requests
import rfc3986
import urllib3
from requests.adapters import HTTPAdapter
from requests_toolbelt.utils import user_agent

import twine
from twine import exceptions

# Shim for input to allow testing.
Expand Down Expand Up @@ -304,6 +308,28 @@ def get_userpass_value(
get_clientcert = functools.partial(get_userpass_value, key="client_cert")


def make_requests_session() -> requests.Session:
"""Prepare a requests Session with retries & twine's user-agent string."""
s = requests.Session()

retry = urllib3.Retry(
allowed_methods=["GET"],
connect=5,
total=10,
status_forcelist=[500, 501, 502, 503],
)

for scheme in ("http://", "https://"):
s.mount(scheme, HTTPAdapter(max_retries=retry))

s.headers["User-Agent"] = (
user_agent.UserAgentBuilder("twine", twine.__version__)
.include_implementation()
.build()
)
return s


class EnvironmentDefault(argparse.Action):
"""Get values from environment variable."""

Expand Down

0 comments on commit 28e60bb

Please sign in to comment.