diff --git a/awx/main/models/rbac.py b/awx/main/models/rbac.py index 0bff43a3b09e..29edccab2cd5 100644 --- a/awx/main/models/rbac.py +++ b/awx/main/models/rbac.py @@ -15,16 +15,18 @@ # Django from django.db import models, transaction, connection -from django.db.models.signals import m2m_changed +from django.db.models.signals import m2m_changed, post_save, post_delete +from django.dispatch import receiver from django.contrib.auth import get_user_model from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.fields import GenericForeignKey from django.utils.translation import gettext_lazy as _ +from django.core.exceptions import ObjectDoesNotExist from django.apps import apps from django.conf import settings # Ansible_base app -from ansible_base.rbac.models import RoleDefinition +from ansible_base.rbac.models import RoleDefinition, RoleUserAssignment, RoleTeamAssignment from ansible_base.lib.utils.models import get_type_for_model # AWX @@ -733,5 +735,84 @@ def sync_parents_to_new_rbac(instance, action, model, pk_set, reverse, **kwargs) give_or_remove_permission(child_role, team, giving=is_giving) +ROLE_DEFINITION_TO_ROLE_FIELD = { + 'Organization Member': 'member_role', + 'Controller Organization Member': 'member_role', + 'WorkflowJobTemplate Admin': 'admin_role', + 'Organization WorkflowJobTemplate Admin': 'workflow_admin_role', + 'WorkflowJobTemplate Execute': 'execute_role', + 'WorkflowJobTemplate Approve': 'approval_role', + 'InstanceGroup Admin': 'admin_role', + 'InstanceGroup Use': 'use_role', + 'Organization ExecutionEnvironment Admin': 'execution_environment_admin_role', + 'Project Admin': 'admin_role', + 'Organization Project Admin': 'project_admin_role', + 'Project Use': 'use_role', + 'Project Update': 'update_role', + 'JobTemplate Admin': 'admin_role', + 'Organization JobTemplate Admin': 'job_template_admin_role', + 'JobTemplate Execute': 'execute_role', + 'Inventory Admin': 'admin_role', + 'Organization Inventory Admin': 'inventory_admin_role', + 'Inventory Use': 'use_role', + 'Inventory Adhoc': 'adhoc_role', + 'Inventory Update': 'update_role', + 'Organization NotificationTemplate Admin': 'notification_admin_role', + 'Credential Admin': 'admin_role', + 'Organization Credential Admin': 'credential_admin_role', + 'Credential Use': 'use_role', + 'Team Admin': 'admin_role', + 'Controller Team Admin': 'admin_role', + 'Team Member': 'member_role', + 'Controller Team Member': 'member_role', + 'Organization Admin': 'admin_role', + 'Controller Organization Admin': 'admin_role', + 'Organization Audit': 'auditor_role', + 'Organization Execute': 'execute_role', + 'Organization Approval': 'approval_role', +} + + +def _sync_assignments_to_old_rbac(instance, delete=True): + from awx.main.signals import disable_activity_stream + + with disable_activity_stream(): + with disable_rbac_sync(): + field_name = ROLE_DEFINITION_TO_ROLE_FIELD.get(instance.role_definition.name) + if not field_name: + return + try: + role = getattr(instance.object_role.content_object, field_name) + # in the case RoleUserAssignment is being cascade deleted, then + # object_role might not exist. In which case the object is about to be removed + # anyways so just return + except ObjectDoesNotExist: + return + if isinstance(instance.actor, get_user_model()): + # user + if delete: + role.members.remove(instance.actor) + else: + role.members.add(instance.actor) + else: + # team + if delete: + instance.team.member_role.children.remove(role) + else: + instance.team.member_role.children.add(role) + + +@receiver(post_delete, sender=RoleUserAssignment) +@receiver(post_delete, sender=RoleTeamAssignment) +def sync_assignments_to_old_rbac_delete(instance, **kwargs): + _sync_assignments_to_old_rbac(instance, delete=True) + + +@receiver(post_save, sender=RoleUserAssignment) +@receiver(post_save, sender=RoleTeamAssignment) +def sync_user_assignments_to_old_rbac_create(instance, **kwargs): + _sync_assignments_to_old_rbac(instance, delete=False) + + m2m_changed.connect(sync_members_to_new_rbac, Role.members.through) m2m_changed.connect(sync_parents_to_new_rbac, Role.parents.through) diff --git a/awx/main/tests/functional/dab_rbac/test_translation_layer_new_to_old.py b/awx/main/tests/functional/dab_rbac/test_translation_layer_new_to_old.py new file mode 100644 index 000000000000..946c76179f39 --- /dev/null +++ b/awx/main/tests/functional/dab_rbac/test_translation_layer_new_to_old.py @@ -0,0 +1,80 @@ +from ansible_base.rbac.models import RoleDefinition, RoleUserAssignment, RoleTeamAssignment +from ansible_base.lib.utils.response import get_relative_url +import pytest + + +@pytest.mark.django_db +class TestNewToOld: + ''' + Tests that the DAB RBAC system is correctly translated to the old RBAC system + Namely, tests functionality of the _sync_assignments_to_old_rbac signal handler + ''' + + def test_new_to_old_rbac_addition(self, admin, post, inventory, bob, setup_managed_roles): + ''' + Assign user to Inventory Admin role definition, should be added to inventory.admin_role.members + ''' + rd = RoleDefinition.objects.get(name='Inventory Admin') + + url = get_relative_url('roleuserassignment-list') + post(url, user=admin, data={'role_definition': rd.id, 'user': bob.id, 'object_id': inventory.id}, expect=201) + assert bob in inventory.admin_role.members.all() + + def test_new_to_old_rbac_removal(self, admin, delete, inventory, bob, setup_managed_roles): + ''' + Remove user from Inventory Admin role definition, should be deleted from inventory.admin_role.members + ''' + inventory.admin_role.members.add(bob) + + rd = RoleDefinition.objects.get(name='Inventory Admin') + user_assignment = RoleUserAssignment.objects.get(user=bob, role_definition=rd, object_id=inventory.id) + + url = get_relative_url('roleuserassignment-detail', kwargs={'pk': user_assignment.id}) + delete(url, user=admin, expect=204) + assert bob not in inventory.admin_role.members.all() + + def test_new_to_old_rbac_team_member_addition(self, admin, post, team, bob, setup_managed_roles): + ''' + Assign user to Controller Team Member role definition, should be added to team.member_role.members + ''' + rd = RoleDefinition.objects.get(name='Controller Team Member') + + url = get_relative_url('roleuserassignment-list') + post(url, user=admin, data={'role_definition': rd.id, 'user': bob.id, 'object_id': team.id}, expect=201) + assert bob in team.member_role.members.all() + + def test_new_to_old_rbac_team_member_removal(self, admin, delete, team, bob): + ''' + Remove user from Controller Team Member role definition, should be deleted from team.member_role.members + ''' + team.member_role.members.add(bob) + + rd = RoleDefinition.objects.get(name='Controller Team Member') + user_assignment = RoleUserAssignment.objects.get(user=bob, role_definition=rd, object_id=team.id) + + url = get_relative_url('roleuserassignment-detail', kwargs={'pk': user_assignment.id}) + delete(url, user=admin, expect=204) + assert bob not in team.member_role.members.all() + + def test_new_to_old_rbac_team_addition(self, admin, post, team, inventory, setup_managed_roles): + ''' + Assign team to Inventory Admin role definition, should be added to inventory.admin_role.parents + ''' + rd = RoleDefinition.objects.get(name='Inventory Admin') + + url = get_relative_url('roleteamassignment-list') + post(url, user=admin, data={'role_definition': rd.id, 'team': team.id, 'object_id': inventory.id}, expect=201) + assert team.member_role in inventory.admin_role.parents.all() + + def test_new_to_old_rbac_team_removal(self, admin, delete, team, inventory, setup_managed_roles): + ''' + Remove team from Inventory Admin role definition, should be deleted from inventory.admin_role.parents + ''' + inventory.admin_role.parents.add(team.member_role) + + rd = RoleDefinition.objects.get(name='Inventory Admin') + team_assignment = RoleTeamAssignment.objects.get(team=team, role_definition=rd, object_id=inventory.id) + + url = get_relative_url('roleteamassignment-detail', kwargs={'pk': team_assignment.id}) + delete(url, user=admin, expect=204) + assert team.member_role not in inventory.admin_role.parents.all()