Skip to content

Commit

Permalink
Feedback from code review
Browse files Browse the repository at this point in the history
  • Loading branch information
steiza committed Nov 9, 2018
1 parent 7fe3604 commit a115391
Show file tree
Hide file tree
Showing 15 changed files with 549 additions and 263 deletions.
3 changes: 0 additions & 3 deletions dev/environment
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,3 @@ TOKEN_PASSWORD_SECRET="an insecure password reset secret key"
TOKEN_EMAIL_SECRET="an insecure email verification secret key"

WAREHOUSE_LEGACY_DOMAIN=pypi.python.org

ACCOUNT_TOKEN_SECRET="an insecure token api secret"
ACCOUNT_TOKEN_PUBLIC_ID="a public token api id"
1 change: 1 addition & 0 deletions tests/common/db/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class Meta:
model = AccountToken

id = factory.Sequence(lambda n: uuid.uuid4())
secret = factory.fuzzy.FuzzyText(length=100)
username = factory.fuzzy.FuzzyText()
description = factory.fuzzy.FuzzyText(length=100)
is_active = True
Expand Down
283 changes: 166 additions & 117 deletions tests/unit/accounts/test_auth_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
import uuid

from pymacaroons import Macaroon
from pyramid import authentication
from pyramid.interfaces import IAuthenticationPolicy
from pyramid import authentication, security
from pyramid.interfaces import IAuthenticationPolicy, IAuthorizationPolicy
from zope.interface.verify import verifyClass

from warehouse.accounts import auth_policy
from warehouse.accounts.interfaces import IUserService
from warehouse.accounts.interfaces import IUserService, IAccountTokenService

from ...common.db.accounts import AccountTokenFactory, UserFactory

Expand Down Expand Up @@ -112,174 +112,223 @@ def test_unauthenticated_userid(self, monkeypatch):

