Skip to content

Commit

Permalink
Reusable callable type for claim checking code
Browse files Browse the repository at this point in the history
  • Loading branch information
th3coop committed Jul 12, 2023
1 parent e71a327 commit 47fcf8a
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 182 deletions.
5 changes: 0 additions & 5 deletions tests/common/db/oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,8 @@ class Meta:
organization_url_name = factory.Faker("pystr", max_chars=12)
project_id = factory.Faker("uuid4")
activestate_project_name = factory.Faker("pystr", max_chars=12)
project_path = f"{organization_url_name}/{activestate_project_name}"
user_id = factory.Faker("uuid4")
branch_id = factory.Faker("uuid4")
project_visibility = factory.Faker("random_element", elements=[True, False])
sub = factory.Faker("pystr", max_chars=12)


class PendingActiveStatePublisherFactory(WarehouseFactory):
Expand All @@ -99,7 +96,5 @@ class Meta:
organization_url_name = factory.Faker("pystr", max_chars=12)
project_id = factory.Faker("uuid4")
activestate_project_name = factory.Faker("pystr", max_chars=12)
project_path = f"{organization_url_name}/{activestate_project_name}"
user_id = factory.Faker("uuid4")
project_visibility = factory.Faker("random_element", elements=[True, False])
added_by = factory.SubFactory(UserFactory)
12 changes: 0 additions & 12 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,18 +351,6 @@ def oidc_service(db_session):
)


# @pytest.fixture
# def activestate_oidc_service(db_session):
# return oidc_services.NullOIDCPublisherService(
# db_session,
# pretend.stub(),
# ACTIVESTATE_OIDC_ISSUER_URL,
# pretend.stub(),
# pretend.stub(),
# pretend.stub(),
# )


@pytest.fixture
def macaroon_service(db_session):
return macaroon_services.DatabaseMacaroonService(db_session)
Expand Down
185 changes: 97 additions & 88 deletions tests/unit/oidc/models/test_activestate.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@
PROJECT_ID = "00000000-0000-1000-8000-000000000001"
USER_ID = "00000000-0000-1000-8000-000000000002"
BRANCH_ID = "00000000-0000-1000-8000-000000000003"
# This follows the format of the subject that ActiveState sends us. We don't
# validate the format when verifying the JWT. That should happen when the
# Publisher is configured. We just need to make sure that the subject matches
#
# Technically, the branch should only be present if a branch was provided in the JWT
# claims
SUBJECT = "org:fake_org_id:project:fake_project_id:branch_id:fake_branch_id"


def test_lookup_strategies():
Expand All @@ -40,65 +47,63 @@ def test_lookup_strategies():


def new_signed_claims(
sub: str = "fake:fake:fake:fake",
sub: str = SUBJECT,
organization_id: str = ORG_ID,
org_url_name: str = "fakeorg",
project_id: str = PROJECT_ID,
project_name: str = "fakeproject",
project_path: str = "fakeorg/fakeproject",
user_id: str = "00000000-0000-1000-8000-000000000002",
user_id: str = USER_ID,
project_visibility: str = "public",
branch_id: str | None = None,
) -> SignedClaims:
project_name = "fakeproject"
org_url_name = "fakeorg"
claims: SignedClaims = {
"sub": sub,
"organization_id": organization_id,
"organization_url_name": org_url_name,
"project_id": project_id,
"project_name": project_name,
"project_path": project_path,
"user_id": user_id,
"project_visibility": project_visibility,
}
claims = SignedClaims(
{
"sub": sub,
"organization_id": organization_id,
"organization_url_name": org_url_name,
"project_id": project_id,
"project_name": project_name,
"project_path": project_path,
"user_id": user_id,
"project_visibility": project_visibility,
}
)
if branch_id:
claims["branch_id"] = branch_id
return claims


class TestActiveStatePublisher:
def test_publisher_name(self):
publisher = ActiveStatePublisher(
organization_id=ORG_ID,
project_id=PROJECT_ID,
user_id=USER_ID,
project_visibility="public",
)
publisher = ActiveStatePublisher()

assert publisher.publisher_name == "ActiveState"

def test_publisher_url(self):
org_name = "fakeorg"
project_name = "fakeproject"
publisher = ActiveStatePublisher(
organization_id=ORG_ID,
project_id=PROJECT_ID,
user_id=USER_ID,
project_visibility="public",
organization_url_name=org_name, activestate_project_name=project_name
)

assert publisher.publisher_url == "https://platform.activestate.com/oauth/oidc"
assert (
publisher.publisher_url()
== f"https://platform.activestate.com/{org_name}/{project_name}"
)

