Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep AccountTypeDef to avoid DescribeAccount calls #39

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 74 additions & 45 deletions botocove/cove_host_account.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from functools import lru_cache
from typing import (
Any,
Dict,
Iterable,
List,
Literal,
Expand All @@ -11,14 +12,13 @@
Set,
Tuple,
Union,
cast,
)

import boto3
from boto3.session import Session
from botocore.config import Config
from mypy_boto3_organizations.client import OrganizationsClient
from mypy_boto3_organizations.type_defs import ListChildrenResponseTypeDef
from mypy_boto3_organizations.type_defs import AccountTypeDef
from mypy_boto3_sts.client import STSClient
from mypy_boto3_sts.type_defs import PolicyDescriptorTypeTypeDef

Expand Down Expand Up @@ -60,8 +60,10 @@ def __init__(
self.target_regions = regions

self.provided_ignore_ids = ignore_ids
self.target_accounts = self._resolve_target_accounts(target_ids)
if not self.target_accounts:

self._account_map = self._resolve_target_accounts(target_ids)

if not self._account_map:
raise ValueError(
"There are no eligible account ids to run decorated func against"
)
Expand All @@ -73,6 +75,10 @@ def __init__(

self.org_master = org_master

@property
def target_accounts(self) -> Set[str]:
return set(self._account_map)

def get_cove_sessions(self) -> List[CoveSessionInformation]:
logger.info(f"Getting session information for {self.target_accounts=}")
logger.info(f"Role: {self.role_to_assume=} {self.role_session_name=}")
Expand All @@ -81,20 +87,20 @@ def get_cove_sessions(self) -> List[CoveSessionInformation]:

def _generate_account_sessions(self) -> Iterable[CoveSessionInformation]:
for region in self.target_regions:
for account_id in self.target_accounts:
account_details: CoveSessionInformation = CoveSessionInformation(
Id=account_id,
for account in self._account_map.values():
account_details = CoveSessionInformation(
Id=account["Id"],
RoleName=self.role_to_assume,
RoleSessionName=self.role_session_name,
Policy=self.policy,
PolicyArns=self.policy_arns,
AssumeRoleSuccess=False,
Region=region,
ExceptionDetails=None,
Name=None,
Arn=None,
Email=None,
Status=None,
Name=account["Name"],
Arn=account["Arn"],
Email=account["Email"],
Status=account["Status"],
Result=None,
)
yield account_details
Expand Down Expand Up @@ -128,17 +134,41 @@ def _get_boto3_sts_client(self, assuming_session: Optional[Session]) -> STSClien
client: STSClient = self._get_boto3_client("sts", assuming_session)
return client

def _resolve_target_accounts(self, target_ids: Optional[List[str]]) -> Set[str]:
def _resolve_target_accounts(
self, target_ids: Optional[List[str]]
) -> Dict[str, AccountTypeDef]:
accounts_to_ignore = self._gather_ignored_accounts()
logger.info(f"Ignoring account IDs: {accounts_to_ignore=}")
accounts_to_target = self._gather_target_accounts(target_ids)
final_accounts: Set = accounts_to_target - accounts_to_ignore

final_accounts = {
a["Id"]: self._ensure_complete_account_description(a)
for a in accounts_to_target
if a["Id"] not in accounts_to_ignore
}

if len(final_accounts) < 1:
raise ValueError(
"There are no eligible account ids to run decorated func against"
)

return final_accounts

def _ensure_complete_account_description(
self, account: AccountTypeDef
) -> AccountTypeDef:
"""If the description has only an ID, then the account was probably
gathered from an account ID in the target_ids list. In this case, return
a new complete account description. Otherwise assume the description is
complete and return the input account.
"""

if list(account.keys()) == ["Id"]:
logger.info(f"Describing incomplete account {account['Id']}")
desc = self.org_client.describe_account(AccountId=account["Id"])
return desc["Account"]
return account

def _gather_ignored_accounts(self) -> Set[str]:
ignored_accounts = {self.host_account_id}

Expand All @@ -147,17 +177,20 @@ def _gather_ignored_accounts(self) -> Set[str]:
ignored_accounts.update(accs)
if ous:
accs_from_ous = self._get_all_accounts_by_organization_units(ous)
ignored_accounts.update(accs_from_ous)
ignored_accounts.update(a["Id"] for a in accs_from_ous)

return ignored_accounts

def _gather_target_accounts(self, targets: Optional[List[str]]) -> Set[str]:
def _gather_target_accounts(
self, targets: Optional[List[str]]
) -> List[AccountTypeDef]:
if targets:
accs, ous = self._get_validated_ids(targets)
target_accounts: List[AccountTypeDef] = [{"Id": a} for a in accs]
if ous:
accs_from_ous = self._get_all_accounts_by_organization_units(ous)
accs.extend(accs_from_ous)
return set(accs)
target_accounts.extend(accs_from_ous)
return target_accounts
else:
# No target_ids passed, getting all accounts in org
return self._get_active_org_accounts()
Expand Down Expand Up @@ -186,9 +219,9 @@ def _get_validated_ids(self, ids: List[str]) -> Tuple[List[str], List[str]]:

def _get_all_accounts_by_organization_units(
self, target_ous: List[str]
) -> List[str]:
) -> List[AccountTypeDef]:

account_list: List[str] = []
account_list: List[AccountTypeDef] = []

for parent_ou in target_ous:

Expand All @@ -210,46 +243,42 @@ def _get_all_child_ous(self, parent_ou: str, ou_list: List[str]) -> None:
"""Depth-first recursion mutates the current_ou_list present in the calling
function to establish all children of a parent OU"""
child_ous = self._get_child_ous(parent_ou)
child_ous_list = [ou["Id"] for ou in child_ous["Children"]]
ou_list.extend(child_ous_list)
ou_list.extend(child_ous)

for ou in child_ous_list:
for ou in child_ous:
self._get_all_child_ous(ou, ou_list)

def _get_accounts_by_organization_units(
self, organization_units: List[str]
) -> List[str]:
) -> List[AccountTypeDef]:

account_list: List[str] = []
account_list: List[AccountTypeDef] = []

for ou in organization_units:
ou_children = self._get_child_accounts(ou)
account_list.extend(acc["Id"] for acc in ou_children["Children"])
account_list.extend(ou_children)

return account_list

def _get_active_org_accounts(self) -> Set[str]:
all_org_accounts = (
self.org_client.get_paginator("list_accounts")
.paginate()
.build_full_result()["Accounts"]
)
return {acc["Id"] for acc in all_org_accounts if acc["Status"] == "ACTIVE"}
def _get_active_org_accounts(self) -> List[AccountTypeDef]:
pages = self.org_client.get_paginator("list_accounts").paginate()
return [
account
for page in pages
for account in page["Accounts"]
if account["Status"] == "ACTIVE"
]

@lru_cache()
def _get_child_ous(self, parent_ou: str) -> ListChildrenResponseTypeDef:
return cast(
ListChildrenResponseTypeDef,
self.org_client.get_paginator("list_children")
.paginate(ChildType="ORGANIZATIONAL_UNIT", ParentId=parent_ou)
.build_full_result(),
)
def _get_child_ous(self, parent_ou: str) -> List[str]:
pages = self.org_client.get_paginator(
"list_organizational_units_for_parent"
).paginate(ParentId=parent_ou)
return [ou["Id"] for page in pages for ou in page["OrganizationalUnits"]]

@lru_cache()
def _get_child_accounts(self, parent_ou: str) -> ListChildrenResponseTypeDef:
return cast(
ListChildrenResponseTypeDef,
self.org_client.get_paginator("list_children")
.paginate(ChildType="ACCOUNT", ParentId=parent_ou)
.build_full_result(),
def _get_child_accounts(self, parent_ou: str) -> List[AccountTypeDef]:
pages = self.org_client.get_paginator("list_accounts_for_parent").paginate(
ParentId=parent_ou
)
return [account for page in pages for account in page["Accounts"]]
16 changes: 0 additions & 16 deletions botocove/cove_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from boto3.session import Session
from botocore.exceptions import ClientError
from mypy_boto3_organizations.client import OrganizationsClient
from mypy_boto3_organizations.type_defs import AccountTypeDef
from mypy_boto3_sts.client import STSClient

from botocove.cove_types import CoveSessionInformation
Expand Down Expand Up @@ -44,21 +43,6 @@ def activate_cove_session(self) -> "CoveSession":
f"{self.session_information['RoleName']}"
)

if self.org_master:
try:
account_description: AccountTypeDef = self.org_client.describe_account(
AccountId=self.session_information["Id"]
)["Account"]
self.session_information["Arn"] = account_description["Arn"]
self.session_information["Email"] = account_description["Email"]
self.session_information["Name"] = account_description["Name"]
self.session_information["Status"] = account_description["Status"]
except ClientError:
logger.exception(
"Failed to call describe_account for "
f"{self.session_information['Id']}"
)

try:
logger.debug(f"Attempting to assume {role_arn}")

Expand Down
34 changes: 1 addition & 33 deletions tests/test_session.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from typing import Any, List
from typing import List

import pytest
from boto3 import Session
from botocore.exceptions import ClientError
from mypy_boto3_organizations.type_defs import AccountTypeDef
from mypy_boto3_sts.type_defs import PolicyDescriptorTypeTypeDef
from pytest_mock import MockerFixture

from botocove import CoveSession, cove

Expand Down Expand Up @@ -99,33 +97,3 @@ def simple_func(session: CoveSession, a_string: str) -> str:
}
]
assert cove_output["Results"] == expected


def test_session_result_error_handler(
org_accounts: List[AccountTypeDef], mocker: MockerFixture
) -> None:
def describe_account(*args: Any, **kwargs: Any) -> None:
raise ClientError(
{"Error": {"Message": "broken!", "Code": "OhNo"}}, "describe_account"
)

mocker.patch(
"moto.organizations.models.OrganizationsBackend.describe_account",
describe_account,
)

@cove()
def simple_func(session: CoveSession, a_string: str) -> str:
return a_string

cove_output = simple_func("test-string")
expected = [
{
"Id": org_accounts[1]["Id"],
"AssumeRoleSuccess": True,
"Result": "test-string",
"RoleName": "OrganizationAccountAccessRole",
"RoleSessionName": "OrganizationAccountAccessRole",
}
]
assert cove_output["Results"] == expected