diff --git a/botocove/cove_host_account.py b/botocove/cove_host_account.py index 2526ec1..94514ac 100644 --- a/botocove/cove_host_account.py +++ b/botocove/cove_host_account.py @@ -3,6 +3,7 @@ from functools import lru_cache from typing import ( Any, + Dict, Iterable, List, Literal, @@ -11,14 +12,13 @@ Set, Tuple, Union, - cast, ) import boto3 from boto3.session import Session from botocore.config import Config from mypy_boto3_organizations.client import OrganizationsClient -from mypy_boto3_organizations.type_defs import ListChildrenResponseTypeDef +from mypy_boto3_organizations.type_defs import AccountTypeDef from mypy_boto3_sts.client import STSClient from mypy_boto3_sts.type_defs import PolicyDescriptorTypeTypeDef @@ -60,8 +60,10 @@ def __init__( self.target_regions = regions self.provided_ignore_ids = ignore_ids - self.target_accounts = self._resolve_target_accounts(target_ids) - if not self.target_accounts: + + self._account_map = self._resolve_target_accounts(target_ids) + + if not self._account_map: raise ValueError( "There are no eligible account ids to run decorated func against" ) @@ -73,6 +75,10 @@ def __init__( self.org_master = org_master + @property + def target_accounts(self) -> Set[str]: + return set(self._account_map) + def get_cove_sessions(self) -> List[CoveSessionInformation]: logger.info(f"Getting session information for {self.target_accounts=}") logger.info(f"Role: {self.role_to_assume=} {self.role_session_name=}") @@ -81,9 +87,9 @@ def get_cove_sessions(self) -> List[CoveSessionInformation]: def _generate_account_sessions(self) -> Iterable[CoveSessionInformation]: for region in self.target_regions: - for account_id in self.target_accounts: - account_details: CoveSessionInformation = CoveSessionInformation( - Id=account_id, + for account in self._account_map.values(): + account_details = CoveSessionInformation( + Id=account["Id"], RoleName=self.role_to_assume, RoleSessionName=self.role_session_name, Policy=self.policy, @@ -91,10 +97,10 @@ def _generate_account_sessions(self) -> Iterable[CoveSessionInformation]: AssumeRoleSuccess=False, Region=region, ExceptionDetails=None, - Name=None, - Arn=None, - Email=None, - Status=None, + Name=account["Name"], + Arn=account["Arn"], + Email=account["Email"], + Status=account["Status"], Result=None, ) yield account_details @@ -128,17 +134,41 @@ def _get_boto3_sts_client(self, assuming_session: Optional[Session]) -> STSClien client: STSClient = self._get_boto3_client("sts", assuming_session) return client - def _resolve_target_accounts(self, target_ids: Optional[List[str]]) -> Set[str]: + def _resolve_target_accounts( + self, target_ids: Optional[List[str]] + ) -> Dict[str, AccountTypeDef]: accounts_to_ignore = self._gather_ignored_accounts() logger.info(f"Ignoring account IDs: {accounts_to_ignore=}") accounts_to_target = self._gather_target_accounts(target_ids) - final_accounts: Set = accounts_to_target - accounts_to_ignore + + final_accounts = { + a["Id"]: self._ensure_complete_account_description(a) + for a in accounts_to_target + if a["Id"] not in accounts_to_ignore + } + if len(final_accounts) < 1: raise ValueError( "There are no eligible account ids to run decorated func against" ) + return final_accounts + def _ensure_complete_account_description( + self, account: AccountTypeDef + ) -> AccountTypeDef: + """If the description has only an ID, then the account was probably + gathered from an account ID in the target_ids list. In this case, return + a new complete account description. Otherwise assume the description is + complete and return the input account. + """ + + if list(account.keys()) == ["Id"]: + logger.info(f"Describing incomplete account {account['Id']}") + desc = self.org_client.describe_account(AccountId=account["Id"]) + return desc["Account"] + return account + def _gather_ignored_accounts(self) -> Set[str]: ignored_accounts = {self.host_account_id} @@ -147,17 +177,20 @@ def _gather_ignored_accounts(self) -> Set[str]: ignored_accounts.update(accs) if ous: accs_from_ous = self._get_all_accounts_by_organization_units(ous) - ignored_accounts.update(accs_from_ous) + ignored_accounts.update(a["Id"] for a in accs_from_ous) return ignored_accounts - def _gather_target_accounts(self, targets: Optional[List[str]]) -> Set[str]: + def _gather_target_accounts( + self, targets: Optional[List[str]] + ) -> List[AccountTypeDef]: if targets: accs, ous = self._get_validated_ids(targets) + target_accounts: List[AccountTypeDef] = [{"Id": a} for a in accs] if ous: accs_from_ous = self._get_all_accounts_by_organization_units(ous) - accs.extend(accs_from_ous) - return set(accs) + target_accounts.extend(accs_from_ous) + return target_accounts else: # No target_ids passed, getting all accounts in org return self._get_active_org_accounts() @@ -186,9 +219,9 @@ def _get_validated_ids(self, ids: List[str]) -> Tuple[List[str], List[str]]: def _get_all_accounts_by_organization_units( self, target_ous: List[str] - ) -> List[str]: + ) -> List[AccountTypeDef]: - account_list: List[str] = [] + account_list: List[AccountTypeDef] = [] for parent_ou in target_ous: @@ -210,46 +243,42 @@ def _get_all_child_ous(self, parent_ou: str, ou_list: List[str]) -> None: """Depth-first recursion mutates the current_ou_list present in the calling function to establish all children of a parent OU""" child_ous = self._get_child_ous(parent_ou) - child_ous_list = [ou["Id"] for ou in child_ous["Children"]] - ou_list.extend(child_ous_list) + ou_list.extend(child_ous) - for ou in child_ous_list: + for ou in child_ous: self._get_all_child_ous(ou, ou_list) def _get_accounts_by_organization_units( self, organization_units: List[str] - ) -> List[str]: + ) -> List[AccountTypeDef]: - account_list: List[str] = [] + account_list: List[AccountTypeDef] = [] for ou in organization_units: ou_children = self._get_child_accounts(ou) - account_list.extend(acc["Id"] for acc in ou_children["Children"]) + account_list.extend(ou_children) return account_list - def _get_active_org_accounts(self) -> Set[str]: - all_org_accounts = ( - self.org_client.get_paginator("list_accounts") - .paginate() - .build_full_result()["Accounts"] - ) - return {acc["Id"] for acc in all_org_accounts if acc["Status"] == "ACTIVE"} + def _get_active_org_accounts(self) -> List[AccountTypeDef]: + pages = self.org_client.get_paginator("list_accounts").paginate() + return [ + account + for page in pages + for account in page["Accounts"] + if account["Status"] == "ACTIVE" + ] @lru_cache() - def _get_child_ous(self, parent_ou: str) -> ListChildrenResponseTypeDef: - return cast( - ListChildrenResponseTypeDef, - self.org_client.get_paginator("list_children") - .paginate(ChildType="ORGANIZATIONAL_UNIT", ParentId=parent_ou) - .build_full_result(), - ) + def _get_child_ous(self, parent_ou: str) -> List[str]: + pages = self.org_client.get_paginator( + "list_organizational_units_for_parent" + ).paginate(ParentId=parent_ou) + return [ou["Id"] for page in pages for ou in page["OrganizationalUnits"]] @lru_cache() - def _get_child_accounts(self, parent_ou: str) -> ListChildrenResponseTypeDef: - return cast( - ListChildrenResponseTypeDef, - self.org_client.get_paginator("list_children") - .paginate(ChildType="ACCOUNT", ParentId=parent_ou) - .build_full_result(), + def _get_child_accounts(self, parent_ou: str) -> List[AccountTypeDef]: + pages = self.org_client.get_paginator("list_accounts_for_parent").paginate( + ParentId=parent_ou ) + return [account for page in pages for account in page["Accounts"]] diff --git a/botocove/cove_session.py b/botocove/cove_session.py index 4012bfe..c363737 100644 --- a/botocove/cove_session.py +++ b/botocove/cove_session.py @@ -4,7 +4,6 @@ from boto3.session import Session from botocore.exceptions import ClientError from mypy_boto3_organizations.client import OrganizationsClient -from mypy_boto3_organizations.type_defs import AccountTypeDef from mypy_boto3_sts.client import STSClient from botocove.cove_types import CoveSessionInformation @@ -44,21 +43,6 @@ def activate_cove_session(self) -> "CoveSession": f"{self.session_information['RoleName']}" ) - if self.org_master: - try: - account_description: AccountTypeDef = self.org_client.describe_account( - AccountId=self.session_information["Id"] - )["Account"] - self.session_information["Arn"] = account_description["Arn"] - self.session_information["Email"] = account_description["Email"] - self.session_information["Name"] = account_description["Name"] - self.session_information["Status"] = account_description["Status"] - except ClientError: - logger.exception( - "Failed to call describe_account for " - f"{self.session_information['Id']}" - ) - try: logger.debug(f"Attempting to assume {role_arn}") diff --git a/tests/test_session.py b/tests/test_session.py index d41f3b3..92486e1 100644 --- a/tests/test_session.py +++ b/tests/test_session.py @@ -1,11 +1,9 @@ -from typing import Any, List +from typing import List import pytest from boto3 import Session -from botocore.exceptions import ClientError from mypy_boto3_organizations.type_defs import AccountTypeDef from mypy_boto3_sts.type_defs import PolicyDescriptorTypeTypeDef -from pytest_mock import MockerFixture from botocove import CoveSession, cove @@ -99,33 +97,3 @@ def simple_func(session: CoveSession, a_string: str) -> str: } ] assert cove_output["Results"] == expected - - -def test_session_result_error_handler( - org_accounts: List[AccountTypeDef], mocker: MockerFixture -) -> None: - def describe_account(*args: Any, **kwargs: Any) -> None: - raise ClientError( - {"Error": {"Message": "broken!", "Code": "OhNo"}}, "describe_account" - ) - - mocker.patch( - "moto.organizations.models.OrganizationsBackend.describe_account", - describe_account, - ) - - @cove() - def simple_func(session: CoveSession, a_string: str) -> str: - return a_string - - cove_output = simple_func("test-string") - expected = [ - { - "Id": org_accounts[1]["Id"], - "AssumeRoleSuccess": True, - "Result": "test-string", - "RoleName": "OrganizationAccountAccessRole", - "RoleSessionName": "OrganizationAccountAccessRole", - } - ] - assert cove_output["Results"] == expected