def test_stringifies_as_email(self):
def test_stringifies_as_project_url(self):
org_name = "fakeorg"
project_name = "fakeproject"
publisher = ActiveStatePublisher(
organization_id=ORG_ID,
project_id=PROJECT_ID,
user_id=USER_ID,
project_visibility="public",
organization_url_name=org_name, activestate_project_name=project_name
)

assert (
str(publisher)
== f"https://platform.activestate.com/{publisher.project_path}"
== f"https://platform.activestate.com/{org_name}/{project_name}"
)

def test_activestate_publisher_all_known_claims(self):
Expand All @@ -107,7 +112,6 @@ def test_activestate_publisher_all_known_claims(self):
"organization_id",
"project_id",
"user_id",
"project_visibility",
"sub",
# optional verifiable claims
"branch_id",
Expand All @@ -118,18 +122,15 @@ def test_activestate_publisher_all_known_claims(self):
"exp",
"aud",
# unchecked claims
"project_visibility",
"project_name",
"project_path",
"organization_url_name",
}

def test_activestate_publisher_unaccounted_claims(self, monkeypatch):
publisher = ActiveStatePublisher(
sub="fake:fake:fake:fake:fake:fake",
organization_id=ORG_ID,
project_id=PROJECT_ID,
user_id=USER_ID,
project_visibility="public",
sub=SUBJECT,
)

scope = pretend.stub()
Expand All @@ -143,7 +144,6 @@ def test_activestate_publisher_unaccounted_claims(self, monkeypatch):
)
monkeypatch.setattr(_core, "sentry_sdk", sentry_sdk)

# We don't care if these actually verify, only that they're present.
signed_claims = {
claim_name: "fake" for claim_name in ActiveStatePublisher.all_known_claims()
}
Expand All @@ -158,13 +158,27 @@ def test_activestate_publisher_unaccounted_claims(self, monkeypatch):
]
assert scope.fingerprint == ["another-fake-claim", "fake-claim"]

def test_activestate_publisher_missing_claims(self, monkeypatch):
@pytest.mark.parametrize(
("claim_to_drop", "valid"),
[
("organization_id", False),
("project_id", False),
("user_id", False),
("branch_id", True),
("organization_url_name", True),
("project_visibility", True),
("project_name", True),
("project_path", True),
],
)
def test_activestate_publisher_missing_claims(
self, monkeypatch, claim_to_drop: str, valid: bool
):
publisher = ActiveStatePublisher(
sub="fake:fake:fake:fake:fake:fake",
organization_id="fake",
project_id="fake",
user_id="fake",
project_visibility="fake",
sub=SUBJECT,
organization_id=ORG_ID,
project_id=PROJECT_ID,
user_id=USER_ID,
)

scope = pretend.stub()
Expand All @@ -178,21 +192,21 @@ def test_activestate_publisher_missing_claims(self, monkeypatch):
)
monkeypatch.setattr(_core, "sentry_sdk", sentry_sdk)

signed_claims = {
claim_name: "fake" for claim_name in ActiveStatePublisher.all_known_claims()
}
signed_claims["sub"] = "fake:fake:fake:fake:fake:fake"
# Pop the first signed claim, so that it's the first one to fail.
signed_claims.pop("organization_id")
assert "organization_id" not in signed_claims
signed_claims = new_signed_claims()
# I don't set branch_id in the claims by default, so no need to remove it
if claim_to_drop != "branch_id":
# Remove a required claim to test that the code detects it's missing
signed_claims.pop(claim_to_drop)
assert claim_to_drop not in signed_claims
assert publisher.__required_verifiable_claims__
assert not publisher.verify_claims(signed_claims=signed_claims)
assert sentry_sdk.capture_message.calls == [
pretend.call(
"JWT for ActiveStatePublisher is missing claim: organization_id"
)
]
assert scope.fingerprint == ["organization_id"]
assert publisher.verify_claims(signed_claims=signed_claims) is valid
if not valid:
assert sentry_sdk.capture_message.calls == [
pretend.call(
"JWT for ActiveStatePublisher is missing claim: " + claim_to_drop
)
]
assert scope.fingerprint == [claim_to_drop]

