The Situation
Enterprise security governance — instead of auditing 30 AWS accounts manually, this script audits all of them in parallel using role assumption and generates a single compliance report.
Problem Statement
Your organization has 30 AWS accounts across dev, staging, and production environments. Your security team must verify that GuardDuty, CloudTrail, and AWS Config are enabled in every account — and that root MFA is always on. Doing this manually takes 3 hours. This script runs in under 2 minutes.
Pre-requisite: Deploy the Audit Role to All Accounts
# CloudFormation StackSet to deploy SecurityAuditRole to all accounts
# (run from the Organizations management account)
#
# Flags used below:
#   --capabilities CAPABILITY_NAMED_IAM  acknowledges that the template creates
#                                        an IAM role with an explicit RoleName.
#   --permission-model SERVICE_MANAGED   selects the service-managed permission
#                                        model for the StackSet.
#   --auto-deployment Enabled=true,...   turns on automatic deployment and does
#                                        not retain stacks when an account is removed.
#
# NOTE(review): this command only defines the stack set. Confirm that stack
# instances are also created for the target OUs (e.g. via create-stack-instances),
# and whether the management account itself needs the role deployed separately.
aws cloudformation create-stack-set \
--stack-set-name SecurityAuditRole \
--template-body file://security-audit-role.yaml \
--capabilities CAPABILITY_NAMED_IAM \
--permission-model SERVICE_MANAGED \
--auto-deployment Enabled=true,RetainStacksOnAccountRemoval=false
# security-audit-role.yaml
# Defines the cross-account role that the audit script assumes in each account.
AWSTemplateFormatVersion: "2010-09-09"
Resources:
  SecurityAuditRole:
    Type: AWS::IAM::Role
    Properties:
      # The audit script builds the role ARN from this exact name.
      RoleName: SecurityAuditRole
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              # Replace MANAGEMENT_ACCOUNT_ID with the 12-digit ID of the
              # account that runs the audit script.
              AWS: "arn:aws:iam::MANAGEMENT_ACCOUNT_ID:root"
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/SecurityAudit
        # NOTE(review): ReadOnlyAccess is much broader than the checks in the
        # audit script appear to need; confirm it is required before deploying.
        - arn:aws:iam::aws:policy/ReadOnlyAccess
Complete Script
import json
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone

import boto3
# Configure root logging once at import time, then take this module's logger.
logging.basicConfig(
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# ── Step 1: Get all active org accounts ──────────────────────────
def get_all_org_accounts() -> list[dict]:
    """
    Return every ACTIVE account in the AWS Organization.

    Uses the Organizations ``list_accounts`` paginator, so this must run with
    credentials from the management (master) account. Accounts whose Status
    is anything other than "ACTIVE" (for example SUSPENDED) are dropped,
    because a role cannot be assumed in them.

    Returns:
        The account records exactly as returned by ``list_accounts``,
        in the order the pages deliver them.
    """
    pages = boto3.client("organizations").get_paginator("list_accounts").paginate()
    return [
        account
        for page in pages
        for account in page["Accounts"]
        if account["Status"] == "ACTIVE"
    ]
# ── Step 2: Assume role in target account ────────────────────────
def assume_role(account_id: str, role_name: str = "SecurityAuditRole") -> dict | None:
    """
    Obtain temporary credentials for ``role_name`` in the target account.

    The target account must contain an IAM role with that name whose trust
    policy allows the caller's identity to assume it.

    Args:
        account_id: 12-digit ID of the account to assume the role in.
        role_name: Name of the IAM role to assume
            (ARN format: arn:aws:iam::ACCOUNT_ID:role/ROLE_NAME).

    Returns:
        A dict with the keys ``aws_access_key_id``, ``aws_secret_access_key``
        and ``aws_session_token`` (the keyword names boto3 accepts for
        explicit credentials), or None if the role could not be assumed.
        Failures are logged as warnings rather than raised, so one
        unreachable account does not stop the caller.
    """
    role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
    try:
        # This function is reached from ThreadPoolExecutor worker threads via
        # audit_account(). boto3's module-level default session is shared
        # and not thread-safe, so build the STS client from a private Session.
        sts = boto3.Session().client("sts")
        response = sts.assume_role(
            RoleArn=role_arn,
            # RoleSessionName shows up in the target account's CloudTrail logs.
            RoleSessionName="SecurityAuditSession",
            # 3600s (1 hour) is the maximum for chained role assumption.
            DurationSeconds=3600,
        )
    except Exception as e:  # best-effort boundary: report and let the caller skip
        logger.warning("Cannot assume role in %s: %s", account_id, e)
        return None

    creds = response["Credentials"]
    return {
        "aws_access_key_id": creds["AccessKeyId"],
        "aws_secret_access_key": creds["SecretAccessKey"],
        "aws_session_token": creds["SessionToken"],
    }
# ── Step 3: Audit a single account ───────────────────────────────
def audit_account(account: dict, role_name: str = "SecurityAuditRole") -> dict:
    """
    Run the security checks against a single AWS account.

    Assumes ``role_name`` in the account via assume_role() and uses the
    resulting temporary credentials for every API call.

    Args:
        account: Organizations account record; must contain "Id" and "Name".
        role_name: IAM role to assume in the target account.

    Returns:
        A dict with the keys "account_id", "account_name", "checks" and
        "error". Boolean entries in "checks" are pass/fail results; other
        entries are informational. "error" stays None unless the role could
        not be assumed or an API call failed, in which case "checks" holds
        only the results gathered before the failure.
    """
    account_id = account["Id"]
    account_name = account["Name"]
    findings: dict = {
        "account_id": account_id,
        "account_name": account_name,
        "checks": {},
        "error": None,
    }
    creds = assume_role(account_id, role_name)
    if creds is None:
        findings["error"] = f"Cannot assume {role_name}"
        return findings

    checks = findings["checks"]
    try:
        # This function runs in worker threads. boto3's module-level default
        # session is shared and not thread-safe, so every client is created
        # from a Session that belongs to this call only.
        session = boto3.Session(**creds)

        # ── Check 1: Root MFA ──────────────────────────────────────
        # AccountMFAEnabled is 1 when the root user has MFA, 0 otherwise.
        iam = session.client("iam")
        summary = iam.get_account_summary()["SummaryMap"]
        checks["root_mfa_enabled"] = bool(summary.get("AccountMFAEnabled", 0))

        # ── Check 2: CloudTrail (multi-region) ────────────────────
        # Shadow trails must be included: a multi-region or organization
        # trail whose home region differs from this client's region is only
        # visible here as a shadow trail, and excluding them would report
        # a false "no multi-region trail".
        ct = session.client("cloudtrail")
        trails = ct.describe_trails(includeShadowTrails=True)["trailList"]
        checks["cloudtrail_multi_region"] = any(
            t.get("IsMultiRegionTrail") for t in trails
        )

        # ── Check 3: GuardDuty ────────────────────────────────────
        # An empty detector list means GuardDuty is not enabled in the
        # client's region.
        gd = session.client("guardduty")
        detectors = gd.list_detectors()["DetectorIds"]
        checks["guardduty_enabled"] = bool(detectors)

        # ── Check 4: AWS Config ───────────────────────────────────
        # At least one configuration recorder must exist in this region.
        config = session.client("config")
        recorders = config.describe_configuration_recorders()[
            "ConfigurationRecorders"
        ]
        checks["aws_config_enabled"] = bool(recorders)

        # ── Check 5: Password Policy ──────────────────────────────
        # get_account_password_policy() raises NoSuchEntityException when no
        # policy is set. That is recorded as a failed boolean check so it is
        # counted as non-compliant, not just shown as an informational string.
        try:
            policy = iam.get_account_password_policy()["PasswordPolicy"]
        except iam.exceptions.NoSuchEntityException:
            checks["password_policy_configured"] = False
            checks["password_policy"] = "NOT_CONFIGURED"
        else:
            checks["password_policy_configured"] = True
            checks["password_min_length"] = policy.get("MinimumPasswordLength", 0)
            checks["password_requires_symbols"] = policy.get("RequireSymbols", False)
            checks["password_max_age_days"] = policy.get("MaxPasswordAge", 9999)
    except Exception as e:  # boundary: record the failure, keep the report going
        findings["error"] = str(e)
        logger.error("Audit failed for %s: %s", account_id, e)
    return findings


def _all_checks_passed(result: dict) -> bool:
    """Return True only if the audit had no error and no boolean check failed."""
    if result.get("error"):
        return False
    return all(
        value
        for value in result.get("checks", {}).values()
        if isinstance(value, bool)
    )


# ── Step 4: Run all audits in parallel ───────────────────────────
def generate_consolidated_report(
    audit_role: str = "SecurityAuditRole",
    max_workers: int = 10,
    output_file: str = "org_security_report.json",
) -> list[dict]:
    """
    Audit every active organization account in parallel and report the results.

    Prints one progress line per account as it finishes, then a consolidated
    summary, and finally writes the full results to ``output_file`` as JSON.

    Args:
        audit_role: Name of the IAM role assumed in each account.
        max_workers: Maximum number of accounts audited concurrently. More
            workers is faster, but watch STS assume_role rate limits.
        output_file: Path of the JSON report to write.

    Returns:
        The per-account findings, sorted by account name.
    """
    accounts = get_all_org_accounts()
    logger.info(
        "Auditing %d active accounts with %d workers", len(accounts), max_workers
    )
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its account so a crashed task can still be
        # attributed to the right account.
        futures = {
            executor.submit(audit_account, acct, audit_role): acct
            for acct in accounts
        }
        # as_completed() yields in completion order, not submission order;
        # the results are sorted afterwards.
        for future in as_completed(futures):
            acct = futures[future]
            try:
                result = future.result()
            except Exception as e:  # one bad account must not abort the report
                logger.error("Audit crashed for %s: %s", acct.get("Id"), e)
                result = {
                    "account_id": str(acct.get("Id", "unknown")),
                    "account_name": str(acct.get("Name", "unknown")),
                    "checks": {},
                    "error": str(e),
                }
            results.append(result)
            status = "❌" if result.get("error") else "✅"
            verdict = (
                "✓ All checks passed" if _all_checks_passed(result) else "⚠ Issues found"
            )
            print(
                f"{status} {result['account_id']} ({result['account_name']}) "
                f"{verdict}"
            )

    # Sort by account name for readability
    results.sort(key=lambda x: x["account_name"])

    # ── Print summary ──────────────────────────────────────────────
    print(f"\n{'='*70}")
    print("CONSOLIDATED SECURITY REPORT")
    print(f"Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
    print(f"{'='*70}")
    failed_checks = 0
    errored_accounts = 0
    for r in results:
        print(f"\n Account: {r['account_name']} ({r['account_id']})")
        if r.get("error"):
            errored_accounts += 1
            print(f" ⚠️ Error: {r['error']}")
        # Checks gathered before an error are still shown and counted, so a
        # partial audit does not hide findings it already made.
        for check, val in r.get("checks", {}).items():
            if isinstance(val, bool):
                icon = "✅" if val else "❌"
                if not val:
                    failed_checks += 1
            else:
                icon = "ℹ️"
            print(f" {icon} {check}: {val}")
    print(f"\n{'='*70}")
    print(f"Total accounts: {len(results)}")
    print(f"Accounts with errors: {errored_accounts}")
    print(f"Failed checks: {failed_checks}")

    # Save full report
    with open(output_file, "w", encoding="utf-8") as f:
        # default=str stringifies any value json cannot serialize natively.
        json.dump(results, f, indent=2, default=str)
    print(f"Full report saved to {output_file}")
    return results
if __name__ == "__main__":
    # Script entry point: audit the whole organization with the standard
    # role name and ten concurrent workers.
    generate_consolidated_report(audit_role="SecurityAuditRole", max_workers=10)
Key Commands Explained
| Command | What it does |
|---|---|
| `org.get_paginator("list_accounts")` | Paginates all accounts in the AWS Organization |
| `sts.assume_role(RoleArn, RoleSessionName, DurationSeconds)` | Returns temporary credentials for the target account |
| `response["Credentials"]` | Contains AccessKeyId, SecretAccessKey, SessionToken, Expiration |
| `boto3.client("iam", **creds)` | Creates an IAM client using the assumed role’s credentials |
| `iam.get_account_summary()["SummaryMap"]["AccountMFAEnabled"]` | 1 = root MFA on, 0 = root MFA off |
| `ct.describe_trails(includeShadowTrails=False)` | Lists CloudTrail trails created in this region |
| `gd.list_detectors()["DetectorIds"]` | Returns GuardDuty detector IDs — empty = not enabled |
| `ThreadPoolExecutor(max_workers=10)` | Runs audits on up to 10 accounts simultaneously |
| `as_completed(futures)` | Yields results as each thread finishes |
🔍 Line-by-Line Code Walkthrough
Imports
| Line | Why It’s Used |
|---|---|
| `import boto3` | AWS SDK — used for Organizations, STS, IAM, CloudTrail, GuardDuty, Config clients |
| `import json` | Serializes the audit report to a JSON file |
| `import logging` | Structured log output |
| `from concurrent.futures import ThreadPoolExecutor, as_completed` | `ThreadPoolExecutor` runs audit functions in parallel threads. `as_completed` yields results as threads finish |
| `from datetime import datetime` | Timestamps for the report header |
get_all_org_accounts()
org = boto3.client("organizations")
paginator = org.get_paginator("list_accounts")
for page in paginator.paginate():
accounts.extend(
acct for acct in page["Accounts"] if acct["Status"] == "ACTIVE"
)
| Line | Explanation |
|---|---|
| `boto3.client("organizations")` | AWS Organizations client. Must be called from the management (master) account — member accounts cannot call `list_accounts()` |
| `get_paginator("list_accounts")` | Returns max 20 accounts per page. Pagination is mandatory for organizations with more than 20 accounts |
| `acct["Status"] == "ACTIVE"` | Filter to active accounts only. SUSPENDED accounts cannot have roles assumed in them — trying to assume a role there raises AccessDenied |
assume_role(account_id, role_name)
sts = boto3.client("sts")
role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
response = sts.assume_role(
RoleArn=role_arn,
RoleSessionName="SecurityAuditSession",
DurationSeconds=3600,
)
creds = response["Credentials"]
return {
"aws_access_key_id": creds["AccessKeyId"],
"aws_secret_access_key": creds["SecretAccessKey"],
"aws_session_token": creds["SessionToken"],
}
| Line | Explanation |
|---|---|
| `boto3.client("sts")` | STS (Security Token Service) — exchanges your current credentials for temporary credentials in another account |
| `role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"` | Builds the full role ARN. Every account’s role ARN follows this pattern with the account’s 12-digit ID |
| `RoleArn=role_arn` | The role in the target account to assume. Must have a trust policy allowing your source account’s identity |
| `RoleSessionName="SecurityAuditSession"` | Appears in CloudTrail logs in the TARGET account as the session name — useful for auditing who (which script) made the calls |
| `DurationSeconds=3600` | Temporary credentials are valid for 1 hour. Max for chained role assumption is 3600 |
| `response["Credentials"]` | Dict with AccessKeyId, SecretAccessKey, SessionToken, Expiration |
| Return dict with `"aws_access_key_id"` etc. | These are the exact keyword arguments that `boto3.client()` accepts — enables the `boto3.client("iam", **creds)` pattern |
audit_account(account) — 5 Security Checks
iam = boto3.client("iam", **creds)
summary = iam.get_account_summary()["SummaryMap"]
findings["checks"]["root_mfa_enabled"] = bool(summary.get("AccountMFAEnabled", 0))
| Line | Explanation |
|---|---|
| `boto3.client("iam", **creds)` | Creates an IAM client using the assumed role’s temporary credentials. `**creds` unpacks the dict as keyword args: `aws_access_key_id=...`, `aws_secret_access_key=...`, `aws_session_token=...` |
| `get_account_summary()` | Returns aggregate IAM statistics for the account. `"SummaryMap"` is the key containing the counts |
| `summary.get("AccountMFAEnabled", 0)` | Returns 1 if root MFA is enabled, 0 if not. The key may be absent in some older accounts |
| `bool(...)` | Converts 1 → True, 0 → False for clean JSON output |
ct = boto3.client("cloudtrail", **creds)
trails = ct.describe_trails(includeShadowTrails=False)["trailList"]
findings["checks"]["cloudtrail_multi_region"] = any(t.get("IsMultiRegionTrail") for t in trails)
| Line | Explanation |
|---|---|
| `boto3.client("cloudtrail", **creds)` | CloudTrail client using the assumed role’s credentials — operates in the TARGET account |
| `describe_trails(includeShadowTrails=False)` | Returns trails created in this region. `includeShadowTrails=False` excludes replicated copies of multi-region trails from other regions (otherwise you’d see the same trail multiple times). Note that a multi-region trail whose home region differs from the client’s region is visible only as a shadow trail, so this flag excludes it from the result |
| `any(t.get("IsMultiRegionTrail") for t in trails)` | Returns True if at least one trail has IsMultiRegionTrail=True. A multi-region trail captures API calls from all regions |
gd = boto3.client("guardduty", **creds)
detectors = gd.list_detectors()["DetectorIds"]
findings["checks"]["guardduty_enabled"] = len(detectors) > 0
| Line | Explanation |
|---|---|
| `boto3.client("guardduty", **creds)` | GuardDuty client in the target account |
| `gd.list_detectors()["DetectorIds"]` | Returns a list of GuardDuty detector IDs in this region. A detector is the GuardDuty monitoring engine — you need one per region |
| `len(detectors) > 0` | Empty list = GuardDuty not enabled in this region. Returns True/False |
config = boto3.client("config", **creds)
recorders = config.describe_configuration_recorders()["ConfigurationRecorders"]
findings["checks"]["aws_config_enabled"] = len(recorders) > 0
| Line | Explanation |
|---|---|
| `describe_configuration_recorders()` | Returns the list of AWS Config recorders. A recorder tracks resource configuration changes |
| `len(recorders) > 0` | Empty list = Config not enabled. At least one recorder needed for compliance tracking |
try:
policy = iam.get_account_password_policy()["PasswordPolicy"]
findings["checks"]["password_min_length"] = policy.get("MinimumPasswordLength", 0)
except iam.exceptions.NoSuchEntityException:
findings["checks"]["password_policy"] = "NOT_CONFIGURED"
| Line | Explanation |
|---|---|
| `get_account_password_policy()` | Raises NoSuchEntityException if no password policy is configured (the default AWS state). This exception handling is necessary |
| `policy.get("MinimumPasswordLength", 0)` | CIS Benchmark requires at least 14 characters. Falls back to 0 if the key is missing from the returned policy; when no policy exists at all, the exception branch above runs instead |
generate_consolidated_report(audit_role, max_workers, output_file)
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(audit_account, acct): acct
for acct in accounts
}
for future in as_completed(futures):
result = future.result()
results.append(result)
| Line | Explanation |
|---|---|
| `ThreadPoolExecutor(max_workers=10)` | Creates a thread pool with 10 worker threads. Up to 10 `audit_account()` calls run simultaneously — reduces total time from N × 10s to roughly ⌈N/10⌉ × 10s |
| `executor.submit(audit_account, acct)` | Submits `audit_account(acct)` to run in a worker thread. Returns a Future object immediately (non-blocking) |
| `futures = {executor.submit(audit_account, acct): acct for acct in accounts}` | Dict comprehension creating a mapping from Future → account. Used to identify which account each future belongs to |
| `as_completed(futures)` | Yields each Future as it completes (not in submission order). Allows processing results as they arrive rather than waiting for all to finish |
| `future.result()` | Blocks until this specific future completes, then returns the return value of `audit_account()`. Any exception raised inside the thread is re-raised here |
json.dump(results, f, indent=2, default=str)
| Line | Explanation |
|---|---|
| `json.dump(results, f, indent=2)` | Writes the results list to the JSON file with 2-space indentation |
| `default=str` | `json.dump` doesn’t know how to serialize datetime objects by default. `default=str` converts any non-serializable object (datetime, Decimal) to its string representation |