Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce HashManager to work around FIPS #9101

Merged
merged 2 commits into from
Mar 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions src/poetry/publishing/hash_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

import hashlib
import io

from contextlib import suppress
from typing import TYPE_CHECKING
from typing import NamedTuple


if TYPE_CHECKING:
from pathlib import Path


class Hexdigest(NamedTuple):
md5: str | None
sha256: str | None
blake2_256: str | None


class HashManager:
def __init__(self) -> None:
self._sha2_hasher = hashlib.sha256()

self._md5_hasher = None
with suppress(ValueError):
# FIPS mode disables MD5
self._md5_hasher = hashlib.md5()

self._blake_hasher = None
with suppress(ValueError, TypeError):
# FIPS mode disables blake2
self._blake_hasher = hashlib.blake2b(digest_size=256 // 8)

def _md5_update(self, content: bytes) -> None:
if self._md5_hasher is not None:
self._md5_hasher.update(content)

def _md5_hexdigest(self) -> str | None:
if self._md5_hasher is not None:
return self._md5_hasher.hexdigest()
return None

def _blake_update(self, content: bytes) -> None:
if self._blake_hasher is not None:
self._blake_hasher.update(content)

def _blake_hexdigest(self) -> str | None:
if self._blake_hasher is not None:
return self._blake_hasher.hexdigest()
return None

def hash(self, file: Path) -> None:
with file.open("rb") as fp:
for content in iter(lambda: fp.read(io.DEFAULT_BUFFER_SIZE), b""):
self._md5_update(content)
self._sha2_hasher.update(content)
self._blake_update(content)

def hexdigest(self) -> Hexdigest:
return Hexdigest(
self._md5_hexdigest(),
self._sha2_hasher.hexdigest(),
self._blake_hexdigest(),
)
22 changes: 7 additions & 15 deletions src/poetry/publishing/uploader.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from __future__ import annotations

import hashlib
import io

from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any
Expand All @@ -18,6 +15,7 @@
from requests_toolbelt.multipart import MultipartEncoderMonitor

from poetry.__version__ import __version__
from poetry.publishing.hash_manager import HashManager
from poetry.utils.constants import REQUESTS_TIMEOUT
from poetry.utils.patterns import wheel_file_re

Expand Down Expand Up @@ -126,19 +124,13 @@ def post_data(self, file: Path) -> dict[str, Any]:

file_type = self._get_type(file)

blake2_256_hash = hashlib.blake2b(digest_size=256 // 8)
hash_manager = HashManager()
hash_manager.hash(file)
file_hashes = hash_manager.hexdigest()

md5_hash = hashlib.md5()
sha256_hash = hashlib.sha256()
with file.open("rb") as fp:
for content in iter(lambda: fp.read(io.DEFAULT_BUFFER_SIZE), b""):
md5_hash.update(content)
sha256_hash.update(content)
blake2_256_hash.update(content)

md5_digest = md5_hash.hexdigest()
sha2_digest = sha256_hash.hexdigest()
blake2_256_digest = blake2_256_hash.hexdigest()
md5_digest = file_hashes.md5
sha2_digest = file_hashes.sha256
blake2_256_digest = file_hashes.blake2_256

py_version: str | None = None
if file_type == "bdist_wheel":
Expand Down
83 changes: 83 additions & 0 deletions tests/publishing/test_hash_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import Any

import pytest

from poetry.publishing.hash_manager import HashManager


if TYPE_CHECKING:
from pathlib import Path

from pytest_mock import MockerFixture

from tests.types import FixtureDirGetter


@pytest.fixture
def distributions_dir(fixture_dir: FixtureDirGetter) -> Path:
return fixture_dir("distributions")


@pytest.mark.parametrize(
"file, hashes",
(
(
"demo-0.1.0.tar.gz",
(
"d1912c917363a64e127318655f7d1fe7",
"9fa123ad707a5c6c944743bf3e11a0e80d86cb518d3cf25320866ca3ef43e2ad",
"cb638093d63df647e70b03e963bedc31e021cb088695e29101b69f525e3d5fef",
),
),
(
"demo-0.1.2-py2.py3-none-any.whl",
(
"53b4e10d2bfa81a4206221c4b87843d9",
"55dde4e6828081de7a1e429f33180459c333d9da593db62a3d75a8f5e505dde1",
"b35b9aab064e88fffe42309550ebe425907fb42ccb3b1d173b7d6b7509f38eac",
),
),
),
)
def test_file_hashes_returns_proper_hashes_for_file(
file: str, hashes: tuple[str, ...], distributions_dir: Path
) -> None:
manager = HashManager()
manager.hash(distributions_dir / file)
file_hashes = manager.hexdigest()
assert file_hashes == hashes


def test_file_hashes_returns_none_for_md5_with_fips(
mocker: MockerFixture, distributions_dir: Path
) -> None:
# disable md5
def fips_md5(*args: Any, **kwargs: Any) -> Any:
raise ValueError("Disabled by FIPS")

mocker.patch("hashlib.md5", new=fips_md5)

manager = HashManager()
manager.hash(distributions_dir / "demo-0.1.0.tar.gz")
file_hashes = manager.hexdigest()

assert file_hashes.md5 is None


def test_file_hashes_returns_none_for_blake2_with_fips(
mocker: MockerFixture, distributions_dir: Path
) -> None:
# disable md5
def fips_blake2b(*args: Any, **kwargs: Any) -> Any:
raise ValueError("Disabled by FIPS")

mocker.patch("hashlib.blake2b", new=fips_blake2b)

manager = HashManager()
manager.hash(distributions_dir / "demo-0.1.0.tar.gz")
file_hashes = manager.hexdigest()

assert file_hashes.blake2_256 is None
Loading