@pytest.mark.parametrize(
("expect", "actual", "valid"),
Expand All @@ -205,11 +219,10 @@ def test_activestate_publisher_org_id_verified(
self, expect: str, actual: str, valid: bool
):
publisher = ActiveStatePublisher(
sub="fake:fake:fake:fake",
sub=SUBJECT,
organization_id=actual,
project_id=PROJECT_ID,
user_id=USER_ID,
project_visibility="public",
)

signed_claims = new_signed_claims(organization_id=expect)
Expand All @@ -218,66 +231,68 @@ def test_activestate_publisher_org_id_verified(
@pytest.mark.parametrize(
("expect", "actual", "valid"),
[
(PROJECT_ID, PROJECT_ID, True),
(PROJECT_ID, ORG_ID, False),
(BRANCH_ID, BRANCH_ID, True),
(BRANCH_ID, PROJECT_ID, False),
# If it's configured in the publisher, it must be present in the claim
(BRANCH_ID, None, False),
# If it's not configured in the publisher, we don't care what it is
# in the claim
(None, None, True),
(None, PROJECT_ID, True),
],
)
def test_activestate_publisher_project_id_verified(
def test_activestate_publisher_branch_id_verified(
self, expect: str, actual: str, valid: bool
):
publisher = ActiveStatePublisher(
sub="fake:fake:fake:fake",
sub=SUBJECT,
organization_id=ORG_ID,
project_id=actual,
project_id=PROJECT_ID,
user_id=USER_ID,
project_visibility="public",
branch_id=expect,
)

signed_claims = new_signed_claims(project_id=expect)
signed_claims = new_signed_claims(branch_id=actual)
assert publisher.verify_claims(signed_claims=signed_claims) is valid

@pytest.mark.parametrize(
("expect", "actual", "valid"),
[
(USER_ID, USER_ID, True),
(USER_ID, ORG_ID, False),
(PROJECT_ID, PROJECT_ID, True),
(PROJECT_ID, ORG_ID, False),
],
)
def test_activestate_publisher_user_id_verified(
def test_activestate_publisher_project_id_verified(
self, expect: str, actual: str, valid: bool
):
publisher = ActiveStatePublisher(
sub="fake:fake:fake:fake",
sub=SUBJECT,
organization_id=ORG_ID,
project_id=PROJECT_ID,
user_id=actual,
project_visibility="public",
project_id=actual,
user_id=USER_ID,
)

signed_claims = new_signed_claims(user_id=expect)
signed_claims = new_signed_claims(project_id=expect)
assert publisher.verify_claims(signed_claims=signed_claims) is valid

@pytest.mark.parametrize(
("expect", "actual", "valid"),
[
("public", "public", True),
("private", "private", True),
("public", "private", False),
("private", "public", False),
(USER_ID, USER_ID, True),
(USER_ID, ORG_ID, False),
],
)
def test_activestate_publisher_project_vis_verified(
def test_activestate_publisher_user_id_verified(
self, expect: str, actual: str, valid: bool
):
publisher = ActiveStatePublisher(
sub="fake:fake:fake:fake",
sub=SUBJECT,
organization_id=ORG_ID,
project_id=PROJECT_ID,
user_id=USER_ID,
project_visibility=actual,
user_id=actual,
)

signed_claims = new_signed_claims(project_visibility=expect)
signed_claims = new_signed_claims(user_id=expect)
assert publisher.verify_claims(signed_claims=signed_claims) is valid

@pytest.mark.parametrize(
Expand Down Expand Up @@ -317,7 +332,6 @@ def test_activestate_publisher_project_vis_verified(
)
def test_activestate_publisher_sub(self, expected: str, actual: str, valid: bool):
check = ActiveStatePublisher.__required_verifiable_claims__["sub"]
# claims: SignedClaims = new_signed_claims(sub=actual)
assert check(expected, actual, pretend.stub()) is valid


Expand All @@ -337,8 +351,6 @@ def test_reify_does_not_exist_yet(self, db_request):
)
publisher = pending_publisher.reify(db_request.db)

# If an OIDC publisher for this pending publisher does not already exist,
# a new one is created and the pending publisher is marked for deletion.
assert isinstance(publisher, ActiveStatePublisher)
assert pending_publisher in db_request.db.deleted
assert publisher.organization_id == pending_publisher.organization_id
Expand All @@ -350,12 +362,9 @@ def test_reify_already_exists(self, db_request):
organization_id=existing_publisher.organization_id,
project_id=existing_publisher.project_id,
branch_id=existing_publisher.branch_id,
project_visibility=existing_publisher.project_visibility,
sub=existing_publisher.sub,
)
publisher = pending_publisher.reify(db_request.db)

# If an OIDC publisher for this pending publisher already exists,
# it is returned and the pending publisher is marked for deletion.
assert existing_publisher == publisher
assert pending_publisher in db_request.db.deleted
Loading

0 comments on commit 47fcf8a

Please sign in to comment.