Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Translate new RBAC to old RBAC #15490

Merged
merged 5 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 83 additions & 2 deletions awx/main/models/rbac.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@

# Django
from django.db import models, transaction, connection
from django.db.models.signals import m2m_changed
from django.db.models.signals import m2m_changed, post_save, post_delete
from django.dispatch import receiver
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes.fields import GenericForeignKey
from django.utils.translation import gettext_lazy as _
from django.core.exceptions import ObjectDoesNotExist
from django.apps import apps
from django.conf import settings

# Ansible_base app
from ansible_base.rbac.models import RoleDefinition
from ansible_base.rbac.models import RoleDefinition, RoleUserAssignment, RoleTeamAssignment
from ansible_base.lib.utils.models import get_type_for_model

# AWX
Expand Down Expand Up @@ -733,5 +735,84 @@ def sync_parents_to_new_rbac(instance, action, model, pk_set, reverse, **kwargs)
give_or_remove_permission(child_role, team, giving=is_giving)


ROLE_DEFINITION_TO_ROLE_FIELD = {
'Organization Member': 'member_role',
'Controller Organization Member': 'member_role',
'WorkflowJobTemplate Admin': 'admin_role',
'Organization WorkflowJobTemplate Admin': 'workflow_admin_role',
'WorkflowJobTemplate Execute': 'execute_role',
'WorkflowJobTemplate Approve': 'approval_role',
'InstanceGroup Admin': 'admin_role',
'InstanceGroup Use': 'use_role',
'Organization ExecutionEnvironment Admin': 'execution_environment_admin_role',
'Project Admin': 'admin_role',
'Organization Project Admin': 'project_admin_role',
'Project Use': 'use_role',
'Project Update': 'update_role',
'JobTemplate Admin': 'admin_role',
'Organization JobTemplate Admin': 'job_template_admin_role',
'JobTemplate Execute': 'execute_role',
'Inventory Admin': 'admin_role',
'Organization Inventory Admin': 'inventory_admin_role',
'Inventory Use': 'use_role',
'Inventory Adhoc': 'adhoc_role',
'Inventory Update': 'update_role',
'Organization NotificationTemplate Admin': 'notification_admin_role',
'Credential Admin': 'admin_role',
'Organization Credential Admin': 'credential_admin_role',
'Credential Use': 'use_role',
'Team Admin': 'admin_role',
'Controller Team Admin': 'admin_role',
'Team Member': 'member_role',
'Controller Team Member': 'member_role',
'Organization Admin': 'admin_role',
'Controller Organization Admin': 'admin_role',
'Organization Audit': 'auditor_role',
'Organization Execute': 'execute_role',
'Organization Approval': 'approval_role',
}


def _sync_assignments_to_old_rbac(instance, delete=True):
from awx.main.signals import disable_activity_stream

with disable_activity_stream():
with disable_rbac_sync():
field_name = ROLE_DEFINITION_TO_ROLE_FIELD.get(instance.role_definition.name)
if not field_name:
return
try:
role = getattr(instance.object_role.content_object, field_name)
# in the case RoleUserAssignment is being cascade deleted, then
# object_role might not exist. In which case the object is about to be removed
# anyways so just return
except ObjectDoesNotExist:
return
if isinstance(instance.actor, get_user_model()):
# user
if delete:
role.members.remove(instance.actor)
else:
role.members.add(instance.actor)
else:
# team
if delete:
instance.team.member_role.children.remove(role)
else:
instance.team.member_role.children.add(role)


@receiver(post_delete, sender=RoleUserAssignment)
@receiver(post_delete, sender=RoleTeamAssignment)
def sync_assignments_to_old_rbac_delete(instance, **kwargs):
_sync_assignments_to_old_rbac(instance, delete=True)


@receiver(post_save, sender=RoleUserAssignment)
@receiver(post_save, sender=RoleTeamAssignment)
def sync_user_assignments_to_old_rbac_create(instance, **kwargs):
_sync_assignments_to_old_rbac(instance, delete=False)


