From 47fcf8a6387cea6212c3fc32d1eb468e1af661e4 Mon Sep 17 00:00:00 2001 From: Carey Hoffman Date: Tue, 30 May 2023 17:02:46 -0700 Subject: [PATCH] Reusable callable type for claim checking code --- tests/common/db/oidc.py | 5 - tests/conftest.py | 12 -- tests/unit/oidc/models/test_activestate.py | 185 +++++++++--------- tests/unit/oidc/test_utils.py | 3 - ...b1b94c6_add_activestate_oidc_publisher.py} | 25 ++- warehouse/oidc/models/_core.py | 10 +- warehouse/oidc/models/activestate.py | 74 ++----- warehouse/oidc/models/google.py | 1 - warehouse/oidc/utils.py | 6 +- 9 files changed, 139 insertions(+), 182 deletions(-) rename warehouse/migrations/versions/{e5f60f8730a0_add_activestate_oidc_publisher.py => e8c7bb1b94c6_add_activestate_oidc_publisher.py} (89%) diff --git a/tests/common/db/oidc.py b/tests/common/db/oidc.py index ab704df1f4af..5d13ce7013b7 100644 --- a/tests/common/db/oidc.py +++ b/tests/common/db/oidc.py @@ -81,11 +81,8 @@ class Meta: organization_url_name = factory.Faker("pystr", max_chars=12) project_id = factory.Faker("uuid4") activestate_project_name = factory.Faker("pystr", max_chars=12) - project_path = f"{organization_url_name}/{activestate_project_name}" user_id = factory.Faker("uuid4") branch_id = factory.Faker("uuid4") - project_visibility = factory.Faker("random_element", elements=[True, False]) - sub = factory.Faker("pystr", max_chars=12) class PendingActiveStatePublisherFactory(WarehouseFactory): @@ -99,7 +96,5 @@ class Meta: organization_url_name = factory.Faker("pystr", max_chars=12) project_id = factory.Faker("uuid4") activestate_project_name = factory.Faker("pystr", max_chars=12) - project_path = f"{organization_url_name}/{activestate_project_name}" user_id = factory.Faker("uuid4") - project_visibility = factory.Faker("random_element", elements=[True, False]) added_by = factory.SubFactory(UserFactory) diff --git a/tests/conftest.py b/tests/conftest.py index 9a270f982b0f..d0e439950834 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -351,18 +351,6 @@ def oidc_service(db_session): ) -# @pytest.fixture -# def activestate_oidc_service(db_session): -# return oidc_services.NullOIDCPublisherService( -# db_session, -# pretend.stub(), -# ACTIVESTATE_OIDC_ISSUER_URL, -# pretend.stub(), -# pretend.stub(), -# pretend.stub(), -# ) - - @pytest.fixture def macaroon_service(db_session): return macaroon_services.DatabaseMacaroonService(db_session) diff --git a/tests/unit/oidc/models/test_activestate.py b/tests/unit/oidc/models/test_activestate.py index 621d857853b8..3e07c351afea 100644 --- a/tests/unit/oidc/models/test_activestate.py +++ b/tests/unit/oidc/models/test_activestate.py @@ -29,6 +29,13 @@ PROJECT_ID = "00000000-0000-1000-8000-000000000001" USER_ID = "00000000-0000-1000-8000-000000000002" BRANCH_ID = "00000000-0000-1000-8000-000000000003" +# This follows the format of the subject that ActiveState sends us. We don't +# validate the format when verifying the JWT. That should happen when the +# Publisher is configured. We just need to make sure that the subject matches +# +# Technically, the branch should only be present if a branch was provided in the JWT +# claims +SUBJECT = "org:fake_org_id:project:fake_project_id:branch_id:fake_branch_id" def test_lookup_strategies(): @@ -40,28 +47,30 @@ def test_lookup_strategies(): def new_signed_claims( - sub: str = "fake:fake:fake:fake", + sub: str = SUBJECT, organization_id: str = ORG_ID, org_url_name: str = "fakeorg", project_id: str = PROJECT_ID, project_name: str = "fakeproject", project_path: str = "fakeorg/fakeproject", - user_id: str = "00000000-0000-1000-8000-000000000002", + user_id: str = USER_ID, project_visibility: str = "public", branch_id: str | None = None, ) -> SignedClaims: project_name = "fakeproject" org_url_name = "fakeorg" - claims: SignedClaims = { - "sub": sub, - "organization_id": organization_id, - "organization_url_name": org_url_name, - "project_id": project_id, - "project_name": project_name, - "project_path": project_path, - "user_id": user_id, - "project_visibility": project_visibility, - } + claims = SignedClaims( + { + "sub": sub, + "organization_id": organization_id, + "organization_url_name": org_url_name, + "project_id": project_id, + "project_name": project_name, + "project_path": project_path, + "user_id": user_id, + "project_visibility": project_visibility, + } + ) if branch_id: claims["branch_id"] = branch_id return claims @@ -69,36 +78,32 @@ def new_signed_claims( class TestActiveStatePublisher: def test_publisher_name(self): - publisher = ActiveStatePublisher( - organization_id=ORG_ID, - project_id=PROJECT_ID, - user_id=USER_ID, - project_visibility="public", - ) + publisher = ActiveStatePublisher() assert publisher.publisher_name == "ActiveState" def test_publisher_url(self): + org_name = "fakeorg" + project_name = "fakeproject" publisher = ActiveStatePublisher( - organization_id=ORG_ID, - project_id=PROJECT_ID, - user_id=USER_ID, - project_visibility="public", + organization_url_name=org_name, activestate_project_name=project_name ) - assert publisher.publisher_url == "https://platform.activestate.com/oauth/oidc" + assert ( + publisher.publisher_url() + == f"https://platform.activestate.com/{org_name}/{project_name}" + ) - def test_stringifies_as_email(self): + def test_stringifies_as_project_url(self): + org_name = "fakeorg" + project_name = "fakeproject" publisher = ActiveStatePublisher( - organization_id=ORG_ID, - project_id=PROJECT_ID, - user_id=USER_ID, - project_visibility="public", + organization_url_name=org_name, activestate_project_name=project_name ) assert ( str(publisher) - == f"https://platform.activestate.com/{publisher.project_path}" + == f"https://platform.activestate.com/{org_name}/{project_name}" ) def test_activestate_publisher_all_known_claims(self): @@ -107,7 +112,6 @@ def test_activestate_publisher_all_known_claims(self): "organization_id", "project_id", "user_id", - "project_visibility", "sub", # optional verifiable claims "branch_id", @@ -118,6 +122,7 @@ def test_activestate_publisher_all_known_claims(self): "exp", "aud", # unchecked claims + "project_visibility", "project_name", "project_path", "organization_url_name", @@ -125,11 +130,7 @@ def test_activestate_publisher_all_known_claims(self): def test_activestate_publisher_unaccounted_claims(self, monkeypatch): publisher = ActiveStatePublisher( - sub="fake:fake:fake:fake:fake:fake", - organization_id=ORG_ID, - project_id=PROJECT_ID, - user_id=USER_ID, - project_visibility="public", + sub=SUBJECT, ) scope = pretend.stub() @@ -143,7 +144,6 @@ def test_activestate_publisher_unaccounted_claims(self, monkeypatch): ) monkeypatch.setattr(_core, "sentry_sdk", sentry_sdk) - # We don't care if these actually verify, only that they're present. signed_claims = { claim_name: "fake" for claim_name in ActiveStatePublisher.all_known_claims() } @@ -158,13 +158,27 @@ def test_activestate_publisher_unaccounted_claims(self, monkeypatch): ] assert scope.fingerprint == ["another-fake-claim", "fake-claim"] - def test_activestate_publisher_missing_claims(self, monkeypatch): + @pytest.mark.parametrize( + ("claim_to_drop", "valid"), + [ + ("organization_id", False), + ("project_id", False), + ("user_id", False), + ("branch_id", True), + ("organization_url_name", True), + ("project_visibility", True), + ("project_name", True), + ("project_path", True), + ], + ) + def test_activestate_publisher_missing_claims( + self, monkeypatch, claim_to_drop: str, valid: bool + ): publisher = ActiveStatePublisher( - sub="fake:fake:fake:fake:fake:fake", - organization_id="fake", - project_id="fake", - user_id="fake", - project_visibility="fake", + sub=SUBJECT, + organization_id=ORG_ID, + project_id=PROJECT_ID, + user_id=USER_ID, ) scope = pretend.stub() @@ -178,21 +192,21 @@ def test_activestate_publisher_missing_claims(self, monkeypatch): ) monkeypatch.setattr(_core, "sentry_sdk", sentry_sdk) - signed_claims = { - claim_name: "fake" for claim_name in ActiveStatePublisher.all_known_claims() - } - signed_claims["sub"] = "fake:fake:fake:fake:fake:fake" - # Pop the first signed claim, so that it's the first one to fail. - signed_claims.pop("organization_id") - assert "organization_id" not in signed_claims + signed_claims = new_signed_claims() + # I don't set branch_id in the claims by default, so no need to remove it + if claim_to_drop != "branch_id": + # Remove a required claim to test that the code detects it's missing + signed_claims.pop(claim_to_drop) + assert claim_to_drop not in signed_claims assert publisher.__required_verifiable_claims__ - assert not publisher.verify_claims(signed_claims=signed_claims) - assert sentry_sdk.capture_message.calls == [ - pretend.call( - "JWT for ActiveStatePublisher is missing claim: organization_id" - ) - ] - assert scope.fingerprint == ["organization_id"] + assert publisher.verify_claims(signed_claims=signed_claims) is valid + if not valid: + assert sentry_sdk.capture_message.calls == [ + pretend.call( + "JWT for ActiveStatePublisher is missing claim: " + claim_to_drop + ) + ] + assert scope.fingerprint == [claim_to_drop] @pytest.mark.parametrize( ("expect", "actual", "valid"), @@ -205,11 +219,10 @@ def test_activestate_publisher_org_id_verified( self, expect: str, actual: str, valid: bool ): publisher = ActiveStatePublisher( - sub="fake:fake:fake:fake", + sub=SUBJECT, organization_id=actual, project_id=PROJECT_ID, user_id=USER_ID, - project_visibility="public", ) signed_claims = new_signed_claims(organization_id=expect) @@ -218,66 +231,68 @@ def test_activestate_publisher_org_id_verified( @pytest.mark.parametrize( ("expect", "actual", "valid"), [ - (PROJECT_ID, PROJECT_ID, True), - (PROJECT_ID, ORG_ID, False), + (BRANCH_ID, BRANCH_ID, True), + (BRANCH_ID, PROJECT_ID, False), + # If it's configured in the publisher, it must be present in the claim + (BRANCH_ID, None, False), + # If it's not configured in the publisher, we don't care what it is + # in the claim + (None, None, True), + (None, PROJECT_ID, True), ], ) - def test_activestate_publisher_project_id_verified( + def test_activestate_publisher_branch_id_verified( self, expect: str, actual: str, valid: bool ): publisher = ActiveStatePublisher( - sub="fake:fake:fake:fake", + sub=SUBJECT, organization_id=ORG_ID, - project_id=actual, + project_id=PROJECT_ID, user_id=USER_ID, - project_visibility="public", + branch_id=expect, ) - signed_claims = new_signed_claims(project_id=expect) + signed_claims = new_signed_claims(branch_id=actual) assert publisher.verify_claims(signed_claims=signed_claims) is valid @pytest.mark.parametrize( ("expect", "actual", "valid"), [ - (USER_ID, USER_ID, True), - (USER_ID, ORG_ID, False), + (PROJECT_ID, PROJECT_ID, True), + (PROJECT_ID, ORG_ID, False), ], ) - def test_activestate_publisher_user_id_verified( + def test_activestate_publisher_project_id_verified( self, expect: str, actual: str, valid: bool ): publisher = ActiveStatePublisher( - sub="fake:fake:fake:fake", + sub=SUBJECT, organization_id=ORG_ID, - project_id=PROJECT_ID, - user_id=actual, - project_visibility="public", + project_id=actual, + user_id=USER_ID, ) - signed_claims = new_signed_claims(user_id=expect) + signed_claims = new_signed_claims(project_id=expect) assert publisher.verify_claims(signed_claims=signed_claims) is valid @pytest.mark.parametrize( ("expect", "actual", "valid"), [ - ("public", "public", True), - ("private", "private", True), - ("public", "private", False), - ("private", "public", False), + (USER_ID, USER_ID, True), + (USER_ID, ORG_ID, False), ], ) - def test_activestate_publisher_project_vis_verified( + def test_activestate_publisher_user_id_verified( self, expect: str, actual: str, valid: bool ): publisher = ActiveStatePublisher( - sub="fake:fake:fake:fake", + sub=SUBJECT, organization_id=ORG_ID, project_id=PROJECT_ID, - user_id=USER_ID, - project_visibility=actual, + user_id=actual, ) - signed_claims = new_signed_claims(project_visibility=expect) + signed_claims = new_signed_claims(user_id=expect) assert publisher.verify_claims(signed_claims=signed_claims) is valid @pytest.mark.parametrize( @@ -317,7 +332,6 @@ def test_activestate_publisher_project_vis_verified( ) def test_activestate_publisher_sub(self, expected: str, actual: str, valid: bool): check = ActiveStatePublisher.__required_verifiable_claims__["sub"] - # claims: SignedClaims = new_signed_claims(sub=actual) assert check(expected, actual, pretend.stub()) is valid @@ -337,8 +351,6 @@ def test_reify_does_not_exist_yet(self, db_request): ) publisher = pending_publisher.reify(db_request.db) - # If an OIDC publisher for this pending publisher does not already exist, - # a new one is created and the pending publisher is marked for deletion. assert isinstance(publisher, ActiveStatePublisher) assert pending_publisher in db_request.db.deleted assert publisher.organization_id == pending_publisher.organization_id @@ -350,12 +362,9 @@ def test_reify_already_exists(self, db_request): organization_id=existing_publisher.organization_id, project_id=existing_publisher.project_id, branch_id=existing_publisher.branch_id, - project_visibility=existing_publisher.project_visibility, sub=existing_publisher.sub, ) publisher = pending_publisher.reify(db_request.db) - # If an OIDC publisher for this pending publisher already exists, - # it is returned and the pending publisher is marked for deletion. assert existing_publisher == publisher assert pending_publisher in db_request.db.deleted diff --git a/tests/unit/oidc/test_utils.py b/tests/unit/oidc/test_utils.py index 5bbb7e7e5f7c..4437bb3bb3ab 100644 --- a/tests/unit/oidc/test_utils.py +++ b/tests/unit/oidc/test_utils.py @@ -157,7 +157,6 @@ def test_find_publisher_by_issuer_activestate( organization_id="00000000-1000-8000-0000-000000000001", project_id="00000000-1000-8000-0000-000000000002", user_id="00000000-1000-8000-0000-000000000003", - project_visibility="public", sub="org:00000000-1000-8000-0000-000000000001:project:00000000-1000-8000-0000-000000000002", # noqa ) ActiveStatePublisherFactory( @@ -165,7 +164,6 @@ def test_find_publisher_by_issuer_activestate( organization_id="00000000-1000-8000-0000-000000000004", project_id="00000000-1000-8000-0000-000000000005", user_id="00000000-1000-8000-0000-000000000006", - project_visibility="public", sub="org:00000000-1000-8000-0000-000000000004:project:00000000-1000-8000-0000-000000000005", # noqa ) ActiveStatePublisherFactory( @@ -173,7 +171,6 @@ def test_find_publisher_by_issuer_activestate( organization_id="00000000-1000-8000-0000-000000000007", project_id="00000000-1000-8000-0000-000000000008", user_id="00000000-1000-8000-0000-000000000009", - project_visibility="public", branch_id="00000000-1000-8000-0000-000000000010", sub="org:00000000-1000-8000-0000-000000000007:project:00000000-1000-8000-0000-000000000008:branch_id:00000000-1000-8000-0000-000000000010", # noqa ) diff --git a/warehouse/migrations/versions/e5f60f8730a0_add_activestate_oidc_publisher.py b/warehouse/migrations/versions/e8c7bb1b94c6_add_activestate_oidc_publisher.py similarity index 89% rename from warehouse/migrations/versions/e5f60f8730a0_add_activestate_oidc_publisher.py rename to warehouse/migrations/versions/e8c7bb1b94c6_add_activestate_oidc_publisher.py index 883a3fce6ee1..7c7abf48f2bf 100644 --- a/warehouse/migrations/versions/e5f60f8730a0_add_activestate_oidc_publisher.py +++ b/warehouse/migrations/versions/e8c7bb1b94c6_add_activestate_oidc_publisher.py @@ -12,16 +12,16 @@ """ add ActiveState OIDC publisher -Revision ID: e5f60f8730a0 +Revision ID: e8c7bb1b94c6 Revises: 4a0276f260c7 -Create Date: 2023-06-20 00:33:58.229211 +Create Date: 2023-06-22 22:57:53.695035 """ import sqlalchemy as sa from alembic import op -revision = "e5f60f8730a0" +revision = "e8c7bb1b94c6" down_revision = "4a0276f260c7" # Note: It is VERY important to ensure that a migration does not lock for a @@ -57,10 +57,8 @@ def upgrade(): sa.Column("organization_url_name", sa.VARCHAR(), nullable=False), sa.Column("project_id", sa.VARCHAR(), nullable=False), sa.Column("activestate_project_name", sa.VARCHAR(), nullable=False), - sa.Column("project_path", sa.VARCHAR(), nullable=False), sa.Column("user_id", sa.VARCHAR(), nullable=False), sa.Column("branch_id", sa.VARCHAR(), nullable=True), - sa.Column("project_visibility", sa.VARCHAR(), nullable=True), sa.ForeignKeyConstraint( ["id"], ["oidc_publishers.id"], @@ -70,7 +68,6 @@ def upgrade(): "organization_id", "organization_url_name", "project_id", - "project_path", "user_id", "branch_id", name="_activestate_oidc_publisher_uc", @@ -84,10 +81,8 @@ def upgrade(): sa.Column("organization_url_name", sa.VARCHAR(), nullable=False), sa.Column("project_id", sa.VARCHAR(), nullable=False), sa.Column("activestate_project_name", sa.VARCHAR(), nullable=False), - sa.Column("project_path", sa.VARCHAR(), nullable=False), sa.Column("user_id", sa.VARCHAR(), nullable=False), sa.Column("branch_id", sa.VARCHAR(), nullable=True), - sa.Column("project_visibility", sa.VARCHAR(), nullable=True), sa.ForeignKeyConstraint( ["id"], ["pending_oidc_publishers.id"], @@ -97,7 +92,6 @@ def upgrade(): "organization_id", "organization_url_name", "project_id", - "project_path", "user_id", "branch_id", name="_pending_activestate_oidc_publisher_uc", @@ -105,6 +99,18 @@ def upgrade(): ) op.drop_index("project_name_ultranormalized", table_name="projects") # ### end Alembic commands ### + # Disable the ActiveState OIDC provider by default + op.execute( + """ + INSERT INTO admin_flags(id, description, enabled, notify) + VALUES ( + 'disallow-activestate-oidc', + 'Disallow the ActiveState OIDC provider', + TRUE, + FALSE + ) + """ + ) def downgrade(): @@ -118,3 +124,4 @@ def downgrade(): op.drop_table("pending_activestate_oidc_publishers") op.drop_table("activestate_oidc_publishers") # ### end Alembic commands ### + op.execute("DELETE FROM admin_flags WHERE id = 'disallow-activestate-oidc'") diff --git a/warehouse/oidc/models/_core.py b/warehouse/oidc/models/_core.py index 7f0fefcb274c..3e90093e4300 100644 --- a/warehouse/oidc/models/_core.py +++ b/warehouse/oidc/models/_core.py @@ -41,10 +41,6 @@ def check_claim_binary(binary_func: Callable[[C, C], bool]) -> CheckClaimCallabl """ def wrapper(ground_truth: C, signed_claim: C, all_signed_claims: SignedClaims): - print("check_claim_binary") - print(f" ground_truth: {ground_truth}") - print(f" signed_claim: {signed_claim}") - return binary_func(ground_truth, signed_claim) return wrapper @@ -181,7 +177,6 @@ def verify_claims(self, signed_claims: SignedClaims): ) # Finally, perform the actual claim verification. - print(f"signed_claims: {signed_claims}") for claim_name, check in self.__required_verifiable_claims__.items(): # All required claims are mandatory. The absence of a missing # claim *is* an error with the JWT, since it indicates a breaking @@ -195,11 +190,8 @@ def verify_claims(self, signed_claims: SignedClaims): f"{claim_name}" ) return False - print(f"verify_claims claime_name {claim_name}") - print(f" Ground truth: {getattr(self, claim_name)}") - print(f" Signed claim: {signed_claim}") + if not check(getattr(self, claim_name), signed_claim, signed_claims): - print(f" Check failed{claim_name}") return False # Check optional verifiable claims diff --git a/warehouse/oidc/models/activestate.py b/warehouse/oidc/models/activestate.py index 59103638fca6..521839a4e29d 100644 --- a/warehouse/oidc/models/activestate.py +++ b/warehouse/oidc/models/activestate.py @@ -27,44 +27,16 @@ ACTIVESTATE_URL = "https://platform.activestate.com" -ACTIVESTATE_OIDC_PATH = "oauth/oidc" - - -# TODO This function is overkill for verifying a claim -# the user should have configured a subject to expect in the claim What is -# currently happening in this function is validation steps that should be in -# the view when the user is creating the publisher -def _check_sub(ground_truth: str, signed_claim: str, _all_signed_claims: SignedClaims): - # Subject format: org:org_id:project:project_id[:branch:branch_id] - # Defensive: ActiveState should never give us an empty subject. - if not signed_claim: - return False - print("ground_truth: ", ground_truth) - print("signed_claim: ", signed_claim) - print("_all_signed_claims: ", _all_signed_claims) - - claim_components = signed_claim.split(":") - if len(claim_components) < 4 or len(ground_truth.split(":")) != len( - claim_components - ): - return False - - branch_id: str | None = _all_signed_claims.get("branch_id") - if _all_signed_claims.get("branch_id") and len(claim_components) == 4: - # if branch_id is provided, it must be in the subject - return False - elif len(claim_components) == 6: - return branch_id == claim_components[5] - - return signed_claim == ground_truth - - -def _check_branch( +ACTIVESTATE_OIDC_API_PATH = "api/v1/oauth/oidc" + + +# branch_id is optional, so we need to verify but it's ok if it's missing +def _check_branch_id_optional( ground_truth: str, signed_claim: str, _all_signed_claims: SignedClaims ): - if not signed_claim: - return True - return ground_truth == signed_claim + if ground_truth: + return ground_truth == signed_claim + return True class ActiveStatePublisherMixin: @@ -77,32 +49,33 @@ class ActiveStatePublisherMixin: organization_url_name = Column(VARCHAR, nullable=False) project_id = Column(VARCHAR, nullable=False) activestate_project_name = Column(VARCHAR, nullable=False) - project_path = Column(VARCHAR, nullable=False) user_id = Column(VARCHAR, nullable=False) branch_id = Column(VARCHAR) - project_visibility = Column(VARCHAR) __required_verifiable_claims__: dict[str, CheckClaimCallable[Any]] = { "sub": check_claim_binary(str.__eq__), "organization_id": check_claim_binary(str.__eq__), "project_id": check_claim_binary(str.__eq__), "user_id": check_claim_binary(str.__eq__), - "project_visibility": check_claim_binary(str.__eq__), } __optional_verifiable_claims__: dict[str, CheckClaimCallable[Any]] = { - "branch_id": _check_branch, + "branch_id": _check_branch_id_optional, } - __unchecked_claims__ = {"project_path", "project_name", "organization_url_name"} + __unchecked_claims__ = { + "organization_url_name", + "project_visibility", + "project_name", + "project_path", + } @staticmethod - def __lookup_all__(klass, signed_claims: SignedClaims) -> Query[OIDCPublisher]: + def __lookup_all__(klass, signed_claims: SignedClaims): return Query(klass).filter_by( organization_id=signed_claims["organization_id"], project_id=signed_claims["project_id"], user_id=signed_claims["user_id"], - project_visibility=signed_claims["project_visibility"], sub=signed_claims["sub"], ) @@ -114,12 +87,11 @@ def __lookup_all__(klass, signed_claims: SignedClaims) -> Query[OIDCPublisher]: def publisher_name(self) -> str: return "ActiveState" - @property def publisher_url(self, claims: SignedClaims | None = None) -> str: - return f"{ACTIVESTATE_URL}/{ACTIVESTATE_OIDC_PATH}" + return f"{ACTIVESTATE_URL}/{self.organization_url_name}/{self.activestate_project_name}" # noqa def __str__(self) -> str: - return f"{ACTIVESTATE_URL}/{self.project_path}" + return f"{ACTIVESTATE_URL}/{self.organization_url_name}/{self.activestate_project_name}" # noqa class ActiveStatePublisher(ActiveStatePublisherMixin, OIDCPublisher): @@ -130,7 +102,6 @@ class ActiveStatePublisher(ActiveStatePublisherMixin, OIDCPublisher): "organization_id", "organization_url_name", "project_id", - "project_path", "user_id", "branch_id", name="_activestate_oidc_publisher_uc", @@ -148,7 +119,6 @@ class PendingActiveStatePublisher(ActiveStatePublisherMixin, PendingOIDCPublishe "organization_id", "organization_url_name", "project_id", - "project_path", "user_id", "branch_id", name="_pending_activestate_oidc_publisher_uc", @@ -159,19 +129,18 @@ class PendingActiveStatePublisher(ActiveStatePublisherMixin, PendingOIDCPublishe UUID(as_uuid=True), ForeignKey(PendingOIDCPublisher.id), primary_key=True ) - def reify(self, session) -> ActiveStatePublisher: + def reify(self, session): """ Returns a `ActiveStatePublisher` for this `PendingActiveStatePublisher`, deleting the `PendingActiveStatePublisher` in the process. """ - # Check if the publisher already exists - maybe_publisher: ActiveStatePublisher | None = ( + # Check if the publisher already exists. Return it if it does. + maybe_publisher = ( session.query(ActiveStatePublisher) .filter( ActiveStatePublisher.organization_id == self.organization_id, ActiveStatePublisher.project_id == self.project_id, ActiveStatePublisher.branch_id == self.branch_id, - ActiveStatePublisher.project_visibility == self.project_visibility, ) .one_or_none() ) @@ -181,7 +150,6 @@ def reify(self, session) -> ActiveStatePublisher: organization_id=self.organization_id, project_id=self.project_id, branch_id=self.branch_id, - project_visibility=self.project_visibility, ) session.delete(self) diff --git a/warehouse/oidc/models/google.py b/warehouse/oidc/models/google.py index d7c86dc3b12c..c924a5b30d84 100644 --- a/warehouse/oidc/models/google.py +++ b/warehouse/oidc/models/google.py @@ -9,7 +9,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from typing import Any from sqlalchemy import Column, ForeignKey, String, UniqueConstraint diff --git a/warehouse/oidc/utils.py b/warehouse/oidc/utils.py index b233a3bc24a3..d07b2b96d1a3 100644 --- a/warehouse/oidc/utils.py +++ b/warehouse/oidc/utils.py @@ -25,8 +25,8 @@ PendingActiveStatePublisher, PendingGitHubPublisher, PendingGooglePublisher, + PendingOIDCPublisher, ) -from warehouse.oidc.models._core import OIDCPublisherMixin GITHUB_OIDC_ISSUER_URL = "https://token.actions.githubusercontent.com" GOOGLE_OIDC_ISSUER_URL = "https://accounts.google.com" @@ -38,7 +38,9 @@ ACTIVESTATE_OIDC_ISSUER_URL, } -OIDC_PUBLISHER_CLASSES: dict[str, dict[bool, type[OIDCPublisherMixin]]] = { +OIDC_PUBLISHER_CLASSES: dict[ + str, dict[bool, type[OIDCPublisher | PendingOIDCPublisher]] +] = { GITHUB_OIDC_ISSUER_URL: {False: GitHubPublisher, True: PendingGitHubPublisher}, GOOGLE_OIDC_ISSUER_URL: {False: GooglePublisher, True: PendingGooglePublisher}, ACTIVESTATE_OIDC_ISSUER_URL: {