Skip to content

Commit

Permalink
refactor: Add permission check to get_policy
Browse files Browse the repository at this point in the history
  • Loading branch information
mike-pisman committed Oct 12, 2023
1 parent 9a852b3 commit 924e462
Showing 1 changed file with 49 additions and 59 deletions.
108 changes: 49 additions & 59 deletions src/unipoll_api/actions/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,36 @@
from unipoll_api.utils import Permissions


# Helper function to get policies from a resource
# NOTE: This can be moved to utils.py
async def get_policies_from_resource(resource: Resource) -> list[Policy]:
account: Account = AccountManager.active_user.get()
req_permissions: Permissions.Permissions | None = None
policies: list[Policy] = []
if resource.resource_type == "workspace":
req_permissions = Permissions.WorkspacePermissions["get_workspace_policies"]
elif resource.resource_type == "group":
req_permissions = Permissions.GroupPermissions["get_group_policies"]
elif resource.resource_type == "poll":
req_permissions = Permissions.PollPermissions["get_poll_policies"]
if req_permissions:
permissions = await Permissions.get_all_permissions(resource, account)
if Permissions.check_permission(permissions, req_permissions):
policies = resource.policies # type: ignore
else:
for policy in resource.policies:
if policy.policy_holder.ref.id == account.id: # type: ignore
policies.append(policy) # type: ignore
return policies


# Get all policies of a workspace
async def get_policies(policy_holder: Account | Group | None = None,
resource: Resource | None = None) -> PolicySchemas.PolicyList:
policy_list = []
policy: Policy

account: Account = AccountManager.active_user.get()
all_policies = []

# Less efficient way of getting policies
#
# search_filter = {}
# if policy_holder:
# search_filter['policy_holder.ref'] = policy_holder.ref
# if resource:
# search_filter['parent_resource.ref'] = resource.ref
# all_policies = await Policy.find(search_filter).to_list()
# for policy in all_policies:
# permissions = await Permissions.get_all_permissions(policy.parent_resource, account)
# if policy.parent_resource.resource_type == "workspace":
# req_permissions = Permissions.WorkspacePermissions["get_workspace_policies"]
# elif policy.parent_resource.resource_type == "group":
# req_permissions = Permissions.GroupPermissions["get_group_policies"]
# elif policy.parent_resource.resource_type == "poll":
# req_permissions = Permissions.PollPermissions["get_poll_policies"]
# if Permissions.check_permission(permissions, req_permissions):
# elif policy_holder.id == policy.policy_holder.ref.id:
# policy_list.append(await get_policy(policy))
# policy_list.append(await get_policy(policy))

# Helper function to get policies from a resource
async def get_policies_from_resource(resource: Resource) -> list[Policy]:
req_permissions: Permissions.Permissions | None = None
if resource.resource_type == "workspace":
req_permissions = Permissions.WorkspacePermissions["get_workspace_policies"]
elif resource.resource_type == "group":
req_permissions = Permissions.GroupPermissions["get_group_policies"]
if req_permissions:
permissions = await Permissions.get_all_permissions(resource, account)
if Permissions.check_permission(permissions, req_permissions):
return resource.policies # type: ignore
else:
user_policy = await Policy.find_one({"policy_holder.ref": account.ref, "parent_resource.ref": resource.ref})
return [user_policy] if user_policy else []
return []

# Get policies from a specific resource
if resource:
all_policies = await get_policies_from_resource(resource)
Expand All @@ -62,39 +46,45 @@ async def get_policies_from_resource(resource: Resource) -> list[Policy]:

for resource in all_resources:
all_policies += await get_policies_from_resource(resource)

# Build policy list
for policy in all_policies:
# Filter by policy_holder if specified
if policy_holder:
if (policy.policy_holder.ref.id != policy_holder.id):
continue
policy_list.append(await get_policy(policy))
policy_list.append(await get_policy(policy, False))
# Return policy list
return PolicySchemas.PolicyList(policies=policy_list)


async def get_policy(policy: Policy) -> PolicySchemas.PolicyShort:
async def get_policy(policy: Policy, permission_check: bool = True) -> PolicySchemas.PolicyShort:
if permission_check:
account = AccountManager.active_user.get()
permissions = await Permissions.get_all_permissions(policy.parent_resource, account)
resource_type: str = policy.parent_resource.resource_type # type: ignore

# NOTE: Alternatevely, we can check here if the user has the required permissions to get the policy
if resource_type == "workspace": # type: ignore
req_permissions = Permissions.WorkspacePermissions["get_workspace_policies"] # type: ignore
elif resource_type == "group": # type: ignore
req_permissions = Permissions.GroupPermissions["get_group_policies"] # type: ignore
elif resource_type == "poll": # type: ignore
req_permissions = Permissions.PollPermissions["get_poll_policies"] # type: ignore
else:
raise ResourceExceptions.InternalServerError("Unknown resource type")

if not Permissions.check_permission(permissions, req_permissions):
raise ResourceExceptions.UserNotAuthorized(account, "policy", "Get policy")

# Convert policy_holder link to Member object
ph_type = policy.policy_holder_type
ph_ref = policy.policy_holder.ref.id
policy_holder = await Account.get(ph_ref) if ph_type == "account" else await Group.get(ph_ref)

if not policy_holder:
raise PolicyExceptions.PolicyHolderNotFound(ph_ref)

policy_holder = MemberSchemas.Member(**policy_holder.model_dump()) # type: ignore
if policy.parent_resource.resource_type == "workspace": # type: ignore
PermissionType = Permissions.WorkspacePermissions
elif policy.parent_resource.resource_type == "group": # type: ignore
PermissionType = Permissions.GroupPermissions
elif policy.parent_resource.resource_type == "poll": # type: ignore
PermissionType = Permissions.PollPermissions
else:
raise ResourceExceptions.InternalServerError("Unknown resource type")

resource_type: str = policy.parent_resource.resource_type # type: ignore
PermissionType = eval("Permissions." + resource_type.capitalize() + "Permissions")
permissions = PermissionType(policy.permissions).name.split('|') # type: ignore
return PolicySchemas.PolicyShort(id=policy.id,
policy_holder_type=policy.policy_holder_type,
Expand All @@ -111,14 +101,14 @@ async def update_policy(policy: Policy, new_permissions: list[str]) -> PolicySch
account: Account = AccountManager.active_user.get()
permissions = await Permissions.get_all_permissions(policy.parent_resource, account)
if policy.parent_resource.resource_type == "workspace": # type: ignore
ResourcePermissions = Permissions.WorkspacePermissions
req_permissions = Permissions.WorkspacePermissions["set_workspace_policy"]
ResourcePermissions = Permissions.WorkspacePermissions # type: ignore
req_permissions = Permissions.WorkspacePermissions["set_workspace_policy"] # type: ignore
elif policy.parent_resource.resource_type == "group": # type: ignore
ResourcePermissions = Permissions.GroupPermissions
req_permissions = Permissions.GroupPermissions["set_group_policy"]
ResourcePermissions = Permissions.GroupPermissions # type: ignore
req_permissions = Permissions.GroupPermissions["set_group_policy"] # type: ignore
elif policy.parent_resource.resource_type == "poll": # type: ignore
ResourcePermissions = Permissions.PollPermissions
req_permissions = Permissions.PollPermissions["set_poll_policy"]
ResourcePermissions = Permissions.PollPermissions # type: ignore
req_permissions = Permissions.PollPermissions["set_poll_policy"] # type: ignore
else:
raise ResourceExceptions.InternalServerError("Unknown resource type")

Expand Down

0 comments on commit 924e462

Please sign in to comment.