m2m_changed.connect(sync_members_to_new_rbac, Role.members.through)
m2m_changed.connect(sync_parents_to_new_rbac, Role.parents.through)
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from ansible_base.rbac.models import RoleDefinition, RoleUserAssignment, RoleTeamAssignment
from ansible_base.lib.utils.response import get_relative_url
import pytest


@pytest.mark.django_db
class TestNewToOld:
'''
Tests that the DAB RBAC system is correctly translated to the old RBAC system
Namely, tests functionality of the _sync_assignments_to_old_rbac signal handler
'''

def test_new_to_old_rbac_addition(self, admin, post, inventory, bob, setup_managed_roles):
'''
Assign user to Inventory Admin role definition, should be added to inventory.admin_role.members
'''
rd = RoleDefinition.objects.get(name='Inventory Admin')

url = get_relative_url('roleuserassignment-list')
post(url, user=admin, data={'role_definition': rd.id, 'user': bob.id, 'object_id': inventory.id}, expect=201)
assert bob in inventory.admin_role.members.all()

def test_new_to_old_rbac_removal(self, admin, delete, inventory, bob, setup_managed_roles):
'''
Remove user from Inventory Admin role definition, should be deleted from inventory.admin_role.members
'''
inventory.admin_role.members.add(bob)

rd = RoleDefinition.objects.get(name='Inventory Admin')
user_assignment = RoleUserAssignment.objects.get(user=bob, role_definition=rd, object_id=inventory.id)

url = get_relative_url('roleuserassignment-detail', kwargs={'pk': user_assignment.id})
delete(url, user=admin, expect=204)
assert bob not in inventory.admin_role.members.all()

def test_new_to_old_rbac_team_member_addition(self, admin, post, team, bob, setup_managed_roles):
'''
Assign user to Controller Team Member role definition, should be added to team.member_role.members
'''
rd = RoleDefinition.objects.get(name='Controller Team Member')

url = get_relative_url('roleuserassignment-list')
post(url, user=admin, data={'role_definition': rd.id, 'user': bob.id, 'object_id': team.id}, expect=201)
assert bob in team.member_role.members.all()

def test_new_to_old_rbac_team_member_removal(self, admin, delete, team, bob):
'''
Remove user from Controller Team Member role definition, should be deleted from team.member_role.members
'''
team.member_role.members.add(bob)

rd = RoleDefinition.objects.get(name='Controller Team Member')
user_assignment = RoleUserAssignment.objects.get(user=bob, role_definition=rd, object_id=team.id)

url = get_relative_url('roleuserassignment-detail', kwargs={'pk': user_assignment.id})
delete(url, user=admin, expect=204)
assert bob not in team.member_role.members.all()

def test_new_to_old_rbac_team_addition(self, admin, post, team, inventory, setup_managed_roles):
'''
Assign team to Inventory Admin role definition, should be added to inventory.admin_role.parents
'''
rd = RoleDefinition.objects.get(name='Inventory Admin')

url = get_relative_url('roleteamassignment-list')
post(url, user=admin, data={'role_definition': rd.id, 'team': team.id, 'object_id': inventory.id}, expect=201)
assert team.member_role in inventory.admin_role.parents.all()

def test_new_to_old_rbac_team_removal(self, admin, delete, team, inventory, setup_managed_roles):
'''
Remove team from Inventory Admin role definition, should be deleted from inventory.admin_role.parents
'''
inventory.admin_role.parents.add(team.member_role)

rd = RoleDefinition.objects.get(name='Inventory Admin')
team_assignment = RoleTeamAssignment.objects.get(team=team, role_definition=rd, object_id=inventory.id)

url = get_relative_url('roleteamassignment-detail', kwargs={'pk': team_assignment.id})
delete(url, user=admin, expect=204)
assert team.member_role not in inventory.admin_role.parents.all()
Loading