
Multi-Account Security Report — Assume Roles Across All Org Accounts

Python script that uses AWS Organizations to list all accounts, assumes a cross-account role in each, audits security controls, and generates a consolidated report.

January 20, 2025 9 min read ~30 min to complete DB
The Situation

Enterprise security governance: instead of auditing every AWS account by hand, this script audits all of them in parallel using role assumption and generates a single compliance report.

8 Steps
7 Services Used
~30 min Duration
Advanced Difficulty

Problem Statement

Your organization has 30 AWS accounts across dev, staging, and production environments. Your security team must verify that GuardDuty, CloudTrail, and AWS Config are enabled in every account — and that root MFA is always on. Doing this manually takes 3 hours. This script runs in under 2 minutes.


Pre-requisite: Deploy the Audit Role to All Accounts

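# CloudFormation StackSet to deploy SecurityAuditRole to all accounts
# (run from the Organizations management account)
# Note: create-stack-set only defines the stack set. Stack instances must
# still be created for the target OUs (aws cloudformation create-stack-instances)
# before the role exists in any member account. Service-managed StackSets
# never deploy to the management account itself.

aws cloudformation create-stack-set \
  --stack-set-name SecurityAuditRole \
  --template-body file://security-audit-role.yaml \
  --capabilities CAPABILITY_NAMED_IAM \
  --permission-model SERVICE_MANAGED \
  --auto-deployment Enabled=true,RetainStacksOnAccountRemoval=false

# security-audit-role.yaml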
Resources:
  SecurityAuditRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: SecurityAuditRole
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              AWS: "arn:aws:iam::MANAGEMENT_ACCOUNT_ID:root"
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/SecurityAudit
        - arn:aws:iam::aws:policy/ReadOnlyAccess
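
Creating the stack set does not by itself place the role in any account; stack instances have to be created for the target organizational units. The following is a minimal sketch of that step in Python, assuming a client that behaves like boto3.client("cloudformation"). The helper name deploy_audit_role and the OU ID in the usage comment are hypothetical; create_stack_instances is the boto3 call for this operation.

```python
def deploy_audit_role(cfn, ou_ids, region="us-east-1",
                      stack_set_name="SecurityAuditRole"):
    """Create stack instances for every account under the given OUs.

    `cfn` is expected to behave like boto3.client("cloudformation").
    IAM is a global service, so one deployment region is enough for a role.
    Returns the StackSets operation ID so the caller can poll its status.
    """
    response = cfn.create_stack_instances(
        StackSetName=stack_set_name,
        DeploymentTargets={"OrganizationalUnitIds": list(ou_ids)},
        Regions=[region],
    )
    return response["OperationId"]


# Usage (hypothetical OU ID, requires boto3 and management-account credentials):
#   import boto3
#   op_id = deploy_audit_role(boto3.client("cloudformation"), ["ou-abcd-11111111"])
```

Passing the client in as an argument keeps the helper easy to exercise with a stub before it touches a real organization.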

Complete Script

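# Postpones evaluation of annotations so that list[dict] and dict | None
# in the signatures below also work on Python 3.8 and 3.9.
from __future__ import annotations

import boto3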
import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")


# ── Step 1: Get all active org accounts ──────────────────────────
def get_all_org_accounts() -> list[dict]:
    """
    list_accounts() returns all accounts in the AWS Organization.
    It must be called from the management (formerly "master") account
    or from a delegated administrator account.
    
    Only include ACTIVE accounts — SUSPENDED accounts cannot have
    roles assumed in them and would cause AssumeRole to fail.
    
    paginator handles the pagination automatically (max 20 per page).
    """
    org = boto3.client("organizations")
    accounts = []
    paginator = org.get_paginator("list_accounts")
    for page in paginator.paginate():
        accounts.extend(
            acct for acct in page["Accounts"] if acct["Status"] == "ACTIVE"
        )
    return accounts


# ── Step 2: Assume role in target account ────────────────────────
def assume_role(account_id: str, role_name: str = "SecurityAuditRole") -> dict | None:
    """
    assume_role() exchanges your current credentials for temporary
    credentials in the target account. The target account must have
    an IAM role that trusts your source account's identity.

    RoleArn format: arn:aws:iam::ACCOUNT_ID:role/ROLE_NAME
    RoleSessionName: appears in CloudTrail logs in the target account.
    DurationSeconds: max 3600s (1 hour) for chained role assumption.

    Returns a dict of credentials or None if assumption fails.
    The credentials dict has four keys:
      AccessKeyId, SecretAccessKey, SessionToken, Expiration
    """
    sts = boto3.client("sts")
    role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"

    try:
        response = sts.assume_role(
            RoleArn=role_arn,
            RoleSessionName="SecurityAuditSession",
            DurationSeconds=3600,
        )
        creds = response["Credentials"]
        return {
            "aws_access_key_id":     creds["AccessKeyId"],
            "aws_secret_access_key": creds["SecretAccessKey"],
            "aws_session_token":     creds["SessionToken"],
        }
    except Exception as e:
        logger.warning(f"Cannot assume role in {account_id}: {e}")
        return None


# ── Step 3: Audit a single account ───────────────────────────────
def audit_account(account: dict) -> dict:
    """
    Run 5 security checks against a single AWS account using
    temporary credentials from assume_role().

    Each check is a separate boto3 client call in the target account.
    We use kwargs unpacking (**creds) to pass the temporary credentials.
    """
    account_id   = account["Id"]
    account_name = account["Name"]
    findings: dict = {
        "account_id":   account_id,
        "account_name": account_name,
        "checks":       {},
        "error":        None,
    }

    creds = assume_role(account_id)
    if creds is None:
        findings["error"] = "Cannot assume SecurityAuditRole"
        return findings

    try:
        # ── Check 1: Root MFA ──────────────────────────────────────
        # get_account_summary() returns aggregate IAM statistics.
        # AccountMFAEnabled=1 means MFA is on for the root account.
        # AccountMFAEnabled=0 means root has NO MFA — critical finding!
        iam = boto3.client("iam", **creds)
        summary = iam.get_account_summary()["SummaryMap"]
        findings["checks"]["root_mfa_enabled"] = bool(
            summary.get("AccountMFAEnabled", 0)
        )

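        # ── Check 2: CloudTrail (multi-region) ────────────────────
        # describe_trails(includeShadowTrails=False) returns only trails
        # whose home region is THIS client's region (no shadow trails).
        # Caveat: a multi-region trail homed in another region, or an
        # organization trail owned by another account, is a shadow trail
        # here and will be missed; pass includeShadowTrails=True (the
        # default) to count those as well.
        # A multi-region trail captures API calls from all regions,
        # which is required for complete audit coverage.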
        ct = boto3.client("cloudtrail", **creds)
        trails = ct.describe_trails(includeShadowTrails=False)["trailList"]
        findings["checks"]["cloudtrail_multi_region"] = any(
            t.get("IsMultiRegionTrail") for t in trails
        )

        # ── Check 3: GuardDuty ────────────────────────────────────
        # list_detectors() returns GuardDuty detector IDs in this region.
        # A detector is the GuardDuty "engine" — you need one per region.
        # Empty list = GuardDuty not enabled in this region.
        gd = boto3.client("guardduty", **creds)
        detectors = gd.list_detectors()["DetectorIds"]
        findings["checks"]["guardduty_enabled"] = len(detectors) > 0

        # ── Check 4: AWS Config ───────────────────────────────────
        # describe_configuration_recorders() returns the Config recorders.
        # You need at least one recorder to track resource configurations.
        # (A recorder can exist but be stopped; this check only tests existence.)
        config = boto3.client("config", **creds)
        recorders = config.describe_configuration_recorders()[
            "ConfigurationRecorders"
        ]
        findings["checks"]["aws_config_enabled"] = len(recorders) > 0

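        # ── Check 5: Password Policy ──────────────────────────────
        # get_account_password_policy() raises NoSuchEntityException
        # if no password policy is set. We record that as the string
        # "NOT_CONFIGURED"; because it is not a bool, the report lists it
        # as informational rather than counting it as a failed check.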
        try:
            policy = iam.get_account_password_policy()["PasswordPolicy"]
            findings["checks"]["password_min_length"] = policy.get(
                "MinimumPasswordLength", 0
            )
            findings["checks"]["password_requires_symbols"] = policy.get(
                "RequireSymbols", False
            )
            findings["checks"]["password_max_age_days"] = policy.get(
                "MaxPasswordAge", 9999
            )
        except iam.exceptions.NoSuchEntityException:
            findings["checks"]["password_policy"] = "NOT_CONFIGURED"

    except Exception as e:
        findings["error"] = str(e)
        logger.error(f"Audit failed for {account_id}: {e}")

    return findings


# ── Step 4: Run all audits in parallel ───────────────────────────
def generate_consolidated_report(
    audit_role: str = "SecurityAuditRole",
    max_workers: int = 10,
    output_file: str = "org_security_report.json",
) -> list[dict]:
    """
    ThreadPoolExecutor runs audit_account() concurrently for all accounts.
    max_workers=10 means up to 10 accounts are audited simultaneously.
    More workers = faster, but watch STS assume_role rate limits.

    as_completed() yields futures as they finish (not in submission order),
    so results arrive in arbitrary order; we sort by account name afterwards.

    Note: audit_role is accepted here but is not forwarded to
    audit_account(), which always assumes the default "SecurityAuditRole".
    """
    accounts = get_all_org_accounts()
    logger.info(f"Auditing {len(accounts)} active accounts with {max_workers} workers")

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(audit_account, acct): acct
            for acct in accounts
        }
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
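            status = "✅" if not result.get("error") else "❌"
            # An errored account has no checks, and all() of an empty
            # sequence is True, so require a clean run before reporting a pass.
            checks_pass = not result.get("error") and all(
                v for v in result.get("checks", {}).values()
                if isinstance(v, bool)
            )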
            print(
                f"{status} {result['account_id']} ({result['account_name']}) "
                f"{'✓ All checks passed' if checks_pass else '⚠ Issues found'}"
            )

    # Sort by account name for readability
    results.sort(key=lambda x: x["account_name"])

    # ── Print summary ──────────────────────────────────────────────
    print(f"\n{'='*70}")
    print("CONSOLIDATED SECURITY REPORT")
    print(f"Generated: {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')}")
    print(f"{'='*70}")

    failed_checks = 0
    for r in results:
        print(f"\n  Account: {r['account_name']} ({r['account_id']})")
        if r.get("error"):
            print(f"    ⚠️  Error: {r['error']}")
            continue
        for check, val in r.get("checks", {}).items():
            if isinstance(val, bool):
                icon = "✅" if val else "❌"
                if not val:
                    failed_checks += 1
            else:
                icon = "ℹ️"
            print(f"    {icon} {check}: {val}")

    print(f"\n{'='*70}")
    print(f"Total accounts:   {len(results)}")
    print(f"Failed checks:    {failed_checks}")

    # Save full report
    with open(output_file, "w") as f:
        json.dump(results, f, indent=2, default=str)
    print(f"Full report saved to {output_file}")

    return results


if __name__ == "__main__":
    generate_consolidated_report(
        audit_role="SecurityAuditRole",
        max_workers=10,
    )
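
In the listing above, generate_consolidated_report() accepts audit_role, but audit_account(acct) is submitted without it, so a custom role name never reaches assume_role(). One possible way to forward it is functools.partial, sketched below with a stand-in audit function. The role_name parameter on audit_account() and the run_audits() helper are assumptions for illustration; the real audit_account() would need the same parameter and would pass it on to assume_role(account_id, role_name).

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def audit_account(account: dict, role_name: str = "SecurityAuditRole") -> dict:
    """Stand-in for the real audit: only records which role would be assumed."""
    return {"account_id": account["Id"], "role_name": role_name}


def run_audits(accounts, audit_role="SecurityAuditRole", max_workers=10):
    # partial() pins role_name so every worker thread uses the requested role.
    worker = partial(audit_account, role_name=audit_role)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map() returns results in the same order as the input.
        return list(executor.map(worker, accounts))


results = run_audits(
    [{"Id": "111111111111"}, {"Id": "222222222222"}],
    audit_role="CustomAuditRole",
)
print(results)
```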

Key Commands Explained

Command | What it does
org.get_paginator("list_accounts") | Paginates through all accounts in the AWS Organization
sts.assume_role(RoleArn, RoleSessionName, DurationSeconds) | Returns temporary credentials for the target account
response["Credentials"] | Contains AccessKeyId, SecretAccessKey, SessionToken, Expiration
boto3.client("iam", **creds) | Creates an IAM client using the assumed role’s credentials
iam.get_account_summary()["SummaryMap"]["AccountMFAEnabled"] | 1 = root MFA on, 0 = root MFA off
ct.describe_trails(includeShadowTrails=False) | Lists CloudTrail trails whose home region is the client’s region (shadow trails excluded)
gd.list_detectors()["DetectorIds"] | Returns GuardDuty detector IDs in the client’s region; empty = not enabled there
ThreadPoolExecutor(max_workers=10) | Runs audits on up to 10 accounts simultaneously
as_completed(futures) | Yields each future as its thread finishes

🔍 Line-by-Line Code Walkthrough

Imports

Line | Why It’s Used
import boto3 | AWS SDK, used for the Organizations, STS, IAM, CloudTrail, GuardDuty, and Config clients
import json | Serializes the audit report to a JSON file
import logging | Structured log output
from concurrent.futures import ThreadPoolExecutor, as_completed | ThreadPoolExecutor runs audit functions in parallel threads. as_completed yields results as threads finish
from datetime import datetime | Timestamps for the report header

get_all_org_accounts()

org = boto3.client("organizations")
paginator = org.get_paginator("list_accounts")
for page in paginator.paginate():
    accounts.extend(
        acct for acct in page["Accounts"] if acct["Status"] == "ACTIVE"
    )

Line | Explanation
boto3.client("organizations") | AWS Organizations client. list_accounts() must be called from the management (formerly master) account or a delegated administrator account; ordinary member accounts cannot call it
get_paginator("list_accounts") | Returns max 20 accounts per page. Pagination is mandatory for organizations with more than 20 accounts
acct["Status"] == "ACTIVE" | Filter to active accounts only. SUSPENDED accounts cannot have roles assumed in them; trying to assume a role there raises AccessDenied
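
The filtering step can be tried without AWS access by feeding it hand-written pages shaped like list_accounts responses. This is a small sketch; the helper name active_accounts and the sample account IDs and names are made up for illustration.

```python
def active_accounts(pages):
    """Flatten paginated list_accounts pages, keeping ACTIVE accounts only."""
    return [
        acct
        for page in pages
        for acct in page.get("Accounts", [])
        if acct.get("Status") == "ACTIVE"
    ]


# Two fake pages, as a paginator would yield them one at a time.
sample_pages = [
    {"Accounts": [
        {"Id": "111111111111", "Name": "dev", "Status": "ACTIVE"},
        {"Id": "222222222222", "Name": "old", "Status": "SUSPENDED"},
    ]},
    {"Accounts": [
        {"Id": "333333333333", "Name": "prod", "Status": "ACTIVE"},
    ]},
]

print([a["Name"] for a in active_accounts(sample_pages)])  # ['dev', 'prod']
```

With a real client, the pages argument would be org.get_paginator("list_accounts").paginate().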

assume_role(account_id, role_name)

sts = boto3.client("sts")
role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
response = sts.assume_role(
    RoleArn=role_arn,
    RoleSessionName="SecurityAuditSession",
    DurationSeconds=3600,
)
creds = response["Credentials"]
return {
    "aws_access_key_id":     creds["AccessKeyId"],
    "aws_secret_access_key": creds["SecretAccessKey"],
    "aws_session_token":     creds["SessionToken"],
}

Line | Explanation
boto3.client("sts") | STS (Security Token Service). Exchanges your current credentials for temporary credentials in another account
role_arn = f"arn:aws:iam::{account_id}:role/{role_name}" | Builds the full role ARN. Every account’s role ARN follows this pattern with the account’s 12-digit ID
RoleArn=role_arn | The role in the target account to assume. Must have a trust policy allowing your source account’s identity
RoleSessionName="SecurityAuditSession" | Appears in CloudTrail logs in the TARGET account as the session name; useful for auditing who (which script) made the calls
DurationSeconds=3600 | Temporary credentials are valid for 1 hour. Max for chained role assumption is 3600
response["Credentials"] | Dict with AccessKeyId, SecretAccessKey, SessionToken, Expiration
Return dict with "aws_access_key_id" etc. | These are the exact keyword arguments that boto3.client() accepts, which enables the boto3.client("iam", **creds) pattern
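
The pure-data parts of this step (building the ARN, renaming the credential keys, and checking the Expiration field) can be separated from the STS call, as in the sketch below. The helper names build_role_arn, to_client_kwargs, and expires_within are hypothetical, the credential values are placeholders, and the ARN assumes the standard "aws" partition.

```python
from datetime import datetime, timedelta, timezone


def build_role_arn(account_id: str, role_name: str = "SecurityAuditRole") -> str:
    """Build the role ARN; assumes the standard "aws" partition."""
    if not (account_id.isascii() and account_id.isdigit() and len(account_id) == 12):
        raise ValueError(f"expected a 12-digit account ID, got {account_id!r}")
    return f"arn:aws:iam::{account_id}:role/{role_name}"


def to_client_kwargs(credentials: dict) -> dict:
    """Map response["Credentials"] onto boto3.client() keyword arguments."""
    return {
        "aws_access_key_id": credentials["AccessKeyId"],
        "aws_secret_access_key": credentials["SecretAccessKey"],
        "aws_session_token": credentials["SessionToken"],
    }


def expires_within(credentials: dict, margin: timedelta, now=None) -> bool:
    """True if the temporary credentials expire within `margin` of `now`."""
    now = now or datetime.now(timezone.utc)
    return credentials["Expiration"] - now <= margin


# Placeholder values shaped like an assume_role() response.
sample = {
    "AccessKeyId": "EXAMPLEKEYID",
    "SecretAccessKey": "example-secret",
    "SessionToken": "example-token",
    "Expiration": datetime(2025, 1, 20, 13, 0, tzinfo=timezone.utc),
}
print(build_role_arn("123456789012"))
print(sorted(to_client_kwargs(sample)))
```

Keeping Expiration around is optional for a run that finishes in minutes, but it lets a longer job decide when to call assume_role() again.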

audit_account(account) — 5 Security Checks

iam = boto3.client("iam", **creds)
summary = iam.get_account_summary()["SummaryMap"]
findings["checks"]["root_mfa_enabled"] = bool(summary.get("AccountMFAEnabled", 0))

Line | Explanation
boto3.client("iam", **creds) | Creates an IAM client using the assumed role’s temporary credentials. **creds unpacks the dict as keyword args: aws_access_key_id=..., aws_secret_access_key=..., aws_session_token=...
get_account_summary() | Returns aggregate IAM statistics for the account. "SummaryMap" is the key containing the counts
summary.get("AccountMFAEnabled", 0) | Returns 1 if root MFA is enabled, 0 if not. The default of 0 treats a missing key as MFA off
bool(...) | Converts 1 → True, 0 → False for clean JSON output

ct = boto3.client("cloudtrail", **creds)
trails = ct.describe_trails(includeShadowTrails=False)["trailList"]
findings["checks"]["cloudtrail_multi_region"] = any(t.get("IsMultiRegionTrail") for t in trails)

Line | Explanation
boto3.client("cloudtrail", **creds) | CloudTrail client using the assumed role’s credentials; operates in the TARGET account
describe_trails(includeShadowTrails=False) | Returns trails whose home region is the client’s region. includeShadowTrails=False excludes replicated copies of multi-region trails homed in other regions, as well as organization trails owned by another account, so those are not counted by this check
any(t.get("IsMultiRegionTrail") for t in trails) | Returns True if at least one trail has IsMultiRegionTrail=True. A multi-region trail captures API calls from all regions

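
To see what excluding shadow trails leaves out, the sketch below classifies a hand-written trail list of the kind describe_trails() returns when shadow trails are included. The helper name multi_region_trails and the sample trails are assumptions for illustration; HomeRegion and IsMultiRegionTrail are fields of the real response.

```python
def multi_region_trails(trails, current_region):
    """Split multi-region trails into those homed here and shadow copies."""
    home, shadow = [], []
    for trail in trails:
        if not trail.get("IsMultiRegionTrail"):
            continue  # single-region trails do not give full coverage
        if trail.get("HomeRegion") == current_region:
            home.append(trail["Name"])
        else:
            shadow.append(trail["Name"])
    return home, shadow


# Sample data: an organization trail homed in us-east-1, seen from eu-west-1.
sample_trails = [
    {"Name": "org-trail", "HomeRegion": "us-east-1", "IsMultiRegionTrail": True},
    {"Name": "local-trail", "HomeRegion": "eu-west-1", "IsMultiRegionTrail": False},
]

home, shadow = multi_region_trails(sample_trails, "eu-west-1")
print(home, shadow)  # [] ['org-trail']
print("covered:", bool(home or shadow))
```

In this sample the account is covered only through a shadow trail, which is the case a check restricted to home-region trails would report as a failure.
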
gd = boto3.client("guardduty", **creds)
detectors = gd.list_detectors()["DetectorIds"]
findings["checks"]["guardduty_enabled"] = len(detectors) > 0

Line | Explanation
boto3.client("guardduty", **creds) | GuardDuty client in the target account
gd.list_detectors()["DetectorIds"] | Returns a list of GuardDuty detector IDs in this region. A detector is the GuardDuty monitoring engine; you need one per region
len(detectors) > 0 | Empty list = GuardDuty not enabled in this region. Returns True/False

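
Because detectors are per region, a single list_detectors() call only covers the client's default region. One way to extend the check is to loop over the regions you care about, sketched here with an injected client factory so it runs without AWS access. The helper name guardduty_by_region and the stub class are hypothetical; with boto3, the factory could be lambda r: boto3.client("guardduty", region_name=r, **creds).

```python
def guardduty_by_region(regions, make_client):
    """Return {region: bool} telling whether a detector exists in each region."""
    coverage = {}
    for region in regions:
        gd = make_client(region)
        coverage[region] = len(gd.list_detectors()["DetectorIds"]) > 0
    return coverage


class StubGuardDuty:
    """Minimal stand-in exposing only list_detectors()."""

    def __init__(self, detector_ids):
        self._ids = detector_ids

    def list_detectors(self):
        return {"DetectorIds": self._ids}


# Made-up state: GuardDuty enabled in us-east-1 only.
fake_state = {"us-east-1": ["detector-1"], "eu-west-1": []}
result = guardduty_by_region(fake_state, lambda r: StubGuardDuty(fake_state[r]))
print(result)  # {'us-east-1': True, 'eu-west-1': False}
```
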
config = boto3.client("config", **creds)
recorders = config.describe_configuration_recorders()["ConfigurationRecorders"]
findings["checks"]["aws_config_enabled"] = len(recorders) > 0

Line | Explanation
describe_configuration_recorders() | Returns the list of AWS Config recorders. A recorder tracks resource configuration changes
len(recorders) > 0 | Empty list = Config not enabled. At least one recorder is needed for compliance tracking; note that a recorder can exist while stopped, which this check does not detect

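
A recorder that exists but is stopped would still pass the length check. A stricter variant, sketched below, asks for recorder status instead and looks at the recording flag. The helper name config_is_recording and the stub are hypothetical; describe_configuration_recorder_status() is the boto3 Config call that returns ConfigurationRecordersStatus.

```python
def config_is_recording(config_client) -> bool:
    """True if at least one Config recorder is currently recording."""
    statuses = config_client.describe_configuration_recorder_status()[
        "ConfigurationRecordersStatus"
    ]
    return any(status.get("recording") for status in statuses)


class StubConfig:
    """Minimal stand-in exposing only the status call."""

    def __init__(self, statuses):
        self._statuses = statuses

    def describe_configuration_recorder_status(self):
        return {"ConfigurationRecordersStatus": self._statuses}


stopped = StubConfig([{"name": "default", "recording": False}])
running = StubConfig([{"name": "default", "recording": True}])
print(config_is_recording(stopped), config_is_recording(running))  # False True
```
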
try:
    policy = iam.get_account_password_policy()["PasswordPolicy"]
    findings["checks"]["password_min_length"] = policy.get("MinimumPasswordLength", 0)
except iam.exceptions.NoSuchEntityException:
    findings["checks"]["password_policy"] = "NOT_CONFIGURED"

Line | Explanation
get_account_password_policy() | Raises NoSuchEntityException if no password policy is configured (the default AWS state). This exception handling is necessary
policy.get("MinimumPasswordLength", 0) | CIS Benchmark requires at least 14 characters. The 0 fallback applies only if the key is missing from a returned policy; when no policy exists at all, the except branch runs instead
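
The script records the raw policy values and leaves judging them to the reader. A sketch of turning them into pass/fail booleans follows. The helper name evaluate_password_policy is hypothetical, the 14-character minimum comes from the CIS figure above, and the 90-day maximum age is an assumed threshold that should be replaced with your own standard.

```python
def evaluate_password_policy(policy, min_length=14, max_age_days=90):
    """Turn a PasswordPolicy dict (or None if absent) into boolean checks."""
    if policy is None:
        # No policy configured at all: a single failing check.
        return {"password_policy_configured": False}
    max_age = policy.get("MaxPasswordAge")  # absent means passwords never expire
    return {
        "password_policy_configured": True,
        "password_min_length_ok": policy.get("MinimumPasswordLength", 0) >= min_length,
        "password_requires_symbols": bool(policy.get("RequireSymbols", False)),
        "password_expiry_ok": max_age is not None and max_age <= max_age_days,
    }


weak = {"MinimumPasswordLength": 8, "RequireSymbols": False}
strong = {"MinimumPasswordLength": 14, "RequireSymbols": True, "MaxPasswordAge": 90}
print(evaluate_password_policy(weak))
print(evaluate_password_policy(strong))
print(evaluate_password_policy(None))
```

Because every value is a bool, the report's existing isinstance(val, bool) logic would count each failing item.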

generate_consolidated_report(audit_role, max_workers, output_file)

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = {
        executor.submit(audit_account, acct): acct
        for acct in accounts
    }
    for future in as_completed(futures):
        result = future.result()
        results.append(result)

Line | Explanation
ThreadPoolExecutor(max_workers=10) | Creates a thread pool with 10 worker threads. Up to 10 audit_account() calls run simultaneously. Assuming roughly 10 s per account, total time drops from N × 10 s to about ⌈N/10⌉ × 10 s
executor.submit(audit_account, acct) | Submits audit_account(acct) to run in a worker thread. Returns a Future object immediately (non-blocking)
futures = {executor.submit(audit_account, acct): acct for acct in accounts} | Dict comprehension mapping each Future to its account, so futures[future] identifies which account a completed future belongs to
as_completed(futures) | Yields each Future as it completes (not in submission order). Allows processing results as they arrive rather than waiting for all to finish
future.result() | Returns the return value of audit_account(). Inside an as_completed() loop the future has already finished, so this call does not block. Any exception raised inside the thread is re-raised here

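
The Future-to-account mapping pays off when a worker raises: the loop can still say which account failed. The sketch below shows that pattern with a stand-in audit function that fails on purpose; fake_audit, run_all, and the sample accounts are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_audit(account):
    """Stand-in audit that fails for one account to exercise error handling."""
    if account["Name"] == "broken":
        raise RuntimeError("simulated failure")
    return {"account_id": account["Id"], "error": None}


def run_all(accounts, audit=fake_audit, max_workers=10):
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(audit, acct): acct for acct in accounts}
        for future in as_completed(futures):
            acct = futures[future]  # look up the account behind this future
            try:
                results.append(future.result())
            except Exception as exc:  # re-raised from the worker thread
                results.append({"account_id": acct["Id"], "error": str(exc)})
    # Completion order is arbitrary, so sort for a stable report.
    return sorted(results, key=lambda r: r["account_id"])


accounts = [
    {"Id": "111111111111", "Name": "dev"},
    {"Id": "222222222222", "Name": "broken"},
]
for row in run_all(accounts):
    print(row)
```
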
json.dump(results, f, indent=2, default=str)

Line | Explanation
json.dump(results, f, indent=2) | Writes the results list to the JSON file with 2-space indentation
default=str | json.dump doesn’t know how to serialize datetime objects by default. default=str converts any non-serializable object (datetime, Decimal) to its string representation

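
The effect of default=str is easy to see on a small record. This is a standalone sketch with made-up values, separate from the audit report itself.

```python
import json
from datetime import datetime, timezone
from decimal import Decimal

record = {
    "expires": datetime(2025, 1, 20, 12, 0, tzinfo=timezone.utc),
    "score": Decimal("9.5"),
}

# Without a default hook, json raises TypeError on the datetime.
try:
    json.dumps(record)
except TypeError as exc:
    print("plain dumps failed:", exc)

# default=str is called for each object json cannot encode natively.
encoded = json.dumps(record, default=str)
print(encoded)  # {"expires": "2025-01-20 12:00:00+00:00", "score": "9.5"}
```

The values come back as plain strings when the file is loaded, so a consumer that needs real timestamps has to parse them again.
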
Services Used
  • Organizations
  • STS
  • IAM
  • CloudTrail
  • GuardDuty
  • Config
  • boto3

Prerequisites
  • Python 3.8+
  • boto3
  • AWS Organizations management account access (formerly called the master account)
  • Cross-account IAM role deployed in each member account
  • IAM: organizations:ListAccounts, sts:AssumeRole
What You Learned
  • STS assume_role pattern
  • Cross-account boto3 client creation
  • ThreadPoolExecutor for parallel account auditing
  • AWS Organizations API
  • Multi-account IAM role deployment
