
Generate Least-Privilege IAM Policy from CloudTrail Logs

Python script that analyses CloudTrail logs for a given IAM role and generates a minimal IAM policy containing only the actions the role actually used.

January 20, 2025 8 min read ~25 min to complete DB
The Situation

IAM permission hygiene — replace broad AdministratorAccess or PowerUserAccess policies with tightly-scoped policies derived from real usage data.

Steps: 6
Services used: 3
Duration: ~25 min
Difficulty: Advanced

Problem Statement

Your application role has PowerUserAccess (5,000+ allowed actions), but in 30 days of production usage it calls only 12 distinct APIs. If the role is compromised, the attacker can use nearly every service in the account. This script reads CloudTrail and generates a policy with just those 12 actions, cutting the set of allowed actions by more than 99%.


How It Works

CloudTrail logs → filter by role name → extract (service, action) pairs
     → group by service → build IAM Statement per service
     → output minimal policy JSON
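
The pipeline above can be sketched on in-memory data, without calling AWS. This is a minimal illustration rather than the full script: build_policy and the sample events are hypothetical, and the event dicts only mimic the two fields (EventSource, EventName) that the pipeline reads.

```python
from collections import defaultdict


def build_policy(events):
    """Group (service, action) pairs by service and emit one statement each."""
    actions_by_service = defaultdict(set)
    for event in events:
        source = event.get("EventSource", "")
        name = event.get("EventName", "")
        if not (source and name):
            continue  # skip incomplete records
        service = source.replace(".amazonaws.com", "")
        actions_by_service[service].add(f"{service}:{name}")

    statements = [
        {"Effect": "Allow", "Action": sorted(actions), "Resource": "*"}
        for _, actions in sorted(actions_by_service.items())
    ]
    return {"Version": "2012-10-17", "Statement": statements}


# Hypothetical sample events; duplicates collapse because actions live in a set.
sample_events = [
    {"EventSource": "ec2.amazonaws.com", "EventName": "DescribeInstances"},
    {"EventSource": "ec2.amazonaws.com", "EventName": "DescribeInstances"},
    {"EventSource": "sqs.amazonaws.com", "EventName": "CreateQueue"},
]
sketch_policy = build_policy(sample_events)
```

Three input events yield two statements here, one per service, because the repeated ec2 call is recorded once.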

Complete Script

import boto3
import json
from collections import defaultdict
from datetime import datetime, timedelta


def generate_least_privilege_policy(
    role_name: str,
    lookback_days: int = 30,
    output_file: str = "least_privilege_policy.json",
) -> dict:
    """
    Analyses CloudTrail events for a specific IAM role and generates
    a least-privilege IAM policy based on actual API usage.

    lookback_days: how far back to look in CloudTrail (max 90 days
    for lookup_events — for longer history use Athena on S3 logs).
    """
    cloudtrail = boto3.client("cloudtrail")

    end_time   = datetime.utcnow()
    start_time = end_time - timedelta(days=lookback_days)

    # actions_by_service: {"s3": {"s3:GetObject", "s3:PutObject"}, ...}
    actions_by_service: dict[str, set] = defaultdict(set)
    # resources_used: {"s3": {"arn:aws:s3:::my-bucket"}, ...}
    resources_used: dict[str, set]     = defaultdict(set)

    print(f"Analysing CloudTrail for role: {role_name}")
    print(f"Period: {start_time.date()} to {end_time.date()}")
    print("Scanning events...\n")

    event_count = 0

    # ── Paginate CloudTrail events ─────────────────────────────────
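    # lookup_events() supports LookupAttributes to filter events.
    # AttributeKey "Username" matches the role *session name* for assumed
    # roles. That equals role_name only if the session was named after the
    # role; EC2 instance profiles use the instance ID and Lambda uses the
    # function name. In those cases, filter on
    # userIdentity.sessionContext.sessionIssuer.userName inside each
    # CloudTrailEvent instead (lookup_events has no principal-ARN filter).
    #
    # get_paginator handles the NextToken loop automatically.
    # Each page["Events"] contains up to 50 events.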
    paginator = cloudtrail.get_paginator("lookup_events")
    for page in paginator.paginate(
        LookupAttributes=[{
            "AttributeKey":   "Username",
            "AttributeValue": role_name,
        }],
        StartTime=start_time,
        EndTime=end_time,
    ):
        for event in page["Events"]:
            event_name   = event.get("EventName", "")
            event_source = event.get("EventSource", "")

            if not (event_name and event_source):
                continue

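            # event_source is like "s3.amazonaws.com" → strip domain → "s3".
            # Caveat: a few sources differ from the IAM prefix (CloudWatch is
            # recorded as "monitoring.amazonaws.com", but its IAM prefix is
            # "cloudwatch"), so review the generated action names.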
            service = event_source.replace(".amazonaws.com", "")
            action  = f"{service}:{event_name}"

            actions_by_service[service].add(action)
            event_count += 1

            # ── Try to extract resource ARNs from the raw event ───
            # CloudTrailEvent is a JSON string embedded in the event dict.
            # requestParameters varies by API call — we check for common
            # resource identifiers: bucketName, instanceId, functionName, etc.
            try:
                detail = json.loads(event.get("CloudTrailEvent", "{}"))
                params = detail.get("requestParameters") or {}

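                # Map known parameter names to ARN patterns
                arn_extractors = {
                    "bucketName":    lambda p: f"arn:aws:s3:::{p['bucketName']}",
                    "instanceId":    lambda p: f"arn:aws:ec2:*:*:instance/{p['instanceId']}",
                    "functionName":  lambda p: f"arn:aws:lambda:*:*:function:{p['functionName']}",
                    # Secret ARNs end in a random 6-character suffix, hence "-??????"
                    "secretId":      lambda p: p["secretId"] if p["secretId"].startswith("arn:")
                                     else f"arn:aws:secretsmanager:*:*:secret:{p['secretId']}-??????",
                    # IAM needs an ARN, not the queue URL (.../<account-id>/<queue-name>)
                    "queueUrl":      lambda p: "arn:aws:sqs:*:{}:{}".format(*p["queueUrl"].rstrip("/").split("/")[-2:]),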
                    "topicArn":      lambda p: p["topicArn"],
                }

                for key, extractor in arn_extractors.items():
                    if key in params:
                        try:
                            resources_used[service].add(extractor(params))
                        except Exception:
                            pass
            except Exception:
                pass   # Malformed event — skip gracefully

    print(f"Processed {event_count} CloudTrail events")
    print(f"Services accessed: {', '.join(sorted(actions_by_service.keys()))}\n")

    # ── Build IAM Policy ──────────────────────────────────────────
    policy_statements = []

    for service, actions in sorted(actions_by_service.items()):
        # Use specific resource ARNs if we found them; otherwise default to "*"
        resources = sorted(resources_used.get(service, {"*"}))

        statement = {
            "Sid":      f"Allow{service.replace('-', '').title()}Actions",
            "Effect":   "Allow",
            "Action":   sorted(list(actions)),
            "Resource": resources,
        }
        policy_statements.append(statement)

        print(f"  {service}: {len(actions)} action(s), {len(resources)} resource(s)")
        for action in sorted(actions):
            print(f"    + {action}")

    policy = {
        "Version":   "2012-10-17",
        "Statement": policy_statements,
    }

    # ── Save to file ──────────────────────────────────────────────
    with open(output_file, "w") as f:
        json.dump(policy, f, indent=2)
    print(f"\nPolicy saved to: {output_file}")

    total_actions = sum(len(a) for a in actions_by_service.values())
    print(f"Total unique API actions: {total_actions}")

    return policy


def apply_policy_to_role(role_name: str, policy: dict, policy_name: str = None) -> str:
    """
    Create a new IAM managed policy from the generated policy document
    and attach it to the role.

    create_policy() takes a JSON string (not dict) for PolicyDocument.
    The resulting ARN can then be attached to the role.
    """
    iam = boto3.client("iam")
    policy_name = policy_name or f"least-privilege-{role_name}"

    response = iam.create_policy(
        PolicyName=policy_name,
        PolicyDocument=json.dumps(policy),
        Description=(
            f"Auto-generated least-privilege policy for role {role_name} "
            f"based on CloudTrail usage analysis."
        ),
        Tags=[
            {"Key": "GeneratedBy", "Value": "LeastPrivilegeTool"},
            {"Key": "SourceRole",  "Value": role_name},
        ],
    )
    policy_arn = response["Policy"]["Arn"]
    print(f"Created policy: {policy_arn}")

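    # Attach to the role. This only ADDS permissions: the role keeps its
    # existing broad policies until they are detached (iam.detach_role_policy).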
    iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    print(f"Attached to role: {role_name}")

    return policy_arn


if __name__ == "__main__":
    # Step 1: Analyse usage and generate policy
    policy = generate_least_privilege_policy(
        role_name="my-app-role",
        lookback_days=30,
        output_file="least_privilege_policy.json",
    )

    print("\nGenerated Policy Preview:")
    print(json.dumps(policy, indent=2))

    # Step 2: Review the policy file manually, then apply:
    # apply_policy_to_role("my-app-role", policy)

Sample Output

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "AllowS3Actions",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::my-app-bucket"
      ]
    },
    {
      "Sid": "AllowSecretsmanagerActions",
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue"
      ],
      "Resource": [
        "arn:aws:secretsmanager:*:*:secret:my-app-db-creds-??????"
      ]
    },
    {
      "Sid": "AllowCloudwatchActions",
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": ["*"]
    }
  ]
}
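
Before creating a managed policy from output like this, it can help to check its size. The sketch below assumes the IAM quota of 6,144 characters for a managed policy document and that IAM ignores whitespace when counting; policy_size and MANAGED_POLICY_CHAR_LIMIT are hypothetical names, and the compact-JSON length is only an approximation of what IAM counts.

```python
import json

MANAGED_POLICY_CHAR_LIMIT = 6144  # assumed IAM quota for managed policies


def policy_size(policy: dict) -> int:
    """Approximate the counted size: JSON without insignificant whitespace."""
    return len(json.dumps(policy, separators=(",", ":")))


example = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": ["cloudwatch:PutMetricData"], "Resource": ["*"]}
    ],
}
size = policy_size(example)
fits = size <= MANAGED_POLICY_CHAR_LIMIT
```

A role that touches many services can exceed the limit; splitting the statements across several managed policies is one way around it.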

Key Commands Explained

Command | What it does
get_paginator("lookup_events") | Paginates CloudTrail events (maximum 90-day window)
LookupAttributes[AttributeKey="Username"] | Filters events by IAM principal or session name
event["EventSource"] | The AWS service that received the API call (e.g., s3.amazonaws.com)
event["EventName"] | The API action called (e.g., GetObject)
event["CloudTrailEvent"] | Full raw event JSON; contains requestParameters, responseElements, etc.
json.loads(event["CloudTrailEvent"]) | Parses the embedded JSON string into a dict
defaultdict(set) | Auto-initialises a set for each new key, so no setdefault is needed
create_policy(PolicyDocument=json.dumps(policy)) | Creates a managed IAM policy from a JSON string
attach_role_policy(RoleName, PolicyArn) | Attaches the managed policy to the role

Limitations & Tips

CloudTrail lookup_events limit: The API returns a maximum of 90 days of data. For longer analysis windows, query CloudTrail S3 logs with Amazon Athena.

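Data events are not covered: lookup_events returns management events only. Data-plane calls such as S3 GetObject and PutObject are recorded only when a trail has Data Events enabled, and even then they are delivered to the trail's S3 bucket (or CloudTrail Lake), not to lookup_events, so this script cannot see them. Query the trail's logs, for example with Athena, to capture that usage.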

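Always review before applying: The generated policy is a starting point. Some actions may be needed only on rare occasions not captured in the analysis window, so add a buffer period or combine the result with IAM Access Analyzer. Also check every entry against the IAM documentation: CloudTrail event sources and event names do not always match IAM prefixes and action names (CloudWatch calls, for example, are logged under monitoring.amazonaws.com while the IAM prefix is cloudwatch), and object-level S3 actions need object ARNs such as arn:aws:s3:::my-app-bucket/* rather than the bucket ARN alone.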

Use IAM Access Analyzer: AWS’s built-in tool does similar analysis — this script gives you programmatic control and integration into your CI/CD workflow.
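
As one possible shape for the CI/CD integration mentioned above, a pipeline step could flag wildcard actions or resources in the generated policy so that a human reviews them. This is a sketch under assumptions: find_wildcards is a hypothetical helper, and the policy dict is hand-written for the example.

```python
def find_wildcards(policy: dict) -> list:
    """Return human-readable findings for wildcard actions or resources."""
    findings = []
    for stmt in policy.get("Statement", []):
        sid = stmt.get("Sid", "<no Sid>")
        # IAM allows Action/Resource to be a string or a list of strings.
        actions = stmt.get("Action", [])
        resources = stmt.get("Resource", [])
        actions = [actions] if isinstance(actions, str) else actions
        resources = [resources] if isinstance(resources, str) else resources
        for action in actions:
            if "*" in action:
                findings.append(f"{sid}: wildcard action {action}")
        if "*" in resources:
            findings.append(f"{sid}: Resource is *")
    return findings


generated = {
    "Version": "2012-10-17",
    "Statement": [
        {"Sid": "AllowS3Actions", "Effect": "Allow",
         "Action": ["s3:ListBucket"], "Resource": ["arn:aws:s3:::my-app-bucket"]},
        {"Sid": "AllowCloudwatchActions", "Effect": "Allow",
         "Action": ["cloudwatch:PutMetricData"], "Resource": ["*"]},
    ],
}
wildcard_findings = find_wildcards(generated)
```

A CI job could fail, or require an approval, whenever the returned list is non-empty.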


🔍 Line-by-Line Code Walkthrough

Imports

Line | Why It’s Used
import boto3 | AWS SDK for CloudTrail and IAM API calls
import json | Parses CloudTrailEvent (a JSON string inside the event dict) and serializes the final policy document
from collections import defaultdict | Creates dictionaries that auto-initialize missing keys. defaultdict(set) gives an empty set when a key is first accessed, so setdefault() is not needed
from datetime import datetime, timedelta | Computes the CloudTrail lookback window: datetime.utcnow() - timedelta(days=30)

generate_least_privilege_policy(role_name, lookback_days, output_file)

cloudtrail = boto3.client("cloudtrail")
end_time   = datetime.utcnow()
start_time = end_time - timedelta(days=lookback_days)

Line | Explanation
boto3.client("cloudtrail") | Creates a CloudTrail client for the region configured in your session. CloudTrail is a regional service: lookup_events returns only events recorded in the client's region, so repeat the scan per region (region_name=...) if the role is used in several
datetime.utcnow() | Current UTC time as a naive datetime; boto3 treats naive datetimes as UTC. utcnow() is deprecated since Python 3.12, where datetime.now(timezone.utc) is the timezone-aware equivalent
timedelta(days=lookback_days) | Creates a duration. Subtracting 30 days from now gives the start of the analysis window
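
A timezone-aware variant of the window calculation might look like the sketch below. It also caps the window at the 90 days that lookup_events supports. lookup_window and MAX_LOOKUP_DAYS are hypothetical names, not part of the script.

```python
from datetime import datetime, timedelta, timezone

MAX_LOOKUP_DAYS = 90  # lookup_events window stated in this article


def lookup_window(lookback_days: int):
    """Return (start, end) as timezone-aware UTC datetimes, capped at 90 days."""
    days = max(1, min(lookback_days, MAX_LOOKUP_DAYS))
    end = datetime.now(timezone.utc)
    return end - timedelta(days=days), end


# Asking for 120 days yields a 90-day window.
start, end = lookup_window(120)
```

The returned values can be passed as StartTime and EndTime in the same way as the naive datetimes used in the script.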
actions_by_service: dict[str, set] = defaultdict(set)
resources_used: dict[str, set]     = defaultdict(set)

Line | Explanation
defaultdict(set) | A dict that creates a new empty set for any key not yet seen. Without this, the first actions_by_service["s3"].add(...) would raise KeyError
actions_by_service | Will hold: {"s3": {"s3:GetObject", "s3:PutObject"}, "ec2": {"ec2:DescribeInstances"}, ...}
resources_used | Will hold resource ARNs per service: {"s3": {"arn:aws:s3:::my-bucket"}, ...}, used for tighter Resource clauses in the policy
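
The difference between a plain dict and defaultdict(set) can be seen in a few lines. This is a standalone illustration with made-up values, not an excerpt from the script.

```python
from collections import defaultdict

# A plain dict raises KeyError on the first access to a missing key.
plain = {}
try:
    plain["s3"].add("s3:GetObject")
    raised = False
except KeyError:
    raised = True

# defaultdict(set) creates the set on first access, and the set deduplicates.
grouped = defaultdict(set)
for _ in range(1000):
    grouped["s3"].add("s3:GetObject")
```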
paginator = cloudtrail.get_paginator("lookup_events")
for page in paginator.paginate(
    LookupAttributes=[{"AttributeKey": "Username", "AttributeValue": role_name}],
    StartTime=start_time,
    EndTime=end_time,
):

Line | Explanation
get_paginator("lookup_events") | lookup_events returns up to 50 events per page. A busy role may have thousands of events, so pagination is mandatory
LookupAttributes=[{"AttributeKey": "Username", ...}] | Filters events by the Username field. For assumed roles this is the role session name, which matches role_name only when the session was named after the role; EC2 instance profile sessions are named after the instance ID and Lambda sessions after the function name
StartTime=start_time, EndTime=end_time | The time window. CloudTrail lookup_events supports a maximum 90-day lookback window
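
When the session name differs from the role name, one alternative is to inspect the raw record and compare the role that issued the session. The sketch below assumes the standard userIdentity layout for AssumedRole events (sessionContext.sessionIssuer.userName holds the role name); event_is_from_role and the sample record are hypothetical.

```python
import json


def event_is_from_role(event: dict, role_name: str) -> bool:
    """True if the raw CloudTrail record was issued under the given role."""
    try:
        detail = json.loads(event.get("CloudTrailEvent") or "{}")
    except json.JSONDecodeError:
        return False
    if not isinstance(detail, dict):
        return False
    identity = detail.get("userIdentity") or {}
    if identity.get("type") != "AssumedRole":
        return False
    issuer = (identity.get("sessionContext") or {}).get("sessionIssuer") or {}
    return issuer.get("userName") == role_name


# Hypothetical record: an EC2 instance profile session, so the session name
# (and the lookup "Username") is the instance ID rather than the role name.
fake_event = {
    "Username": "i-0123456789abcdef0",
    "CloudTrailEvent": json.dumps({
        "userIdentity": {
            "type": "AssumedRole",
            "sessionContext": {
                "sessionIssuer": {"type": "Role", "userName": "my-app-role"}
            },
        }
    }),
}
matches = event_is_from_role(fake_event, "my-app-role")
```

Filtering this way means scanning events without the Username lookup attribute (or with another attribute such as EventSource), which is slower but does not depend on how sessions were named.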
event_name   = event.get("EventName", "")
event_source = event.get("EventSource", "")
service = event_source.replace(".amazonaws.com", "")
action  = f"{service}:{event_name}"
actions_by_service[service].add(action)

Line | Explanation
event.get("EventName", "") | API method that was called (e.g., "GetObject", "DescribeInstances"). An empty string means the event has no name, so we skip it
event.get("EventSource", "") | AWS service endpoint that processed the call (e.g., "s3.amazonaws.com", "ec2.amazonaws.com")
.replace(".amazonaws.com", "") | Strips the domain suffix to get the service prefix used in IAM actions: "s3", "ec2", "lambda". This holds for most services but not all (CloudWatch is logged as "monitoring" while its IAM prefix is "cloudwatch")
f"{service}:{event_name}" | Formats the IAM action string: "s3:GetObject", "ec2:DescribeInstances"
actions_by_service[service].add(action) | Adds the action to the set for this service. Sets deduplicate, so calling the same API 1000 times still records it once
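
Where an event source does not match the IAM prefix, a small override table is one way to correct it. The sketch below is deliberately incomplete: SOURCE_TO_IAM_PREFIX and iam_action are hypothetical names, and only the CloudWatch entry is included, so extend it after checking each service's IAM documentation.

```python
# Illustrative, deliberately incomplete overrides.
SOURCE_TO_IAM_PREFIX = {
    "monitoring": "cloudwatch",  # CloudWatch is recorded as monitoring.amazonaws.com
}


def iam_action(event_source: str, event_name: str) -> str:
    """Build an IAM action string, applying known prefix overrides."""
    service = event_source.replace(".amazonaws.com", "")
    prefix = SOURCE_TO_IAM_PREFIX.get(service, service)
    return f"{prefix}:{event_name}"


alarm_action = iam_action("monitoring.amazonaws.com", "DescribeAlarms")
ec2_action = iam_action("ec2.amazonaws.com", "DescribeInstances")
```

Event names can also differ from IAM action names for some APIs, so a similar table keyed on (service, event name) may be needed.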
detail = json.loads(event.get("CloudTrailEvent", "{}"))
params = detail.get("requestParameters") or {}

Line | Explanation
event.get("CloudTrailEvent", "{}") | The full raw event as a JSON-encoded string. We default to "{}" so json.loads doesn’t fail on events without this field
json.loads(...) | Parses the JSON string into a Python dict containing requestParameters, responseElements, userIdentity, etc.
detail.get("requestParameters") or {} | requestParameters can be None (for calls that take no parameters) or a dict of API input parameters. The or {} turns None into an empty dict, so the later key in params checks do not raise TypeError
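
The reason for or {} rather than a .get() default is that the key is usually present with a null value, in which case the default is never used. A defensive helper could look like this sketch; request_params is a hypothetical name.

```python
import json


def request_params(event: dict) -> dict:
    """Return requestParameters as a dict, or {} if absent, null, or malformed."""
    try:
        detail = json.loads(event.get("CloudTrailEvent") or "{}")
    except (TypeError, json.JSONDecodeError):
        return {}
    params = detail.get("requestParameters") if isinstance(detail, dict) else None
    return params if isinstance(params, dict) else {}


null_case = request_params({"CloudTrailEvent": '{"requestParameters": null}'})
bucket_case = request_params(
    {"CloudTrailEvent": '{"requestParameters": {"bucketName": "my-bucket"}}'}
)
```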
arn_extractors = {
    "bucketName":   lambda p: f"arn:aws:s3:::{p['bucketName']}",
    "instanceId":   lambda p: f"arn:aws:ec2:*:*:instance/{p['instanceId']}",
    "functionName": lambda p: f"arn:aws:lambda:*:*:function:{p['functionName']}",
}
for key, extractor in arn_extractors.items():
    if key in params:
        resources_used[service].add(extractor(params))

Line | Explanation
arn_extractors | A dict mapping known request parameter names to lambda functions that build ARN strings
lambda p: f"arn:aws:s3:::{p['bucketName']}" | An inline function. When called with params, it builds the S3 bucket ARN
if key in params: extractor(params) | Only runs the ARN builder if the relevant parameter exists in this event
resources_used[service].add(...) | Adds the extracted ARN to the set for this service, used later for the Resource field in the IAM policy statement
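
Not every request parameter maps to an ARN by simple string formatting. An SQS queueUrl, for instance, has to be converted, because the Resource element of an IAM policy expects an ARN. The sketch below assumes the common URL form https://sqs.<region>.amazonaws.com/<account-id>/<queue-name> and the standard aws partition; sqs_url_to_arn is a hypothetical helper, and other URL forms are rejected rather than guessed.

```python
from urllib.parse import urlparse


def sqs_url_to_arn(queue_url: str) -> str:
    """Convert https://sqs.<region>.amazonaws.com/<account>/<name> to an ARN."""
    parsed = urlparse(queue_url)
    host_parts = parsed.netloc.split(".")
    path_parts = parsed.path.strip("/").split("/")
    if len(host_parts) < 2 or host_parts[0] != "sqs" or len(path_parts) != 2:
        raise ValueError(f"Unrecognised SQS queue URL: {queue_url}")
    region = host_parts[1]
    account_id, queue_name = path_parts
    return f"arn:aws:sqs:{region}:{account_id}:{queue_name}"


arn = sqs_url_to_arn("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue")
```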

Building the IAM Policy

for service, actions in sorted(actions_by_service.items()):
    resources = sorted(resources_used.get(service, {"*"}))
    statement = {
        "Sid":      f"Allow{service.replace('-', '').title()}Actions",
        "Effect":   "Allow",
        "Action":   sorted(list(actions)),
        "Resource": resources,
    }
    policy_statements.append(statement)

Line | Explanation
sorted(actions_by_service.items()) | Sort by service name so the output policy is alphabetically ordered, which is easier for humans to review
resources_used.get(service, {"*"}) | If we extracted specific resource ARNs, use them. If not (no parameter match), default to "*" (any resource)
sorted(resources_used.get(..., {"*"})) | Sort the resource list for deterministic, diff-able output
f"Allow{service.replace('-', '').title()}Actions" | Generates a Sid like "AllowS3Actions" or "AllowSecretsmanagerActions". replace('-', '') removes hyphens, .title() capitalizes the first letter of each word
"Action": sorted(list(actions)) | Converts the set to a sorted list. IAM requires Action to be a string or list of strings

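IAM accepts only ASCII letters and digits in a Sid. Removing hyphens covers typical service names, but a stricter builder is a cheap safeguard against unexpected characters. The sketch below is one way to do it; make_sid is a hypothetical name, and its capitalisation differs slightly from the script's .title() approach for hyphenated names.

```python
import re


def make_sid(service: str) -> str:
    """Build a Sid using only ASCII letters and digits."""
    cleaned = re.sub(r"[^A-Za-z0-9]", " ", service)
    words = "".join(word.capitalize() for word in cleaned.split())
    return f"Allow{words}Actions"


sid_s3 = make_sid("s3")
sid_hyphen = make_sid("application-autoscaling")
```
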
apply_policy_to_role(role_name, policy, policy_name)

response = iam.create_policy(
    PolicyName=policy_name,
    PolicyDocument=json.dumps(policy),
    Description="Auto-generated least-privilege policy...",
)
policy_arn = response["Policy"]["Arn"]
iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)

Line | Explanation
create_policy(PolicyName=..., PolicyDocument=json.dumps(policy)) | Creates a managed IAM policy. PolicyDocument must be a JSON string, not a dict. json.dumps() converts our dict
response["Policy"]["Arn"] | The ARN of the newly-created policy (e.g., arn:aws:iam::123456789012:policy/least-privilege-my-app-role)
attach_role_policy(RoleName=role_name, PolicyArn=policy_arn) | Attaches the managed policy to the role. Attaching only adds permissions: the role is limited to the actions observed in CloudTrail only after its broad policies (e.g., PowerUserAccess) are detached
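
Because attaching a policy never removes permissions, the broad policy has to be detached in a separate step once the new one is verified. The sketch below shows one cautious way to do that with list_attached_role_policies and detach_role_policy. detach_broad_policies and BROAD_POLICY_NAMES are hypothetical names, the iam argument is assumed to behave like boto3.client("iam"), and only the first page of attached policies is read.

```python
BROAD_POLICY_NAMES = {"AdministratorAccess", "PowerUserAccess"}


def detach_broad_policies(iam, role_name: str, dry_run: bool = True) -> list:
    """Detach (or, in dry-run mode, only list) broad managed policies on a role.

    `iam` is expected to behave like boto3.client("iam").
    """
    response = iam.list_attached_role_policies(RoleName=role_name)
    # Assumption: the role has few attached policies, so one page is enough.
    targets = [
        p["PolicyArn"]
        for p in response["AttachedPolicies"]
        if p["PolicyName"] in BROAD_POLICY_NAMES
    ]
    if not dry_run:
        for arn in targets:
            iam.detach_role_policy(RoleName=role_name, PolicyArn=arn)
    return targets
```

Calling it first with dry_run=True shows what would be detached; running the application against the new policy in a test environment before passing dry_run=False is the safer order.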
Services Used
CloudTrail, IAM, boto3
Prerequisites
  • Python 3.8+
  • boto3
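  • CloudTrail enabled (lookup_events reads the 90-day Event history of one region at a time; a multi-region trail is only needed for analysing S3 logs)
  • IAM: cloudtrail:LookupEvents, iam:CreatePolicy, iam:TagPolicy, iam:AttachRolePolicy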
What You Learned
  • CloudTrail lookup_events paginator
  • Action grouping by service
  • IAM policy JSON structure
  • Resource ARN extraction from event details
