forked from CCI-MOC/invoicing
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request CCI-MOC#9 from knikolla/feature/scim_v2
SCIM v2 API to Add/Remove Users to Group
- Loading branch information
Showing
7 changed files
with
363 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
import os | ||
|
||
from rest_framework.authentication import SessionAuthentication, BasicAuthentication | ||
from mozilla_django_oidc.contrib.drf import OIDCAuthentication | ||
|
||
if os.getenv('PLUGIN_AUTH_OIDC') == 'True': | ||
AUTHENTICATION_CLASSES = [OIDCAuthentication, SessionAuthentication] | ||
else: | ||
AUTHENTICATION_CLASSES = [SessionAuthentication, BasicAuthentication] |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
from coldfront.core.allocation import signals | ||
from coldfront.core.allocation.models import Allocation, AllocationUser, AllocationUserStatusChoice | ||
from django.contrib.auth.models import User | ||
from rest_framework.views import APIView | ||
from rest_framework.permissions import IsAdminUser | ||
from rest_framework.response import Response | ||
|
||
from coldfront_plugin_api import auth | ||
|
||
|
||
def allocation_to_group_view(allocation: Allocation) -> dict: | ||
return { | ||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], | ||
"id": allocation.pk, | ||
"displayName": f"Members of allocation {allocation.pk} of project {allocation.project.title}", | ||
"members": [ | ||
{ | ||
"value": x.user.username, | ||
"$ref": x.user.username, | ||
"display": x.user.username, | ||
} for x in AllocationUser.objects.filter(allocation=allocation,status__name="Active") | ||
] | ||
} | ||
|
||
|
||
class ListGroups(APIView): | ||
""" | ||
View to list all groups in the system. | ||
* Requires token authentication. | ||
* Only admin users are able to access this view. | ||
""" | ||
authentication_classes = auth.AUTHENTICATION_CLASSES | ||
permission_classes = [IsAdminUser] | ||
|
||
def get(self, request, format=None): | ||
""" | ||
Return a list of all groups. | ||
""" | ||
allocations = Allocation.objects.filter(status__name="Active") | ||
return Response( | ||
[allocation_to_group_view(allocation) for allocation in allocations] | ||
) | ||
|
||
|
||
class GroupDetail(APIView): | ||
""" | ||
View to list all groups in the system. | ||
* Requires token authentication. | ||
* Only admin users are able to access this view. | ||
""" | ||
authentication_classes = auth.AUTHENTICATION_CLASSES | ||
permission_classes = [IsAdminUser] | ||
|
||
def get(self, request, pk, format=None): | ||
allocation = Allocation.objects.get(pk=pk) | ||
return Response(allocation_to_group_view(allocation)) | ||
|
||
def patch(self, request, pk, format=None): | ||
if ( | ||
request.data["schemas"] != ["urn:ietf:params:scim:api:messages:2.0:PatchOp"] | ||
or request.data.get("path", "members") != "members" | ||
): | ||
return Response(status=400) | ||
|
||
allocation = Allocation.objects.get(pk=pk) | ||
for operation in request.data["Operations"]: | ||
value = operation["value"] | ||
if type(value) == dict: | ||
value = [x["value"] for x in operation["value"]["members"]] | ||
elif type(value) == list: | ||
value = [x["value"] for x in operation["value"]] | ||
|
||
if operation["op"] == "add": | ||
for submitted_user in value: | ||
user = User.objects.get(username=submitted_user) | ||
au = self._set_user_status_on_allocation( | ||
allocation, user, "Active" | ||
) | ||
signals.allocation_activate_user.send( | ||
sender=self.__class__, allocation_user_pk=au.pk, | ||
) | ||
elif operation["op"] == "remove": | ||
for submitted_user in value: | ||
user = User.objects.get(username=submitted_user) | ||
au = self._set_user_status_on_allocation( | ||
allocation, user, "Removed" | ||
) | ||
signals.allocation_remove_user.send( | ||
sender=self.__class__, allocation_user_pk=au.pk, | ||
) | ||
else: | ||
# Replace is not implemented yet. | ||
raise NotImplementedError | ||
|
||
return Response(allocation_to_group_view(allocation)) | ||
|
||
@staticmethod | ||
def _set_user_status_on_allocation(allocation, user, status): | ||
au = AllocationUser.objects.filter( | ||
allocation=allocation, | ||
user=user | ||
).first() | ||
if au: | ||
au.status = AllocationUserStatusChoice.objects.get(name=status) | ||
au.save() | ||
else: | ||
au = AllocationUser.objects.create( | ||
allocation=allocation, | ||
user=user, | ||
status=AllocationUserStatusChoice.objects.get(name=status) | ||
) | ||
return au |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
import sys | ||
from os import devnull | ||
import uuid | ||
|
||
from coldfront.core.allocation.models import (Allocation, | ||
AllocationStatusChoice, | ||
AllocationUser, | ||
AllocationUserStatusChoice) | ||
|
||
from django.contrib.auth.models import User | ||
from coldfront.core.project.models import (Project, | ||
ProjectUser, | ||
ProjectUserRoleChoice, | ||
ProjectUserStatusChoice, ProjectStatusChoice) | ||
from coldfront.core.resource.models import (Resource, | ||
ResourceType, | ||
ResourceAttribute, | ||
ResourceAttributeType) | ||
from django.core.management import call_command | ||
from django.test import TestCase | ||
from django.test.utils import override_settings | ||
|
||
from coldfront_plugin_cloud import attributes | ||
|
||
|
||
class TestBase(TestCase): | ||
|
||
def setUp(self) -> None: | ||
# Otherwise output goes to the terminal for every test that is run | ||
backup, sys.stdout = sys.stdout, open(devnull, 'a') | ||
call_command('initial_setup', '-f') | ||
call_command('load_test_data') | ||
call_command('register_cloud_attributes') | ||
sys.stdout = backup | ||
|
||
@staticmethod | ||
def new_user(username=None) -> User: | ||
username = username or f'{uuid.uuid4().hex}@example.com' | ||
User.objects.create(username=username, email=username) | ||
return User.objects.get(username=username) | ||
|
||
def new_project(self, title=None, pi=None) -> Project: | ||
title = title or uuid.uuid4().hex | ||
pi = pi or self.new_user() | ||
status = ProjectStatusChoice.objects.get(name='New') | ||
|
||
Project.objects.create(title=title, pi=pi, status=status) | ||
return Project.objects.get(title=title) | ||
|
||
@staticmethod | ||
def new_project_user(user, project, role='Manager', status='Active'): | ||
pu, _ = ProjectUser.objects.get_or_create( | ||
user=user, | ||
project=project, | ||
role=ProjectUserRoleChoice.objects.get(name=role), | ||
status=ProjectUserStatusChoice.objects.get(name=status) | ||
) | ||
return pu | ||
|
||
@staticmethod | ||
def new_allocation(project, resource, quantity): | ||
allocation, _ = Allocation.objects.get_or_create( | ||
project=project, | ||
justification='a justification for testing data', | ||
quantity=quantity, | ||
status=AllocationStatusChoice.objects.get( | ||
name='Active') | ||
) | ||
allocation.resources.add(resource) | ||
return allocation | ||
|
||
@staticmethod | ||
def new_allocation_user(allocation, user): | ||
au, _ = AllocationUser.objects.get_or_create( | ||
allocation=allocation, | ||
user=user, | ||
status=AllocationUserStatusChoice.objects.get(name='Active') | ||
) | ||
return au |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
import json | ||
import os | ||
import unittest | ||
from os import devnull | ||
import sys | ||
|
||
from coldfront_plugin_api import urls | ||
from coldfront_plugin_api.tests.unit import base | ||
|
||
from coldfront.core.resource import models as resource_models | ||
from coldfront.core.allocation import models as allocation_models | ||
from django.core.management import call_command | ||
from rest_framework.test import APIClient | ||
|
||
|
||
class TestAllocation(base.TestBase): | ||
|
||
def setUp(self) -> None: | ||
self.maxDiff = None | ||
super().setUp() | ||
self.resource = resource_models.Resource.objects.all().first() | ||
|
||
@property | ||
def admin_client(self): | ||
client = APIClient() | ||
client.login(username='admin', password='test1234') | ||
return client | ||
|
||
@property | ||
def logged_in_user_client(self): | ||
client = APIClient() | ||
client.login(username='cgray', password='test1234') | ||
return client | ||
|
||
def test_list_groups(self): | ||
user = self.new_user() | ||
project = self.new_project(pi=user) | ||
allocation = self.new_allocation(project, self.resource, 1) | ||
|
||
response = self.admin_client.get("/api/scim/v2/Groups") | ||
desired_in_response = { | ||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], | ||
"id": allocation.id, | ||
"displayName": f"Members of allocation {allocation.id} of project {allocation.project.title}", | ||
"members": [] | ||
} | ||
|
||
self.assertEqual(response.status_code, 200) | ||
self.assertIn(desired_in_response, response.json()) | ||
|
||
def test_get_group(self): | ||
user = self.new_user() | ||
project = self.new_project(pi=user) | ||
allocation = self.new_allocation(project, self.resource, 1) | ||
|
||
response = self.admin_client.get(f"/api/scim/v2/Groups/{allocation.id}") | ||
|
||
desired_response = { | ||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], | ||
"id": allocation.id, | ||
"displayName": f"Members of allocation {allocation.id} of project {allocation.project.title}", | ||
"members": [] | ||
} | ||
self.assertEqual(response.json(), desired_response) | ||
|
||
def test_add_remove_group_members(self): | ||
user = self.new_user() | ||
project = self.new_project(pi=user) | ||
allocation = self.new_allocation(project, self.resource, 1) | ||
|
||
payload = { | ||
"schemas": [ | ||
"urn:ietf:params:scim:api:messages:2.0:PatchOp" | ||
], | ||
"Operations": [ | ||
{ | ||
"op": "add", | ||
"value": { | ||
"members": [ | ||
{ | ||
"value": user.username | ||
} | ||
] | ||
} | ||
} | ||
] | ||
} | ||
response = self.admin_client.patch(f"/api/scim/v2/Groups/{allocation.id}", | ||
data=payload, | ||
format="json") | ||
self.assertEqual(response.status_code, 200) | ||
|
||
desired_response = { | ||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], | ||
"id": allocation.id, | ||
"displayName": f"Members of allocation {allocation.id} of project {allocation.project.title}", | ||
"members": [{ | ||
"value": user.username, | ||
"$ref": user.username, | ||
"display": user.username, | ||
}] | ||
} | ||
response = self.admin_client.get(f"/api/scim/v2/Groups/{allocation.id}") | ||
self.assertEqual(response.json(), desired_response) | ||
|
||
payload = { | ||
"schemas": [ | ||
"urn:ietf:params:scim:api:messages:2.0:PatchOp" | ||
], | ||
"Operations": [ | ||
{ | ||
"op": "remove", | ||
"value": { | ||
"members": [ | ||
{ | ||
"value": user.username | ||
} | ||
] | ||
} | ||
} | ||
] | ||
} | ||
response = self.admin_client.patch(f"/api/scim/v2/Groups/{allocation.id}", | ||
data=payload, | ||
format="json") | ||
desired_response = { | ||
"schemas": ["urn:ietf:params:scim:schemas:core:2.0:Group"], | ||
"id": allocation.id, | ||
"displayName": f"Members of allocation {allocation.id} of project {allocation.project.title}", | ||
"members": [] | ||
} | ||
self.assertEqual(response.json(), desired_response) | ||
|
||
def test_normal_user_fobidden(self): | ||
response = self.logged_in_user_client.get(f"/api/scim/v2/Groups") | ||
self.assertEqual(response.status_code, 403) | ||
|
||
response = self.logged_in_user_client.get(f"/api/scim/v2/Groups/1234") | ||
self.assertEqual(response.status_code, 403) | ||
|
||
response = self.logged_in_user_client.patch( | ||
f"/api/scim/v2/Groups/1234", | ||
data={}, | ||
format="json" | ||
) | ||
self.assertEqual(response.status_code, 403) |
Oops, something went wrong.