class TestAccountTokenAuthenticationPolicy:
def test_verify(self):
assert isinstance(
auth_policy.AccountTokenAuthenticationPolicy(pretend.stub()),
authentication.CallbackAuthenticationPolicy,
assert verifyClass(
IAuthenticationPolicy, auth_policy.AccountTokenAuthenticationPolicy
)

def test_account_token_routes_allowed(self):
request = pretend.stub(matched_route=pretend.stub(name="not_a_real_route"))
def test_routes_not_allowed(self):
request = pretend.stub(matched_route=pretend.stub(name="not_allowed_route"))

policy = auth_policy.AccountTokenAuthenticationPolicy(pretend.stub())
assert policy.unauthenticated_userid(request) is None
authn_policy = auth_policy.AccountTokenAuthenticationPolicy(
pretend.stub(), ["allowed_route"]
)

assert authn_policy.unauthenticated_userid(request) is None

def test_account_token_required_parameter(self):
def test_require_known(self):
# Ensure we don't accept just any macaroon
request = pretend.stub(
matched_route=pretend.stub(name="forklift.legacy.file_upload"), params={}
find_service=lambda iface, **kw: {
IAccountTokenService: pretend.stub(
get_unverified_macaroon=(lambda: (None, None))
)
}[iface],
matched_route=pretend.stub(name="allowed_route"),
add_response_callback=lambda x: None,
)

policy = auth_policy.AccountTokenAuthenticationPolicy(pretend.stub())
assert policy.unauthenticated_userid(request) is None
authn_policy = auth_policy.AccountTokenAuthenticationPolicy(
pretend.stub(), ["allowed_route"]
)

assert authn_policy.unauthenticated_userid(request) is None

def test_macaroon_verifier(self, db_request):
user = UserFactory.create(username="test_user")

account_token = AccountTokenFactory.create(
secret="some_secret", username=user.username
)

macaroon = Macaroon(
location="pypi.org", identifier="some_id", key="wrong_secret"
)

def test_account_token_malformed(self):
request = pretend.stub(
matched_route=pretend.stub(name="forklift.legacy.file_upload"),
params={"account_token": "DEADBEEF"},
find_service=lambda iface, **kw: {
IAccountTokenService: pretend.stub(
get_unverified_macaroon=(lambda: (macaroon, account_token))
)
}[iface],
matched_route=pretend.stub(name="allowed_route"),
add_response_callback=lambda x: None,
)

policy = auth_policy.AccountTokenAuthenticationPolicy(pretend.stub())
assert policy.unauthenticated_userid(request) is None
authn_policy = auth_policy.AccountTokenAuthenticationPolicy(
pretend.stub(), ["allowed_route"]
)

assert authn_policy.unauthenticated_userid(request) is None

def test_account_token_auth(self, db_request):
# Test basic happy path
user = UserFactory.create(username="test_user")
account_token = AccountTokenFactory.create(
secret="some_secret", username=user.username
)

def test_account_token_bad_settings(self):
# Test bad location
macaroon = Macaroon(
location="notpypi.org", identifier="example_id", key="example_secret"
location="pypi.org", identifier=str(account_token.id), key="some_secret"
)

request = pretend.stub(
matched_route=pretend.stub(name="forklift.legacy.file_upload"),
params={"account_token": macaroon.serialize()},
registry=pretend.stub(
settings={
"account_token.id": "example_id",
"account_token.secret": "example_secret",
}
),
find_service=lambda iface, **kw: {
IUserService: pretend.stub(
find_userid=(lambda x: user.id if x == "test_user" else None)
),
IAccountTokenService: pretend.stub(
get_unverified_macaroon=(lambda: (macaroon, account_token)),
update_last_used=(lambda x: None),
),
}[iface],
matched_route=pretend.stub(name="allowed_route"),
add_response_callback=lambda x: None,
session={},
)

policy = auth_policy.AccountTokenAuthenticationPolicy(pretend.stub())
assert policy.unauthenticated_userid(request) is None
authn_policy = auth_policy.AccountTokenAuthenticationPolicy(
pretend.stub(), ["allowed_route"]
)

# Test bad identifier
macaroon = Macaroon(
location="pypi.org", identifier="bad_id", key="example_secret"
assert authn_policy.unauthenticated_userid(request) == user.id

# Make sure we allow first-party and third-party caveats
macaroon.add_first_party_caveat("first party caveat")

macaroon.add_third_party_caveat(
location="mysite.com", key="anykey", key_id="anykeyid"
)

request.params["account_token"] = macaroon.serialize()
policy = auth_policy.AccountTokenAuthenticationPolicy(pretend.stub())
assert policy.unauthenticated_userid(request) is None
assert authn_policy.unauthenticated_userid(request) == user.id

# Tamper with macaroon
macaroon = Macaroon(
location="pypi.org", identifier="example_id", key="example_secret"
def test_account_token_interface(self):
def _authenticate(a, b):
return a, b

policy = auth_policy.AccountTokenAuthenticationPolicy(_authenticate, ["route"])

assert policy.remember("", "") == []
assert policy.forget("") == []
assert policy._auth_callback(1, 2) == (1, 2)
assert policy._routes_allowed == ["route"]


class TestAccountTokenAuthorizationPolicy:
def test_verify(self):
assert verifyClass(
IAuthorizationPolicy, auth_policy.AccountTokenAuthorizationPolicy
)

serialized = macaroon.serialize()
def test_have_request(self, monkeypatch):
monkeypatch.setattr(auth_policy, "get_current_request", lambda: None)
authz_policy = auth_policy.AccountTokenAuthorizationPolicy(pretend.stub())

request.params["account_token"] = "".join(
(serialized[:-8], "AAAAAAA", serialized[-1:])
assert isinstance(
authz_policy.permits(pretend.stub(), pretend.stub(), pretend.stub()),
security.Denied,
)

def test_macaroon_verifier(self, db_request, monkeypatch):
user = UserFactory.create(username="test_user")

account_token = AccountTokenFactory.create(
secret="some_secret", username=user.username
)
assert policy.unauthenticated_userid(request) is None

def test_account_token_with_no_user(self, db_request):
macaroon = Macaroon(
location="pypi.org", identifier="example_id", key="example_secret"
location="pypi.org", identifier="some_id", key="wrong_secret"
)

monkeypatch.setattr(auth_policy, "get_current_request", lambda: request)
request = pretend.stub(
find_service=lambda iface, **kw: {
IUserService: pretend.stub(find_userid_by_account_token=pretend.stub())
}[iface],
params={"account_token": macaroon.serialize()},
registry=pretend.stub(
settings={
"account_token.id": "example_id",
"account_token.secret": "example_secret",
}
),
matched_route=pretend.stub(name="forklift.legacy.file_upload"),
db=db_request,
session={},
IAccountTokenService: pretend.stub(
get_unverified_macaroon=(lambda: (macaroon, account_token))
)
}[iface]
)

policy = auth_policy.AccountTokenAuthenticationPolicy(pretend.stub())
assert policy.unauthenticated_userid(request) is None
authz_policy = auth_policy.AccountTokenAuthorizationPolicy(pretend.stub())

def test_account_token_auth(self, db_request):
# Test basic happy path
assert isinstance(
authz_policy.permits(pretend.stub(), pretend.stub(), pretend.stub()),
security.Denied,
)

def test_account_token_authz(self, db_request, monkeypatch):
user = UserFactory.create(username="test_user")
account_token = AccountTokenFactory.create(username=user.username)
account_token_id = str(account_token.id)

macaroon = Macaroon(
location="pypi.org", identifier="example_id", key="example_secret"
account_token = AccountTokenFactory.create(
secret="some_secret", username=user.username
)

macaroon.add_first_party_caveat(f"id: {account_token_id}")
macaroon = Macaroon(
location="pypi.org", identifier="some_id", key="some_secret"
)

monkeypatch.setattr(auth_policy, "get_current_request", lambda: request)
request = pretend.stub(
find_service=lambda iface, **kw: {
IUserService: pretend.stub(
find_userid_by_account_token=(
lambda x: user.id if x == account_token_id else None
)
IAccountTokenService: pretend.stub(
get_unverified_macaroon=(lambda: (macaroon, account_token))
)
}[iface],
params={"account_token": macaroon.serialize()},
registry=pretend.stub(
settings={
"account_token.id": "example_id",
"account_token.secret": "example_secret",
}
),
matched_route=pretend.stub(name="forklift.legacy.file_upload"),
db=db_request,
session={},
}[iface]
)

policy = auth_policy.AccountTokenAuthenticationPolicy(pretend.stub())
assert policy.unauthenticated_userid(request) == user.id
authz_policy = auth_policy.AccountTokenAuthorizationPolicy(
pretend.stub(permits=(lambda *args, **kwargs: "allow"))
)

# Test package_list caveats
macaroon.add_first_party_caveat("package_list: pyexample1 pyexample2")
macaroon.add_third_party_caveat(
location="mysite.com", key="anykey", key_id="anykeyid"
assert (
authz_policy.permits(pretend.stub(), pretend.stub(), pretend.stub())
== "allow"
)
request.params["account_token"] = macaroon.serialize()
request.session["account_token_package_list"] = None

assert policy.unauthenticated_userid(request) == user.id
assert request.session["account_token_package_list"] == "pyexample1 pyexample2"
# Make sure we allow first-party and third-party caveats
macaroon.add_first_party_caveat("first party caveat")

# Ensure you can't overwrite previous caveats
takeover_user = UserFactory.create(username="takeover_user")
takeover_account_token = AccountTokenFactory.create(
username=takeover_user.username
macaroon.add_third_party_caveat(
location="mysite.com", key="anykey", key_id="anykeyid"
)
takeover_account_token_id = str(takeover_account_token.id)

macaroon.add_first_party_caveat(f"id: {takeover_account_token_id}")
macaroon.add_first_party_caveat("package_list: additionalpackage")

request.params["account_token"] = macaroon.serialize()
request.session["account_token_package_list"] = None
assert (
authz_policy.permits(pretend.stub(), pretend.stub(), pretend.stub())
== "allow"
)

assert policy.unauthenticated_userid(request) == user.id
assert request.session["account_token_package_list"] == "pyexample1 pyexample2"
def test_missing_macaroon(self, monkeypatch):
monkeypatch.setattr(auth_policy, "get_current_request", lambda: request)

def test_first_party_caveat_validation(self):
policy = auth_policy.AccountTokenAuthenticationPolicy(pretend.stub())
request = pretend.stub(
find_service=lambda iface, **kw: {
IAccountTokenService: pretend.stub(
get_unverified_macaroon=(lambda: (None, None))
)
}[iface]
)

assert policy._validate_first_party_caveat("id")
assert policy._validate_first_party_caveat("package_list")
assert not policy._validate_first_party_caveat("not_valid")
authz_policy = auth_policy.AccountTokenAuthorizationPolicy(
pretend.stub(permits=(lambda *args, **kwargs: "allow"))
)

def test_account_token_interface(self):
def _authenticate(a, b):
return a, b
assert (
authz_policy.permits(pretend.stub(), pretend.stub(), pretend.stub())
== "allow"
)

policy = auth_policy.AccountTokenAuthenticationPolicy(_authenticate)
def test_principals_allowed_by_permission(self):
authz_policy = auth_policy.AccountTokenAuthorizationPolicy(
pretend.stub(principals_allowed_by_permission=(lambda a, b: (a, b)))
)

assert policy.remember("", "") == []
assert policy.forget("") == []
assert policy._auth_callback(1, 2) == (1, 2)
assert authz_policy.principals_allowed_by_permission(1, 2) == (1, 2)
Loading

0 comments on commit a115391

Please sign in to comment.