diff --git a/api/api_keys/user.py b/api/api_keys/user.py index 409064cf4ae4..6af597360d17 100644 --- a/api/api_keys/user.py +++ b/api/api_keys/user.py @@ -64,6 +64,11 @@ def is_project_admin(self, project: "Project") -> bool: def is_environment_admin(self, environment: "Environment") -> bool: return is_master_api_key_environment_admin(self.key, environment) + def is_group_admin(self, group_id: int) -> bool: + # This is added to match the interface of the FFAdminUser model, + # but a MasterAPIKey cannot be a group admin. + return False + def has_project_permission( self, permission: str, project: "Project", tag_ids: typing.List[int] = None ) -> bool: diff --git a/api/tests/unit/users/test_unit_users_views.py b/api/tests/unit/users/test_unit_users_views.py index 23e96272a320..794a1b41bdcc 100644 --- a/api/tests/unit/users/test_unit_users_views.py +++ b/api/tests/unit/users/test_unit_users_views.py @@ -142,8 +142,7 @@ def test_can_join_organisation_as_admin_if_invite_role_is_admin( def test_admin_can_update_role_for_a_user_in_organisation( - admin_user: FFAdminUser, - admin_client: APIClient, + admin_client_new: APIClient, organisation: Organisation, ): # Given @@ -156,7 +155,7 @@ def test_admin_can_update_role_for_a_user_in_organisation( data = {"role": OrganisationRole.ADMIN.name} # When - response = admin_client.post(url, data=data) + response = admin_client_new.post(url, data=data) # Then assert response.status_code == status.HTTP_200_OK @@ -170,7 +169,7 @@ def test_admin_can_update_role_for_a_user_in_organisation( def test_admin_can_get_users_in_organisation( admin_user: FFAdminUser, - admin_client: APIClient, + admin_client_new: APIClient, staff_user: FFAdminUser, organisation: Organisation, django_assert_num_queries: DjangoAssertNumQueries, @@ -190,7 +189,7 @@ def test_admin_can_get_users_in_organisation( # When with django_assert_num_queries(5): - response = admin_client.get(url) + response = admin_client_new.get(url) # Then assert response.status_code == status.HTTP_200_OK @@ -250,7 +249,7 @@ def test_org_user_can_exclude_themself_when_getting_users_in_organisation( def test_organisation_admin_can_interact_with_groups( organisation: Organisation, - admin_client: APIClient, + admin_client_new: APIClient, ) -> None: # Given # Create a group @@ -260,13 +259,13 @@ def test_organisation_admin_can_interact_with_groups( ) # When / Then - response = admin_client.post(url, data=create_data) + response = admin_client_new.post(url, data=create_data) assert response.status_code == status.HTTP_201_CREATED assert UserPermissionGroup.objects.filter(name=create_data["name"]).exists() group_id = response.json()["id"] # Group appears in the groups list - response = admin_client.get(url) + response = admin_client_new.get(url) assert response.status_code == status.HTTP_200_OK assert response.json()["results"][0]["name"] == "Test Group" @@ -277,16 +276,16 @@ def test_organisation_admin_can_interact_with_groups( args=[organisation.id, group_id], ) - response = admin_client.patch(url, data=update_data) + response = admin_client_new.patch(url, data=update_data) assert response.status_code == status.HTTP_200_OK # Update is reflected when getting the group - response = admin_client.get(url) + response = admin_client_new.get(url) assert response.status_code == status.HTTP_200_OK assert response.data["name"] == update_data["name"] # Delete the group - response = admin_client.delete(url) + response = admin_client_new.delete(url) assert response.status_code == status.HTTP_204_NO_CONTENT assert not UserPermissionGroup.objects.filter(name=update_data["name"]).exists() @@ -362,7 +361,33 @@ def test_can_add_multiple_users_including_current_user( staff_user: FFAdminUser, organisation: Organisation, admin_user: FFAdminUser, - admin_client: APIClient, + admin_client_new: APIClient, +) -> None: + # Given + group = UserPermissionGroup.objects.create( + name="Test Group", organisation=organisation + ) + url = reverse( + "api-v1:organisations:organisation-groups-add-users", + args=[organisation.id, group.id], + ) + data = {"user_ids": [admin_user.id, staff_user.id]} + + # When + response = admin_client_new.post( + url, data=json.dumps(data), content_type="application/json" + ) + + # Then + assert response.status_code == status.HTTP_200_OK + assert all(user in group.users.all() for user in [admin_user, staff_user]) + + +def test_can_add_users_with_master_api_key( + staff_user: FFAdminUser, + organisation: Organisation, + admin_user: FFAdminUser, + admin_master_api_key_client: APIClient, ) -> None: # Given group = UserPermissionGroup.objects.create( @@ -375,7 +400,7 @@ def test_can_add_multiple_users_including_current_user( data = {"user_ids": [admin_user.id, staff_user.id]} # When - response = admin_client.post( + response = admin_master_api_key_client.post( url, data=json.dumps(data), content_type="application/json" ) @@ -385,7 +410,7 @@ def test_can_add_multiple_users_including_current_user( def test_cannot_add_user_from_another_organisation( - admin_client: APIClient, + admin_client_new: APIClient, organisation: Organisation, ): # Given @@ -402,7 +427,7 @@ def test_cannot_add_user_from_another_organisation( data = {"user_ids": [another_user.id]} # When - response = admin_client.post( + response = admin_client_new.post( url, data=json.dumps(data), content_type="application/json" ) @@ -413,7 +438,7 @@ def test_cannot_add_user_from_another_organisation( def test_cannot_add_same_user_twice( staff_user: FFAdminUser, organisation: Organisation, - admin_client: APIClient, + admin_client_new: APIClient, ) -> None: # Given group = UserPermissionGroup.objects.create( @@ -427,7 +452,7 @@ def test_cannot_add_same_user_twice( data = {"user_ids": [staff_user.id]} # When - admin_client.post(url, data=json.dumps(data), content_type="application/json") + admin_client_new.post(url, data=json.dumps(data), content_type="application/json") # Then assert staff_user in group.users.all() and group.users.count() == 1 @@ -437,7 +462,7 @@ def test_remove_users_from_group( staff_user: FFAdminUser, organisation: Organisation, admin_user: FFAdminUser, - admin_client: APIClient, + admin_client_new: APIClient, ) -> None: # Given group = UserPermissionGroup.objects.create( @@ -452,7 +477,7 @@ def test_remove_users_from_group( data = {"user_ids": [staff_user.id]} # When - admin_client.post(url, data=json.dumps(data), content_type="application/json") + admin_client_new.post(url, data=json.dumps(data), content_type="application/json") # Then # staff user has been removed @@ -465,7 +490,7 @@ def test_remove_users_from_group( def test_remove_users_silently_fails_if_user_not_in_group( staff_user: FFAdminUser, organisation: Organisation, - admin_client: APIClient, + admin_client_new: APIClient, admin_user: FFAdminUser, ) -> None: # Given @@ -480,7 +505,7 @@ def test_remove_users_silently_fails_if_user_not_in_group( data = {"user_ids": [staff_user.id]} # When - response = admin_client.post( + response = admin_client_new.post( url, data=json.dumps(data), content_type="application/json" ) @@ -492,7 +517,7 @@ def test_remove_users_silently_fails_if_user_not_in_group( def test_user_permission_group_can_update_is_default( - admin_client, organisation, user_permission_group + admin_client_new, organisation, user_permission_group ): # Given args = [organisation.id, user_permission_group.id] @@ -501,7 +526,7 @@ def test_user_permission_group_can_update_is_default( data = {"is_default": True, "name": user_permission_group.name} # When - response = admin_client.put(url, data=data) + response = admin_client_new.put(url, data=data) # Then assert response.status_code == status.HTTP_200_OK @@ -513,7 +538,7 @@ def test_user_permission_group_can_update_is_default( def test_user_permission_group_can_update_external_id( - admin_client, organisation, user_permission_group + admin_client_new, organisation, user_permission_group ): # Given args = [organisation.id, user_permission_group.id] @@ -523,7 +548,7 @@ def test_user_permission_group_can_update_external_id( data = {"external_id": external_id, "name": user_permission_group.name} # When - response = admin_client.put(url, data=data) + response = admin_client_new.put(url, data=data) # Then assert response.status_code == status.HTTP_200_OK @@ -531,7 +556,7 @@ def test_user_permission_group_can_update_external_id( def test_users_in_organisation_have_last_login( - admin_client, organisation, rf, mocker, admin_user + admin_client_new, organisation, rf, mocker, admin_user ): # Given req = rf.get("/") @@ -544,7 +569,7 @@ def test_users_in_organisation_have_last_login( ) # When - res = admin_client.get(url) + res = admin_client_new.get(url) # Then assert res.json()[0]["last_login"] is not None @@ -552,7 +577,7 @@ def test_users_in_organisation_have_last_login( def test_retrieve_user_permission_group_includes_group_admin( - admin_client, admin_user, organisation, user_permission_group + admin_client_new, admin_user, organisation, user_permission_group ): # Given group_admin_user = FFAdminUser.objects.create(email="groupadminuser@example.com") @@ -565,7 +590,7 @@ def test_retrieve_user_permission_group_includes_group_admin( ) # When - response = admin_client.get(url) + response = admin_client_new.get(url) # Then assert response.status_code == status.HTTP_200_OK diff --git a/api/users/abc.py b/api/users/abc.py index 103e9aca0d08..69696f69ac05 100644 --- a/api/users/abc.py +++ b/api/users/abc.py @@ -25,6 +25,10 @@ def is_project_admin(self, project: "Project") -> bool: def is_environment_admin(self, environment: "Environment") -> bool: raise NotImplementedError() + @abstractmethod + def is_group_admin(self, group_id: int) -> bool: + raise NotImplementedError() + @abstractmethod def has_project_permission(self, permission: str, project: "Project") -> bool: raise NotImplementedError() diff --git a/api/users/views.py b/api/users/views.py index 5b57bbc684f6..82ee6a41a9b5 100644 --- a/api/users/views.py +++ b/api/users/views.py @@ -227,8 +227,12 @@ def add_users(self, request, organisation_pk, pk): group = self.get_object() user_ids = request.data["user_ids"] - if request.user.id in user_ids and not request.user.is_organisation_admin( - Organisation.objects.get(pk=organisation_pk) + if ( + isinstance(request.user, FFAdminUser) + and request.user.id in user_ids + and not request.user.is_organisation_admin( + Organisation.objects.get(pk=organisation_pk) + ) ): raise PermissionDenied("Non-admin users cannot add themselves to a group.") @@ -248,8 +252,12 @@ def remove_users(self, request, organisation_pk, pk): group = self.get_object() user_ids = request.data["user_ids"] - if request.user.id in user_ids and not request.user.is_organisation_admin( - Organisation.objects.get(pk=organisation_pk) + if ( + isinstance(request.user, FFAdminUser) + and request.user.id in user_ids + and not request.user.is_organisation_admin( + Organisation.objects.get(pk=organisation_pk) + ) ): raise PermissionDenied( "Non-admin users cannot remove themselves from a group."