Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DRF permission classes for enforcing OAuth2 scopes and filters. #2

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion edx_rest_framework_extensions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
""" edx Django REST Framework extensions. """

__version__ = '1.2.5' # pragma: no cover
__version__ = '1.3.0' # pragma: no cover
5 changes: 5 additions & 0 deletions edx_rest_framework_extensions/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
Application configuration constants and code.
"""

SWITCH_ENFORCE_JWT_SCOPES = 'oauth2.enforce_jwt_scopes'
44 changes: 44 additions & 0 deletions edx_rest_framework_extensions/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
""" Internal decorator functions for scope-enforcing permission classes. """
from functools import wraps

import waffle
from edx_rest_framework_extensions.authentication import JwtAuthentication
from edx_rest_framework_extensions.config import SWITCH_ENFORCE_JWT_SCOPES


def skip_unless_jwt_authenticated(f):
"""
Permission class decorator for ensuring that authentication
was performed using JwtAuthentication before performing the
permission check.
"""
@wraps(f)
def decorated_function(*args, **kwargs):
""" Determine if JwtAuthentication was used to authenticate the request. """
request = args[1]
if isinstance(request.successful_authenticator, JwtAuthentication):
if getattr(request, 'auth', None):
return f(*args, **kwargs)
else:
# Something went wrong with JwtAuthentication and
# the auth attribute did not get populated with the
# JWT on the request object.
return False
# We will skip scope enforcement if JwtAuthentication
# was not used to authenticate the request.
return True
return decorated_function


def skip_unless_jwt_scopes_enforced(f):
"""
Permission class decorator for ensuring that scope enforcement
is enabled before performing the permission check.
"""
@wraps(f)
def decorated_function(*args, **kwargs):
""" Determine if scope enforcement is enabled. """
if waffle.switch_is_active(SWITCH_ENFORCE_JWT_SCOPES):
return f(*args, **kwargs)
return True
return decorated_function
58 changes: 58 additions & 0 deletions edx_rest_framework_extensions/permissions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,67 @@
""" Permission classes. """
from django.core.exceptions import ImproperlyConfigured
from opaque_keys.edx.keys import CourseKey
from rest_framework.exceptions import PermissionDenied
from rest_framework.permissions import BasePermission

from edx_rest_framework_extensions.decorators import (
skip_unless_jwt_authenticated,
skip_unless_jwt_scopes_enforced,
)
from edx_rest_framework_extensions.utils import decode_jwt_filters, decode_jwt_scopes


class IsSuperuser(BasePermission):
""" Allows access only to superusers. """

def has_permission(self, request, view):
return request.user and request.user.is_superuser


class JwtHasScope(BasePermission):
"""
The request is authenticated as a user and the token used has the right scope.
"""
message = 'JWT missing required scopes.'

@skip_unless_jwt_scopes_enforced
@skip_unless_jwt_authenticated
def has_permission(self, request, view):
jwt_scopes = decode_jwt_scopes(request.auth)
required_scopes = set(self.get_scopes(request, view))
if required_scopes.issubset(jwt_scopes):
return True
return False

def get_scopes(self, request, view):
"""
Return the required scopes defined on the view.
"""
try:
return getattr(view, 'required_scopes')
except AttributeError:
raise ImproperlyConfigured(
'TokenHasScope requires the view to define the required_scopes attribute')


class JwtHasContentOrgFilterForRequestedCourse(BasePermission):
"""
The JWT used to authenticate contains the appropriate content provider
filter for the requested course resource.
"""
message = 'JWT missing required content_org filter.'

@skip_unless_jwt_scopes_enforced
@skip_unless_jwt_authenticated
def has_permission(self, request, view):
"""
Ensure that the course_id kwarg provided to the view contains one
of the organizations specified in the content provider filters
in the JWT used to authenticate.
"""
course_key = CourseKey.from_string(view.kwargs.get('course_id'))
jwt_filters = decode_jwt_filters(request.auth)
for provider_type, filter_value in jwt_filters:
if provider_type == 'content_org' and filter_value == course_key.org:
return True
return False
89 changes: 88 additions & 1 deletion edx_rest_framework_extensions/tests/test_permission.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,22 @@

import ddt
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import ImproperlyConfigured
from django.test import RequestFactory, TestCase
from mock import Mock, patch
from rest_framework.exceptions import PermissionDenied
from rest_framework.views import APIView

from edx_rest_framework_extensions.permissions import IsSuperuser
from edx_rest_framework_extensions.tests.factories import UserFactory
from edx_rest_framework_extensions.authentication import JwtAuthentication
from edx_rest_framework_extensions.permissions import (
IsSuperuser,
JwtHasContentOrgFilterForRequestedCourse,
JwtHasScope,
)
from edx_rest_framework_extensions.decorators import SWITCH_ENFORCE_JWT_SCOPES
from edx_rest_framework_extensions.tests import factories
from edx_rest_framework_extensions.tests.test_utils import generate_jwt


@ddt.ddt
Expand All @@ -27,3 +39,78 @@ def test_has_permission_with_invalid_users(self, user):
request.user = user
permission = IsSuperuser()
self.assertFalse(permission.has_permission(request, None))


@ddt.ddt
class JwtHasScopeTests(TestCase):
""" Tests for JwtHasScope permission class. """
def setUp(self):
super(JwtHasScopeTests, self).setUp()
self.user = UserFactory()

@ddt.data(
(True, JwtAuthentication(), ('test:read',), ('test:read',), True),
(True, JwtAuthentication(), ('test:write'), ('test:read',), False),
(True, JwtAuthentication(), (), ('test:read',), False),
(True, None, (), ('test:read'), True),
(False, JwtAuthentication(), (), ('test:read'), True),
(False, None, (), ('test:read'), True),
)
@ddt.unpack
@patch('edx_rest_framework_extensions.decorators.waffle.switch_is_active')
def test_has_permission(self, enforce_scopes, authentication_class, jwt_scopes,
required_scopes, expected_result, waffle_mock):
"""
Test that the permission check returns the expected result when scopes are validated.
"""
waffle_mock.return_value = enforce_scopes
request = RequestFactory().get('/')
request.successful_authenticator = authentication_class
request.auth = generate_jwt(self.user, scopes=jwt_scopes)
view = Mock(required_scopes=required_scopes)
self.assertEqual(JwtHasScope().has_permission(request, view), expected_result)

@patch('edx_rest_framework_extensions.decorators.waffle.switch_is_active')
def test_has_permission_missing_required_scopes(self, waffle_mock):
"""
Test that the permission check raises an exception if
required_scopes was not defined on the view.
"""
waffle_mock.return_value = True
request = RequestFactory().get('/')
request.successful_authenticator = JwtAuthentication()
request.auth = generate_jwt(self.user, scopes=['test:read'])
view = APIView()
with self.assertRaises(ImproperlyConfigured):
JwtHasScope().has_permission(request, view)


@ddt.ddt
class JwtHasContentOrgFilterForRequestedCourseTests(TestCase):
""" Tests for JwtHasContentOrgFilterForRequestedCourse permission class. """
def setUp(self):
super(JwtHasContentOrgFilterForRequestedCourseTests, self).setUp()
self.user = UserFactory()

@ddt.data(
(True, JwtAuthentication(), ['content_org:edX'], {'course_id': 'course-v1:edX+DemoX+Demo_Course'}, True),
(True, JwtAuthentication(), ['content_org:TestX'], {'course_id': 'course-v1:edX+DemoX+Demo_Course'}, False),
(True, JwtAuthentication(), ['test:TestX'], {'course_id': 'course-v1:edX+DemoX+Demo_Course'}, False),
(True, JwtAuthentication(), [], {'course_id': 'course-v1:edX+DemoX+Demo_Course'}, False),
(True, None, [], {'course_id': 'course-v1:edX+DemoX+Demo_Course'}, True),
(False, JwtAuthentication(), [], {'course_id': 'course-v1:edX+DemoX+Demo_Course'}, True),
(False, None, [], {'course_id': 'course-v1:edX+DemoX+Demo_Course'}, True),
)
@ddt.unpack
@patch('edx_rest_framework_extensions.decorators.waffle.switch_is_active')
def test_has_permission(self, enforce_scopes, authentication_class, jwt_filters,
view_kwargs, expected_result, waffle_mock):
"""
Test that the permission check returns the expected result when scopes are validated.
"""
waffle_mock.return_value = enforce_scopes
request = RequestFactory().get('/')
request.successful_authenticator = authentication_class
request.auth = generate_jwt(self.user, filters=jwt_filters)
view = Mock(kwargs=view_kwargs)
self.assertEqual(JwtHasContentOrgFilterForRequestedCourse().has_permission(request, view), expected_result)
84 changes: 79 additions & 5 deletions edx_rest_framework_extensions/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
""" Tests for utility functions. """
import copy
from time import time

import ddt
import jwt
import mock
from django.conf import settings
from django.test import TestCase
from django.test import override_settings, TestCase

from edx_rest_framework_extensions.tests.factories import UserFactory
from edx_rest_framework_extensions import utils


def generate_jwt(user, scopes=None, filters=None):
"""
Generate a valid JWT for authenticated requests.
"""
access_token = generate_jwt_payload(user, scopes=scopes, filters=filters)
return generate_jwt_token(access_token)


def generate_jwt_token(payload, signing_key=None):
"""
Generate a valid JWT token for authenticated requests.
Expand All @@ -19,21 +28,45 @@ def generate_jwt_token(payload, signing_key=None):
return jwt.encode(payload, signing_key).decode('utf-8')


