Generate Least-Privilege IAM Policy from CloudTrail Logs
Python script that analyses CloudTrail logs for a given IAM role and generates a minimal IAM policy containing only the actions the role actually used.
IAM permission hygiene — replace broad AdministratorAccess or PowerUserAccess policies with tightly-scoped policies derived from real usage data.
Problem Statement
Your application role has PowerUserAccess (5,000+ allowed actions), but in 30 days of production usage it only calls 12 distinct APIs. A compromise of this role exposes your entire account. This script reads CloudTrail and generates a policy with just those 12 actions — reducing blast radius by 99%.
How It Works
CloudTrail logs → filter by role name → extract (service, action) pairs
→ group by service → build IAM Statement per service
→ output minimal policy JSON
Complete Script
import json
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from typing import Optional

import boto3
# CloudTrail event-source names (with ".amazonaws.com" stripped) whose IAM
# action prefix is different. Extend as further mismatches are encountered.
_IAM_PREFIX_OVERRIDES = {
    "monitoring": "cloudwatch",  # CloudWatch metrics API
}


def _s3_bucket_arns(bucket_name):
    """Return the bucket ARN and the object ARN pattern for *bucket_name*."""
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    # Bucket-level actions match the bucket ARN; object-level actions
    # (GetObject, PutObject, ...) only match ARNs of the form bucket/key.
    return [bucket_arn, f"{bucket_arn}/*"]


def _lambda_function_arns(function_name):
    """Return the function ARN; a value that is already an ARN is kept."""
    if function_name.startswith("arn:"):
        return [function_name]
    return [f"arn:aws:lambda:*:*:function:{function_name}"]


def _secret_arns(secret_id):
    """Return an ARN pattern matching the secret named by *secret_id*."""
    if secret_id.startswith("arn:"):
        return [secret_id]
    # Secrets Manager appends "-" plus six random characters to the name in
    # the real ARN, so the bare name alone would never match.
    return [f"arn:aws:secretsmanager:*:*:secret:{secret_id}-??????"]


def _sqs_queue_arn(queue_url):
    """Convert an SQS queue URL into the queue ARN that IAM expects.

    Raises:
        ValueError: if the URL does not end in ``/<account-id>/<queue-name>``.
    """
    parts = queue_url.rstrip("/").split("/")
    # Expected shape: ["https:", "", host, account_id, queue_name]
    if len(parts) < 5 or not parts[-2] or not parts[-1]:
        raise ValueError(f"Unrecognised SQS queue URL: {queue_url!r}")
    host_labels = parts[2].split(".")
    # Hosts of the form sqs.<region>.amazonaws.com carry the region;
    # anything else falls back to a region wildcard.
    if host_labels[0] == "sqs" and len(host_labels) > 3:
        region = host_labels[1]
    else:
        region = "*"
    return f"arn:aws:sqs:{region}:{parts[-2]}:{parts[-1]}"


# Request-parameter name -> callable turning its value into a list of ARNs.
# Built once at import time instead of once per event.
_ARN_BUILDERS = {
    "bucketName": _s3_bucket_arns,
    "instanceId": lambda value: [f"arn:aws:ec2:*:*:instance/{value}"],
    "functionName": _lambda_function_arns,
    "secretId": _secret_arns,
    "queueUrl": lambda value: [_sqs_queue_arn(value)],
    "topicArn": lambda value: [value],
}


def _resource_arns_from_event(event):
    """Return the set of resource ARNs derivable from one lookup_events record.

    An empty set means nothing could be extracted (no embedded event,
    malformed JSON, no requestParameters, or no known parameter names).
    """
    try:
        # CloudTrailEvent is a JSON string embedded in the event dict.
        detail = json.loads(event.get("CloudTrailEvent") or "{}")
    except (TypeError, ValueError):
        return set()  # Malformed event: treat as "no resource information"
    if not isinstance(detail, dict):
        return set()
    params = detail.get("requestParameters")
    if not isinstance(params, dict):
        return set()

    arns = set()
    for key, build in _ARN_BUILDERS.items():
        value = params.get(key)
        if not isinstance(value, str) or not value:
            continue
        try:
            arns.update(build(value))
        except ValueError:
            continue  # Unparseable value: skip this identifier only
    return arns


def generate_least_privilege_policy(
    role_name: str,
    lookback_days: int = 30,
    output_file: str = "least_privilege_policy.json",
    cloudtrail_client=None,
) -> dict:
    """
    Analyse CloudTrail events for an IAM role and build a least-privilege
    IAM policy from the API calls that were actually observed.

    Events are selected with the CloudTrail ``Username`` lookup attribute set
    to ``role_name``.
    NOTE(review): for assumed roles this attribute reportedly holds the role
    *session* name, so ``role_name`` must equal the session name in use;
    confirm against your own events.

    Args:
        role_name: value matched against the ``Username`` lookup attribute.
        lookback_days: how far back to look in CloudTrail (max 90 days
            for lookup_events; for longer history use Athena on S3 logs).
        output_file: path the policy JSON is written to.
        cloudtrail_client: optional pre-built CloudTrail client (for a
            specific region, profile or session). Defaults to
            ``boto3.client("cloudtrail")``.

    Returns:
        The policy document as a dict with ``Version`` and ``Statement``.
        Per service there is one statement scoped to the extracted resource
        ARNs and, when some actions had no extractable resource, a second
        statement granting those actions on ``"*"``.
    """
    cloudtrail = (
        cloudtrail_client
        if cloudtrail_client is not None
        else boto3.client("cloudtrail")
    )

    # Timezone-aware replacement for the deprecated datetime.utcnow().
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(days=lookback_days)

    # actions_by_service: {"s3": {"s3:GetObject", "s3:PutObject"}, ...}
    actions_by_service: dict[str, set] = defaultdict(set)
    # resources_used: {"s3": {"arn:aws:s3:::my-bucket"}, ...}
    resources_used: dict[str, set] = defaultdict(set)
    # Actions seen at least once without an extractable resource. They must
    # keep Resource "*", otherwise the observed call would be denied.
    unscoped_actions: set = set()

    print(f"Analysing CloudTrail for role: {role_name}")
    print(f"Period: {start_time.date()} → {end_time.date()}")
    print("Scanning events...\n")

    event_count = 0

    # ── Paginate CloudTrail events ─────────────────────────────────
    # get_paginator handles the NextToken loop automatically.
    # Each page["Events"] contains up to 50 events.
    paginator = cloudtrail.get_paginator("lookup_events")
    pages = paginator.paginate(
        LookupAttributes=[{
            "AttributeKey": "Username",
            "AttributeValue": role_name,
        }],
        StartTime=start_time,
        EndTime=end_time,
    )
    for page in pages:
        for event in page.get("Events", []):
            event_name = event.get("EventName", "")
            event_source = event.get("EventSource", "")
            if not (event_name and event_source):
                continue

            # event_source is like "s3.amazonaws.com" → strip domain → "s3",
            # then map to the IAM prefix where the two differ.
            source = event_source.replace(".amazonaws.com", "")
            service = _IAM_PREFIX_OVERRIDES.get(source, source)
            action = f"{service}:{event_name}"
            actions_by_service[service].add(action)
            event_count += 1

            arns = _resource_arns_from_event(event)
            if arns:
                resources_used[service].update(arns)
            else:
                unscoped_actions.add(action)

    print(f"Processed {event_count} CloudTrail events")
    print(f"Services accessed: {', '.join(sorted(actions_by_service))}\n")

    # ── Build IAM Policy ──────────────────────────────────────────
    policy_statements = []
    for service in sorted(actions_by_service):
        actions = actions_by_service[service]
        wildcard_actions = sorted(actions & unscoped_actions)
        scoped_actions = sorted(actions - unscoped_actions)
        resources = sorted(resources_used.get(service, ()))

        # A Sid may only contain alphanumerics, so drop everything else.
        sid_base = "Allow" + "".join(
            ch for ch in service if ch.isalnum()
        ).title()

        if scoped_actions:
            # Every scoped action had at least one event with a resource,
            # so `resources` is non-empty here.
            policy_statements.append({
                "Sid": f"{sid_base}Actions",
                "Effect": "Allow",
                "Action": scoped_actions,
                "Resource": resources,
            })
        if wildcard_actions:
            suffix = "UnscopedActions" if scoped_actions else "Actions"
            policy_statements.append({
                "Sid": f"{sid_base}{suffix}",
                "Effect": "Allow",
                "Action": wildcard_actions,
                "Resource": ["*"],
            })

        print(
            f"  {service}: {len(actions)} action(s), "
            f"{len(resources)} specific resource(s)"
        )
        for action in sorted(actions):
            print(f"    + {action}")

    policy = {
        "Version": "2012-10-17",
        "Statement": policy_statements,
    }

    # ── Save to file ──────────────────────────────────────────────
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(policy, f, indent=2)

    print(f"\nPolicy saved to: {output_file}")
    total_actions = sum(len(a) for a in actions_by_service.values())
    print(f"Total unique API actions: {total_actions}")
    return policy
def apply_policy_to_role(
    role_name: str,
    policy: dict,
    policy_name: Optional[str] = None,
    iam_client=None,
) -> str:
    """
    Create a new IAM managed policy from the generated policy document
    and attach it to the role.

    This only *adds* the new policy: nothing is detached here, so any
    broader policy already attached to the role keeps granting its
    permissions until it is removed separately.

    Args:
        role_name: name of the role the policy is attached to.
        policy: policy document as a dict; it is serialised with
            json.dumps because create_policy() takes a JSON string
            (not a dict) for PolicyDocument.
        policy_name: name for the managed policy. Defaults to
            ``least-privilege-<role_name>``.
        iam_client: optional pre-built IAM client (for a specific profile
            or session). Defaults to ``boto3.client("iam")``.

    Returns:
        The ARN of the newly created managed policy.
    """
    iam = iam_client if iam_client is not None else boto3.client("iam")
    policy_name = policy_name or f"least-privilege-{role_name}"

    response = iam.create_policy(
        PolicyName=policy_name,
        PolicyDocument=json.dumps(policy),
        Description=(
            f"Auto-generated least-privilege policy for role {role_name} "
            f"based on CloudTrail usage analysis."
        ),
        Tags=[
            {"Key": "GeneratedBy", "Value": "LeastPrivilegeTool"},
            {"Key": "SourceRole", "Value": role_name},
        ],
    )
    policy_arn = response["Policy"]["Arn"]
    print(f"Created policy: {policy_arn}")

    # Attach to the role
    iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
    print(f"Attached to role: {role_name}")
    return policy_arn
if __name__ == "__main__":
    # Step 1: scan CloudTrail for the role and write the minimal policy.
    target_role = "my-app-role"
    generated_policy = generate_least_privilege_policy(
        role_name=target_role,
        lookback_days=30,
        output_file="least_privilege_policy.json",
    )

    # Echo the result so it can be inspected without opening the file.
    preview = json.dumps(generated_policy, indent=2)
    print("\nGenerated Policy Preview:")
    print(preview)

    # Step 2: review the policy file manually; only then apply it:
    # apply_policy_to_role("my-app-role", generated_policy)
Sample Output
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowS3Actions",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::my-app-bucket"
]
},
{
"Sid": "AllowSecretsmanagerActions",
"Effect": "Allow",
"Action": [
"secretsmanager:GetSecretValue"
],
"Resource": [
"arn:aws:secretsmanager:*:*:secret:my-app-db-creds"
]
},
{
"Sid": "AllowCloudwatchActions",
"Effect": "Allow",
"Action": [
"cloudwatch:PutMetricData"
],
"Resource": ["*"]
}
]
}
Key Commands Explained
| Command | What it does |
|---|---|
get_paginator("lookup_events") | Paginates CloudTrail events — max 90-day window |
LookupAttributes[AttributeKey="Username"] | Filters events by IAM principal / session name |
event["EventSource"] | The AWS service that received the API call (e.g., s3.amazonaws.com) |
event["EventName"] | The API action called (e.g., GetObject) |
event["CloudTrailEvent"] | Full raw event JSON — contains requestParameters, responseElements, etc. |
json.loads(event["CloudTrailEvent"]) | Parses the embedded JSON string into a dict |
defaultdict(set) | Auto-initialises a set for each new key — no setdefault needed |
create_policy(PolicyDocument=json.dumps(policy)) | Creates a managed IAM policy from JSON string |
attach_role_policy(RoleName, PolicyArn) | Attaches the managed policy to the role |
Limitations & Tips
CloudTrail lookup_events limit: The API returns a maximum of 90 days of data. For longer analysis windows, query CloudTrail S3 logs with Amazon Athena.
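Data plane events not logged by default: S3 object-level events (GetObject, PutObject) require enabling CloudTrail Data Events. Check your trail settings. Note also that lookup_events only returns management events (and Insights events): even with Data Events enabled, object-level calls never appear in its results and must be analysed from the trail's S3 logs, for example with Athena.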
Always review before applying: The generated policy is a starting point. Some actions may be needed only on rare occasions not captured in the analysis window. Add a buffer period or combine with IAM Access Analyzer.
Use IAM Access Analyzer: AWS’s built-in tool does similar analysis — this script gives you programmatic control and integration into your CI/CD workflow.
🔍 Line-by-Line Code Walkthrough
Imports
| Line | Why It’s Used |
|---|---|
import boto3 | AWS SDK for CloudTrail and IAM API calls |
import json | Used to parse CloudTrailEvent (a JSON string inside the event dict) and to serialize the final policy document |
from collections import defaultdict | Creates dictionaries that auto-initialize missing keys. defaultdict(set) gives an empty set when a key is first accessed — no need for setdefault() |
from datetime import datetime, timedelta | Compute the CloudTrail lookback window: datetime.utcnow() - timedelta(days=30) |
generate_least_privilege_policy(role_name, lookback_days, output_file)
cloudtrail = boto3.client("cloudtrail")
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=lookback_days)
| Line | Explanation |
|---|---|
boto3.client("cloudtrail") | Creates a CloudTrail client for the region in your AWS configuration. lookup_events is a regional API — it only returns events recorded in the region the client is configured for, so run the scan once per region in which the role operates |
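datetime.utcnow() | Current UTC time as a naive datetime. CloudTrail’s StartTime/EndTime accept naive datetimes, which are interpreted as UTC. Note that datetime.utcnow() is deprecated since Python 3.12; datetime.now(timezone.utc) is the timezone-aware replacement |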
timedelta(days=lookback_days) | Creates a duration. Subtracting 30 days from now gives the start of the analysis window |
actions_by_service: dict[str, set] = defaultdict(set)
resources_used: dict[str, set] = defaultdict(set)
| Line | Explanation |
|---|---|
defaultdict(set) | A dict that creates a new empty set for any key not yet seen. Without this, the first actions_by_service["s3"].add(...) would raise KeyError |
actions_by_service | Will hold: {"s3": {"s3:GetObject", "s3:PutObject"}, "ec2": {"ec2:DescribeInstances"}, ...} |
resources_used | Will hold resource ARNs per service: {"s3": {"arn:aws:s3:::my-bucket"}, ...} — used for tighter Resource clauses in the policy |
paginator = cloudtrail.get_paginator("lookup_events")
for page in paginator.paginate(
LookupAttributes=[{"AttributeKey": "Username", "AttributeValue": role_name}],
StartTime=start_time,
EndTime=end_time,
):
| Line | Explanation |
|---|---|
get_paginator("lookup_events") | lookup_events returns up to 50 events per page. A busy role may have thousands of events — pagination is mandatory |
LookupAttributes=[{"AttributeKey": "Username", ...}] | Filters CloudTrail events to only those made by this specific IAM principal. AttributeKey: "Username" matches the session name for assumed roles |
StartTime=start_time, EndTime=end_time | The time window. CloudTrail lookup_events supports a maximum 90-day lookback window |
event_name = event.get("EventName", "")
event_source = event.get("EventSource", "")
service = event_source.replace(".amazonaws.com", "")
action = f"{service}:{event_name}"
actions_by_service[service].add(action)
| Line | Explanation |
|---|---|
event.get("EventName", "") | API method that was called (e.g., "GetObject", "DescribeInstances"). Empty string means the event has no name — we skip it |
event.get("EventSource", "") | AWS service endpoint that processed the call (e.g., "s3.amazonaws.com", "ec2.amazonaws.com") |
.replace(".amazonaws.com", "") | Strips the domain suffix to get the service prefix used in IAM actions: "s3", "ec2", "lambda" |
f"{service}:{event_name}" | Formats the IAM action string: "s3:GetObject", "ec2:DescribeInstances" |
actions_by_service[service].add(action) | Adds the action to the set for this service. Sets deduplicate — calling the same API 1000 times still records it once |
detail = json.loads(event.get("CloudTrailEvent", "{}"))
params = detail.get("requestParameters") or {}
| Line | Explanation |
|---|---|
event.get("CloudTrailEvent", "{}") | The full raw event as a JSON-encoded string. We default to "{}" so json.loads doesn’t fail on events without this field |
json.loads(...) | Parses the JSON string into a Python dict containing requestParameters, responseElements, userIdentity, etc. |
detail.get("requestParameters") or {} | requestParameters can be None (for example, for calls made without parameters) or a dict of API input parameters. The or {} handles None — the later key in params membership test would raise TypeError on None |
arn_extractors = {
"bucketName": lambda p: f"arn:aws:s3:::{p['bucketName']}",
"instanceId": lambda p: f"arn:aws:ec2:*:*:instance/{p['instanceId']}",
"functionName": lambda p: f"arn:aws:lambda:*:*:function:{p['functionName']}",
}
for key, extractor in arn_extractors.items():
if key in params:
resources_used[service].add(extractor(params))
| Line | Explanation |
|---|---|
arn_extractors | A dict mapping known request parameter names to lambda functions that build ARN strings |
lambda p: f"arn:aws:s3:::{p['bucketName']}" | An inline function. When called with params, it builds the S3 bucket ARN |
if key in params: extractor(params) | Only runs the ARN builder if the relevant parameter exists in this event |
resources_used[service].add(...) | Adds the extracted ARN to the set for this service — used later for the Resource field in the IAM policy statement |
Building the IAM Policy
for service, actions in sorted(actions_by_service.items()):
resources = sorted(resources_used.get(service, {"*"}))
statement = {
"Sid": f"Allow{service.replace('-', '').title()}Actions",
"Effect": "Allow",
"Action": sorted(list(actions)),
"Resource": resources,
}
policy_statements.append(statement)
| Line | Explanation |
|---|---|
sorted(actions_by_service.items()) | Sort by service name so the output policy is alphabetically ordered — easier for humans to review |
resources_used.get(service, {"*"}) | If we extracted specific resource ARNs, use them. If not (no parameter match), default to "*" (any resource) |
sorted(resources_used.get(..., {"*"})) | Sort the resource list for deterministic, diff-able output |
f"Allow{service.replace('-', '').title()}Actions" | Generates a Sid like "AllowS3Actions" or "AllowSecretsmanagerActions". replace('-', '') removes hyphens, .title() capitalizes first letter of each word |
"Action": sorted(list(actions)) | Converts the set to a sorted list. IAM requires Action to be a string or list of strings |
apply_policy_to_role(role_name, policy, policy_name)
response = iam.create_policy(
PolicyName=policy_name,
PolicyDocument=json.dumps(policy),
Description="Auto-generated least-privilege policy...",
)
policy_arn = response["Policy"]["Arn"]
iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
| Line | Explanation |
|---|---|
create_policy(PolicyName=..., PolicyDocument=json.dumps(policy)) | Creates a managed IAM policy. PolicyDocument must be a JSON string — not a dict. json.dumps() converts our dict |
response["Policy"]["Arn"] | The ARN of the newly-created policy (e.g., arn:aws:iam::123456789012:policy/least-privilege-my-app-role) |
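attach_role_policy(RoleName=role_name, PolicyArn=policy_arn) | Attaches the managed policy to the role. Attaching only adds permissions: the role is not restricted to the actions observed in CloudTrail until the broader policies (e.g., PowerUserAccess) are detached from it |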
- CloudTrail lookup_events paginator
- Action grouping by service
- IAM policy JSON structure
- Resource ARN extraction from event details
Have a similar scenario to share?
Production incidents are the best teachers. Submit your real-world scenario and help others learn.
Open Google Form
Related Scenarios
CloudTrail Root Account Monitor — Alert on Root API Usage via Lambda
Problem Statement The AWS root account bypasses all IAM policies and has unrestricted access to everything in the account — including …
IAM Access Key Rotation — Auto-Rotate Keys Older than 90 Days
Problem Statement CIS AWS Benchmark 1.14 requires IAM access keys to be rotated every 90 days. Long-lived keys are a top attack vector — if …
Developer Pushed AWS Credentials to Public GitHub — Incident Response
The Problem Leaked AWS credentials are one of the most dangerous security incidents. Attackers have automated scrapers that monitor GitHub …