From cdd9cd8fdd7174180aaf1b300506f8909c8616f6 Mon Sep 17 00:00:00 2001 From: Joel McCoy Date: Thu, 4 Jan 2024 09:27:48 -0600 Subject: [PATCH] Implements sso-admin AWS Managed Policies (#7184) --- moto/ssoadmin/exceptions.py | 25 ++- moto/ssoadmin/models.py | 114 +++++++++- moto/ssoadmin/responses.py | 49 +++++ moto/ssoadmin/utils.py | 7 + tests/test_ssoadmin/test_ssoadmin_policies.py | 198 ++++++++++++++++++ 5 files changed, 389 insertions(+), 4 deletions(-) diff --git a/moto/ssoadmin/exceptions.py b/moto/ssoadmin/exceptions.py index a5f5df6e2a7c..32cc025ef372 100644 --- a/moto/ssoadmin/exceptions.py +++ b/moto/ssoadmin/exceptions.py @@ -3,9 +3,30 @@ class ResourceNotFoundException(JsonRESTError): - def __init__(self, message: str = "Account not found") -> None: + code = 400 + + def __init__(self, message: str = "") -> None: super().__init__( error_type="ResourceNotFoundException", message=message, - code="ResourceNotFoundException", + ) + + +class ConflictException(JsonRESTError): + code = 400 + + def __init__(self, message: str = "") -> None: + super().__init__( + error_type="ConflictException", + message=message, + ) + + +class ServiceQuotaExceededException(JsonRESTError): + code = 400 + + def __init__(self, message: str = "") -> None: + super().__init__( + error_type="ServiceQuotaExceededException", + message=message, ) diff --git a/moto/ssoadmin/models.py b/moto/ssoadmin/models.py index 5cb410868e4f..7d7433f80d08 100644 --- a/moto/ssoadmin/models.py +++ b/moto/ssoadmin/models.py @@ -1,13 +1,22 @@ -from typing import Any, Dict, List +import json +from typing import Any, Dict, List, Optional from moto.core import BackendDict, BaseBackend, BaseModel from moto.core.utils import unix_time +from moto.iam.aws_managed_policies import aws_managed_policies_data from moto.moto_api._internal import mock_random as random from moto.utilities.paginator import paginate -from .exceptions import ResourceNotFoundException +from .exceptions import ( + ConflictException, + ResourceNotFoundException, + ServiceQuotaExceededException, +) from .utils import PAGINATION_MODEL +# https://docs.aws.amazon.com/singlesignon/latest/userguide/limits.html +MAX_MANAGED_POLICIES_PER_PERMISSION_SET = 20 + class AccountAssignment(BaseModel): def __init__( @@ -60,6 +69,10 @@ def __init__( self.tags = tags self.created_date = unix_time() self.inline_policy = "" + self.managed_policies: List[ManagedPolicy] = list() + self.total_managed_policies_attached = ( + 0 # this will also include customer managed policies + ) def to_json(self, include_creation_date: bool = False) -> Dict[str, Any]: summary: Dict[str, Any] = { @@ -83,6 +96,17 @@ def generate_id(instance_arn: str) -> str: ) +class ManagedPolicy(BaseModel): + def __init__(self, arn: str, name: str): + self.arn = arn + self.name = name + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, ManagedPolicy): + return False + return self.arn == other.arn + + class SSOAdminBackend(BaseBackend): """Implementation of SSOAdmin APIs.""" @@ -90,6 +114,7 @@ def __init__(self, region_name: str, account_id: str): super().__init__(region_name, account_id) self.account_assignments: List[AccountAssignment] = list() self.permission_sets: List[PermissionSet] = list() + self.aws_managed_policies: Optional[Dict[str, Any]] = None def create_account_assignment( self, @@ -158,6 +183,26 @@ def _find_account( return account raise ResourceNotFoundException + def _find_managed_policy(self, managed_policy_arn: str) -> ManagedPolicy: + """ + Checks to make sure the managed policy exists. + This pulls from moto/iam/aws_managed_policies.py + """ + # Lazy loading of aws managed policies file + if self.aws_managed_policies is None: + self.aws_managed_policies = json.loads(aws_managed_policies_data) + + policy_name = managed_policy_arn.split("/")[-1] + managed_policy = self.aws_managed_policies.get(policy_name, None) + if managed_policy is not None: + path = managed_policy.get("path", "/") + expected_arn = f"arn:aws:iam::aws:policy{path}{policy_name}" + if managed_policy_arn == expected_arn: + return ManagedPolicy(managed_policy_arn, policy_name) + raise ResourceNotFoundException( + f"Policy does not exist with ARN: {managed_policy_arn}" + ) + @paginate(PAGINATION_MODEL) # type: ignore[misc] def list_account_assignments( self, instance_arn: str, account_id: str, permission_set_arn: str @@ -314,5 +359,70 @@ def delete_inline_policy_from_permission_set( ) permission_set.inline_policy = "" + def attach_managed_policy_to_permission_set( + self, instance_arn: str, permission_set_arn: str, managed_policy_arn: str + ) -> None: + permissionset = self._find_permission_set( + instance_arn, + permission_set_arn, + ) + managed_policy = self._find_managed_policy(managed_policy_arn) + + permissionset_id = permission_set_arn.split("/")[-1] + if managed_policy in permissionset.managed_policies: + raise ConflictException( + f"Permission set with id {permissionset_id} already has a typed link attachment to a manged policy with {managed_policy_arn}" + ) + + if ( + permissionset.total_managed_policies_attached + >= MAX_MANAGED_POLICIES_PER_PERMISSION_SET + ): + permissionset_id = permission_set_arn.split("/")[-1] + raise ServiceQuotaExceededException( + f"You have exceeded AWS SSO limits. Cannot create ManagedPolicy more than {MAX_MANAGED_POLICIES_PER_PERMISSION_SET} for id {permissionset_id}. Please refer to https://docs.aws.amazon.com/singlesignon/latest/userguide/limits.html" + ) + + permissionset.managed_policies.append(managed_policy) + permissionset.total_managed_policies_attached += 1 + + @paginate(pagination_model=PAGINATION_MODEL) # type: ignore[misc] + def list_managed_policies_in_permission_set( + self, + instance_arn: str, + permission_set_arn: str, + ) -> List[ManagedPolicy]: + permissionset = self._find_permission_set( + instance_arn, + permission_set_arn, + ) + return permissionset.managed_policies + + def _detach_managed_policy( + self, instance_arn: str, permission_set_arn: str, managed_policy_arn: str + ) -> None: + # ensure permission_set exists + permissionset = self._find_permission_set( + instance_arn, + permission_set_arn, + ) + + for managed_policy in permissionset.managed_policies: + if managed_policy.arn == managed_policy_arn: + permissionset.managed_policies.remove(managed_policy) + permissionset.total_managed_policies_attached -= 1 + return + + raise ResourceNotFoundException( + f"Could not find ManagedPolicy with arn {managed_policy_arn}" + ) + + def detach_managed_policy_from_permission_set( + self, instance_arn: str, permission_set_arn: str, managed_policy_arn: str + ) -> None: + self._detach_managed_policy( + instance_arn, permission_set_arn, managed_policy_arn + ) + ssoadmin_backends = BackendDict(SSOAdminBackend, "sso") diff --git a/moto/ssoadmin/responses.py b/moto/ssoadmin/responses.py index dec1cb94c011..3187f803a9ec 100644 --- a/moto/ssoadmin/responses.py +++ b/moto/ssoadmin/responses.py @@ -195,3 +195,52 @@ def delete_inline_policy_from_permission_set(self) -> str: permission_set_arn=permission_set_arn, ) return json.dumps({}) + + def attach_managed_policy_to_permission_set(self) -> str: + instance_arn = self._get_param("InstanceArn") + permission_set_arn = self._get_param("PermissionSetArn") + managed_policy_arn = self._get_param("ManagedPolicyArn") + self.ssoadmin_backend.attach_managed_policy_to_permission_set( + instance_arn=instance_arn, + permission_set_arn=permission_set_arn, + managed_policy_arn=managed_policy_arn, + ) + return json.dumps({}) + + def list_managed_policies_in_permission_set(self) -> str: + instance_arn = self._get_param("InstanceArn") + permission_set_arn = self._get_param("PermissionSetArn") + max_results = self._get_int_param("MaxResults") + next_token = self._get_param("NextToken") + + ( + managed_policies, + next_token, + ) = self.ssoadmin_backend.list_managed_policies_in_permission_set( + instance_arn=instance_arn, + permission_set_arn=permission_set_arn, + max_results=max_results, + next_token=next_token, + ) + + managed_policies_response = [ + {"Arn": managed_policy.arn, "Name": managed_policy.name} + for managed_policy in managed_policies + ] + return json.dumps( + { + "AttachedManagedPolicies": managed_policies_response, + "NextToken": next_token, + } + ) + + def detach_managed_policy_from_permission_set(self) -> str: + instance_arn = self._get_param("InstanceArn") + permission_set_arn = self._get_param("PermissionSetArn") + managed_policy_arn = self._get_param("ManagedPolicyArn") + self.ssoadmin_backend.detach_managed_policy_from_permission_set( + instance_arn=instance_arn, + permission_set_arn=permission_set_arn, + managed_policy_arn=managed_policy_arn, + ) + return json.dumps({}) diff --git a/moto/ssoadmin/utils.py b/moto/ssoadmin/utils.py index e7c3230ed88e..48d2a2952fc2 100644 --- a/moto/ssoadmin/utils.py +++ b/moto/ssoadmin/utils.py @@ -30,4 +30,11 @@ "PrincipalType", ], }, + "list_managed_policies_in_permission_set": { + "input_token": "next_token", + "limit_key": "max_results", + "limit_default": 100, + "result_key": "AttachedManagedPolicies", + "unique_attribute": ["arn"], + }, } diff --git a/tests/test_ssoadmin/test_ssoadmin_policies.py b/tests/test_ssoadmin/test_ssoadmin_policies.py index 7eee8260c264..f9f2add04f5b 100644 --- a/tests/test_ssoadmin/test_ssoadmin_policies.py +++ b/tests/test_ssoadmin/test_ssoadmin_policies.py @@ -5,6 +5,7 @@ from botocore.exceptions import ClientError from moto import mock_ssoadmin +from moto.iam.aws_managed_policies import aws_managed_policies_data # See our Development Tips on writing tests for hints on how to write good tests: # http://docs.getmoto.org/en/latest/docs/contributing/development_tips/tests.html @@ -15,6 +16,11 @@ DUMMY_INSTANCE_ARN = "arn:aws:sso:::instance/ins-aaaabbbbccccdddd" +@pytest.fixture(name="managed_policies") +def get_managed_policies(): + return json.loads(aws_managed_policies_data) + + def create_permissionset(client) -> str: """Helper function to create a dummy permission set and returns the arn.""" @@ -130,3 +136,195 @@ def test_delete_inline_policy_to_permissionset(): ) assert response["InlinePolicy"] == "" + + +@mock_ssoadmin +def test_attach_managed_policy_to_permission_set(): + client = boto3.client("sso-admin", region_name="us-east-1") + + permission_set_arn = create_permissionset(client) + permissionset_id = permission_set_arn.split("/")[-1] + managed_policy_arn = "arn:aws:iam::aws:policy/AdministratorAccess" + + client.attach_managed_policy_to_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + ManagedPolicyArn=managed_policy_arn, + ) + + response = client.list_managed_policies_in_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + ) + + assert response["AttachedManagedPolicies"][0]["Name"] == "AdministratorAccess" + assert ( + response["AttachedManagedPolicies"][0]["Arn"] + == "arn:aws:iam::aws:policy/AdministratorAccess" + ) + + # test for managed policy that is already attached + with pytest.raises(ClientError) as e: + client.attach_managed_policy_to_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + ManagedPolicyArn=managed_policy_arn, + ) + err = e.value.response["Error"] + assert err["Code"] == "ConflictException" + assert ( + err["Message"] + == f"Permission set with id {permissionset_id} already has a typed link attachment to a manged policy with {managed_policy_arn}" + ) + + # test for managed policy that does not exist + not_exist_managed_policy_arn = "arn:aws:iam::aws:policy/DoesNotExist" + with pytest.raises(ClientError) as e: + client.attach_managed_policy_to_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + ManagedPolicyArn=not_exist_managed_policy_arn, + ) + err = e.value.response["Error"] + assert err["Code"] == "ResourceNotFoundException" + assert ( + err["Message"] + == "Policy does not exist with ARN: arn:aws:iam::aws:policy/DoesNotExist" + ) + + +@mock_ssoadmin +def test_list_managed_policies_quota_limit(managed_policies): + """ + Tests exceeding the managed policy quota limit. + """ + managed_policies_to_attach = [] + policy_count = 0 + for policy_name in managed_policies: + path = managed_policies[policy_name]["Path"] + # only attach policies with path "/" + if path != "/": + continue + managed_policies_to_attach.append(policy_name) + policy_count += 1 + if policy_count >= 21: # 20 is the quota limit + break + + client = boto3.client("sso-admin", region_name="us-east-1") + permission_set_arn = create_permissionset(client) + permission_set_id = permission_set_arn.split("/")[-1] + + arn_string = "arn:aws:iam::aws:policy/" + with pytest.raises(ClientError) as e: + # the 21st policy should exceed the quota limit + for managed_policy in managed_policies_to_attach: + client.attach_managed_policy_to_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + ManagedPolicyArn=arn_string + managed_policy, + ) + err = e.value.response["Error"] + assert err["Code"] == "ServiceQuotaExceededException" + assert ( + err["Message"] + == f"You have exceeded AWS SSO limits. Cannot create ManagedPolicy more than 20 for id {permission_set_id}. Please refer to https://docs.aws.amazon.com/singlesignon/latest/userguide/limits.html" + ) + + +@mock_ssoadmin +def test_list_managed_policies_in_permission_set(managed_policies): + """ + Tests functionality of listing aws managed policies attached to a permission set. + This also tests the pagination functionality. + """ + client = boto3.client("sso-admin", region_name="us-east-1") + + arn_string = "arn:aws:iam::aws:policy/" + + # create a dummy permission set + permission_set_arn = create_permissionset(client) + + managed_policies_names = list(managed_policies.keys()) + + # attach 3 good managed policies + for idx in range(3): + managed_policy_name = managed_policies_names[idx] + + client.attach_managed_policy_to_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + ManagedPolicyArn=arn_string + managed_policy_name, + ) + + response = client.list_managed_policies_in_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + MaxResults=2, + ) + + managed_policies = [] + + assert len(response["AttachedManagedPolicies"]) == 2 + managed_policies.extend(response["AttachedManagedPolicies"]) + next_token = response["NextToken"] + + response = client.list_managed_policies_in_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + MaxResults=2, + NextToken=next_token, + ) + + assert len(response["AttachedManagedPolicies"]) == 1 + managed_policies.extend(response["AttachedManagedPolicies"]) + + # ensure the 3 unique managed policies were returned + actual_managed_policy_names = [ + managed_policy["Name"] for managed_policy in managed_policies + ] + expected_managed_policy_names = managed_policies_names[:3] + assert all( + name in actual_managed_policy_names for name in expected_managed_policy_names + ) + + +@mock_ssoadmin +def test_detach_managed_policy_from_permission_set(): + client = boto3.client("sso-admin", region_name="us-east-1") + permission_set_arn = create_permissionset(client) + managed_policy_arn = "arn:aws:iam::aws:policy/AdministratorAccess" + + # test for managed policy that is not attached + with pytest.raises(ClientError) as e: + client.detach_managed_policy_from_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + ManagedPolicyArn=managed_policy_arn, + ) + err = e.value.response["Error"] + assert err["Code"] == "ResourceNotFoundException" + assert ( + err["Message"] == f"Could not find ManagedPolicy with arn {managed_policy_arn}" + ) + + # attach managed policy + client.attach_managed_policy_to_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + ManagedPolicyArn=managed_policy_arn, + ) + + # detach managed policy + client.detach_managed_policy_from_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + ManagedPolicyArn=managed_policy_arn, + ) + + # ensure managed policy is detached + response = client.list_managed_policies_in_permission_set( + InstanceArn=DUMMY_INSTANCE_ARN, + PermissionSetArn=permission_set_arn, + ) + + assert len(response["AttachedManagedPolicies"]) == 0