def generate_jwt_payload(user):
def generate_jwt_payload(user, scopes=None, filters=None, version='1.0.0'):
"""
Generate a valid JWT payload given a user.
Generate a valid JWT payload given a user and optionally scopes and filters.
"""
jwt_issuer_data = settings.JWT_AUTH['JWT_ISSUERS'][0]
now = int(time())
ttl = 5
return {
payload = {
'iss': jwt_issuer_data['ISSUER'],
'aud': jwt_issuer_data['AUDIENCE'],
'username': user.username,
'email': user.email,
'iat': now,
'exp': now + ttl
'exp': now + ttl,
'version': version,
}
if scopes:
payload['scopes'] = scopes
if filters:
payload['filters'] = filters
return payload


def exclude_from_jwt_auth_setting(key):
"""
Clone the JWT_AUTH setting dict and remove the given key.
"""
jwt_auth = copy.deepcopy(settings.JWT_AUTH)
del jwt_auth[key]
return jwt_auth


def update_jwt_auth_setting(jwt_auth_overrides):
"""
Clone the JWT_AUTH setting dict and update it with the given overrides.
"""
jwt_auth = copy.deepcopy(settings.JWT_AUTH)
jwt_auth.update(jwt_auth_overrides)
return jwt_auth


@ddt.ddt
Expand Down Expand Up @@ -111,3 +144,44 @@ def test_decode_failure_invalid_token(self):

msg = "All combinations of JWT issuers and secret keys failed to validate the token."
patched_log.error.assert_any_call(msg)

@override_settings(JWT_AUTH=exclude_from_jwt_auth_setting('JWT_SUPPORTED_VERSION'))
def test_decode_supported_jwt_version_not_specified(self):
"""
Verifies the JWT is decoded successfully when the JWT_SUPPORTED_VERSION setting is not specified.
"""
token = generate_jwt_token(self.payload)
self.assertDictEqual(utils.jwt_decode_handler(token), self.payload)

@ddt.data('0.5.0', '1.0.0', '1.0.5', '1.5.0', '1.5.5')
def test_decode_supported_jwt_version(self, jwt_version):
"""
Verifies the JWT is decoded successfully when the JWT_SUPPORTED_VERSION setting is not specified.
"""
jwt_payload = generate_jwt_payload(self.user, version=jwt_version)
token = generate_jwt_token(jwt_payload)
self.assertDictEqual(utils.jwt_decode_handler(token), jwt_payload)

@override_settings(JWT_AUTH=update_jwt_auth_setting({'JWT_SUPPORTED_VERSION': '0.5.0'}))
def test_decode_unsupported_jwt_version(self):
"""
Verifies the function logs decode failures, and raises an
InvalidTokenError if the token version is not supported.
"""
with mock.patch('edx_rest_framework_extensions.utils.logger') as patched_log:
with self.assertRaises(jwt.InvalidTokenError):
token = generate_jwt_token(self.payload)
utils.jwt_decode_handler(token)

# Verify that the proper entries were written to the log file
msg = "Token decode failed due to unsupported JWT version number [%s]"
patched_log.info.assert_any_call(msg, '1.0.0')

msg = "Token decode failed for issuer 'test-issuer-1'"
patched_log.info.assert_any_call(msg, exc_info=True)

msg = "Token decode failed for issuer 'test-issuer-2'"
patched_log.info.assert_any_call(msg, exc_info=True)

msg = "All combinations of JWT issuers and secret keys failed to validate the token."
patched_log.error.assert_any_call(msg)
Loading