The Situation
Security monitoring: any use of the root account is a high-severity security event, because root has unrestricted access and IAM policies do not apply to it.
Problem Statement
The AWS root account bypasses all IAM policies and has unrestricted access to everything in the account — including billing, support plans, and account closure. Any API call made by root should be treated as a potential security incident (unless it’s a known, scheduled administrative task).
Architecture
CloudTrail (all API calls)
↓
EventBridge rule:
event.detail.userIdentity.type = "Root"
↓
Lambda (this script)
↓
SNS → PagerDuty / Slack / Email
EventBridge Rule Setup
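# Create an EventBridge rule that matches root API calls and root console sign-ins.
# There is no "source" filter: events delivered via CloudTrail carry the originating
# service as their source (for example aws.iam or aws.signin), not aws.cloudtrail.
aws events put-rule \
  --name detect-root-account-usage \
  --event-pattern '{
    "detail-type": [
      "AWS API Call via CloudTrail",
      "AWS Console Sign In via CloudTrail"
    ],
    "detail": {
      "userIdentity": {
        "type": ["Root"]
      }
    }
  }' \
  --state ENABLED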
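# Allow EventBridge to invoke the function. Without this resource-based
# permission the rule matches but the Lambda is never called.
# (The statement ID is an arbitrary label.)
aws lambda add-permission \
  --function-name RootAccountMonitor \
  --statement-id AllowEventBridgeRootMonitor \
  --action lambda:InvokeFunction \
  --principal events.amazonaws.com \
  --source-arn arn:aws:events:us-east-1:123456789012:rule/detect-root-account-usage
# Add Lambda as the target
aws events put-targets \
  --rule detect-root-account-usage \
  --targets "Id=RootMonitorLambda,Arn=arn:aws:lambda:us-east-1:123456789012:function:RootAccountMonitor"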
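Before deploying, it can help to check locally which events the `userIdentity.type` filter would select. The sketch below is a deliberately simplified stand-in for EventBridge matching: it handles only nested objects and lists of exact values, not prefix, numeric, or anything-but operators. `matches` and `ROOT_IDENTITY_PATTERN` are hypothetical names, not AWS APIs.

```python
# Only the identity portion of the rule, which is the part that singles out root.
ROOT_IDENTITY_PATTERN = {"detail": {"userIdentity": {"type": ["Root"]}}}


def matches(pattern: dict, event: dict) -> bool:
    """Simplified matcher: dicts recurse, lists mean 'any of these exact values'."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            # Nested pattern: the event must have a nested object that also matches.
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            # Leaf pattern: the event value must equal one of the listed values.
            return False
    return True


root_event = {"detail": {"eventName": "CreateUser", "userIdentity": {"type": "Root"}}}
iam_event = {"detail": {"eventName": "CreateUser", "userIdentity": {"type": "IAMUser"}}}

print(matches(ROOT_IDENTITY_PATTERN, root_event))
print(matches(ROOT_IDENTITY_PATTERN, iam_event))
```

The real service supports many more operators, so this is only a quick check of the exact-match logic used in this rule.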
Complete Lambda Script
import boto3
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Known root account actions that are acceptable (e.g., automated billing scripts).
# Add your authorized event names here.
AUTHORIZED_ROOT_EVENTS = {
    "GetBillingData",  # Billing automation
    "ListBillingReports",
}
# Add your corporate IPs here, e.g. {"203.0.113.10"}.
# set() is used because an empty {} literal would be a dict, not a set.
AUTHORIZED_SOURCE_IPS = set()
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:security-critical-alerts"
def lambda_handler(event: dict, context) -> dict:
    """
    EventBridge delivers events in this structure when triggered
    by a root console sign-in recorded by CloudTrail:
    event = {
        "version": "0",
        "id": "...",
        "source": "aws.signin",
        "detail-type": "AWS Console Sign In via CloudTrail",
        "detail": {
            "eventVersion": "1.08",
            "userIdentity": {
                "type": "Root",
                "principalId": "123456789012",
                "arn": "arn:aws:iam::123456789012:root",
                "accountId": "123456789012"
            },
            "eventTime": "2025-01-20T14:30:00Z",
            "eventName": "ConsoleLogin",
            "eventSource": "signin.amazonaws.com",
            "sourceIPAddress": "203.0.113.5",
            "userAgent": "Mozilla/5.0...",
            "awsRegion": "us-east-1",
            "requestParameters": null,
            "responseElements": {"ConsoleLogin": "Success"}
        }
    }
    For API calls, "detail-type" is "AWS API Call via CloudTrail" and "source"
    is the called service (for example "aws.iam").
    event["detail"] contains the raw CloudTrail event fields.
    """
    sns = boto3.client("sns")
    detail = event.get("detail", {})
    # ── Extract key fields ─────────────────────────────────────────
    event_name = detail.get("eventName", "Unknown")
    event_source = detail.get("eventSource", "Unknown")
    source_ip = detail.get("sourceIPAddress", "Unknown")
    user_agent = detail.get("userAgent", "Unknown")
    event_time = detail.get("eventTime", "Unknown")
    aws_region = detail.get("awsRegion", "Unknown")
    account_id = detail.get("userIdentity", {}).get("accountId", "Unknown")
    # ── Filter: skip authorized root actions ──────────────────────
    if event_name in AUTHORIZED_ROOT_EVENTS:
        logger.info(f"Root API call {event_name} is in the authorized list, skipping alert")
        return {"statusCode": 200, "message": "Authorized root event, skipped"}
    if source_ip in AUTHORIZED_SOURCE_IPS:
        logger.info(f"Root API call from authorized IP {source_ip}, skipping alert")
        return {"statusCode": 200, "message": "Authorized IP, skipped"}
    # ── Log the incident ──────────────────────────────────────────
    # Using CRITICAL level ensures this appears in CloudWatch Logs
    # with severity that can trigger a separate metric filter alarm
    logger.critical(
        f"ROOT_ACCOUNT_USAGE | {event_name} | {source_ip} | {event_time} | {aws_region}"
    )
    # ── Build alert message ───────────────────────────────────────
    # The message text is flush-left so the alert body carries no stray indentation.
    alert_message = f"""
🚨 ROOT ACCOUNT ACTIVITY DETECTED 🚨
======================================
Account ID: {account_id}
Event: {event_name}
Service: {event_source}
Region: {aws_region}
Time: {event_time}
Source IP: {source_ip}
User Agent: {user_agent}
IMMEDIATE ACTIONS REQUIRED:
1. Verify if this action was authorized
2. If unauthorized, rotate root credentials NOW:
   a. Change root password
   b. Regenerate root MFA device
   c. Check for new IAM users/roles created by root
3. Review CloudTrail for all root activity in the last 24h:
   aws cloudtrail lookup-events \\
     --lookup-attributes AttributeKey=Username,AttributeValue=root \\
     --start-time $(date -d '24 hours ago' --utc +%Y-%m-%dT%H:%M:%SZ)
4. Escalate to Security team immediately
Full CloudTrail Event:
{json.dumps(detail, indent=2, default=str)}
"""
    # ── Publish to SNS ────────────────────────────────────────────
    # publish() sends the message to all SNS subscribers.
    # Subject must be ASCII text and shorter than 100 characters; SNS rejects
    # the publish call otherwise, so the subject has no emoji and is truncated.
    #
    # MessageAttributes with DataType="String" allow SNS filter policies:
    # subscribers can opt to receive only CRITICAL messages.
    sns.publish(
        TopicArn=SNS_TOPIC_ARN,
        Subject=f"CRITICAL: Root account activity in {aws_region}: {event_name}"[:99],
        Message=alert_message,
        MessageAttributes={
            "severity": {
                "DataType": "String",
                "StringValue": "CRITICAL",
            },
            "service": {
                "DataType": "String",
                "StringValue": "root-monitor",
            },
            "event_name": {
                "DataType": "String",
                "StringValue": event_name,
            },
        },
    )
    logger.info(f"Alert published to SNS for root event: {event_name} from {source_ip}")
    return {
        "statusCode": 200,
        "message": "Root account alert published",
        "event_name": event_name,
        "source_ip": source_ip,
    }
Automated Incident Response (Optional Extension)
from datetime import datetime, timezone, timedelta


def create_incident_snapshot(detail: dict) -> None:
    """
    When root activity is detected, automatically capture a snapshot of
    current IAM state for forensic analysis.
    """
    iam = boto3.client("iam")
    s3 = boto3.client("s3")
    # Get current IAM account summary
    summary = iam.get_account_summary()["SummaryMap"]
    # List recently created IAM users (last 30 minutes)
    users = []
    paginator = iam.get_paginator("list_users")
    threshold = datetime.now(timezone.utc) - timedelta(minutes=30)
    for page in paginator.paginate():
        for user in page["Users"]:
            if user["CreateDate"] > threshold:
                users.append(user["UserName"])
    incident_data = {
        "timestamp": detail.get("eventTime"),
        "event": detail.get("eventName"),
        "source_ip": detail.get("sourceIPAddress"),
        "recent_new_users": users,
        "iam_summary": {k: v for k, v in summary.items() if "Users" in k or "Roles" in k},
    }
    # Store forensic data in S3 for later analysis
    s3.put_object(
        Bucket="your-security-audit-bucket",
        Key=f"incidents/root-activity/{detail.get('eventTime', 'unknown')}.json",
        Body=json.dumps(incident_data, indent=2),
        ServerSideEncryption="aws:kms",
    )
Key Commands Explained
| Command | What it does |
|---|---|
| `event["detail"]` | The raw CloudTrail event delivered by EventBridge |
| `detail["userIdentity"]["type"] == "Root"` | Confirms the API call was made by the root account |
| `detail["eventName"]` | The specific API action performed (e.g., ConsoleLogin, CreateUser) |
| `detail["sourceIPAddress"]` | IP that made the root API call |
| `sns.publish(TopicArn, Subject, Message, MessageAttributes)` | Sends alert to all SNS subscribers |
| `MessageAttributes` | Key-value metadata; allows SNS filter policies for routing |
| `logger.critical(...)` | High-severity CloudWatch log; can trigger separate metric alarms |
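To make the filter-policy routing mentioned above concrete, here is a minimal sketch of the request a subscriber could send so that it only receives messages whose `severity` attribute is `CRITICAL`. `build_filter_policy_request` is a hypothetical helper and the subscription ARN is a placeholder; the request shape follows boto3's `set_subscription_attributes` call.

```python
import json


def build_filter_policy_request(subscription_arn: str) -> dict:
    """Build keyword arguments for sns.set_subscription_attributes (hypothetical helper)."""
    # Filter policies match on message attributes; a list means "any of these values".
    policy = {"severity": ["CRITICAL"]}
    return {
        "SubscriptionArn": subscription_arn,
        "AttributeName": "FilterPolicy",
        "AttributeValue": json.dumps(policy),
    }


# Placeholder ARN, for illustration only.
request = build_filter_policy_request(
    "arn:aws:sns:us-east-1:123456789012:security-critical-alerts:example-subscription-id"
)
print(json.dumps(request, indent=2))

# To apply it (requires AWS credentials and a real subscription ARN):
#   import boto3
#   boto3.client("sns").set_subscription_attributes(**request)
```

Building the request separately from the API call keeps the policy easy to inspect and to test without touching AWS.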
Testing the Monitor
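# Invoke the Lambda directly with a sample root event. This bypasses EventBridge,
# so it tests the handler but not the rule pattern.
# AWS CLI v2 needs --cli-binary-format raw-in-base64-out to accept raw JSON in
# --payload; omit that flag on CLI v1.
aws lambda invoke \
  --function-name RootAccountMonitor \
  --cli-binary-format raw-in-base64-out \
  --payload '{
    "detail": {
      "eventName": "ConsoleLogin",
      "eventSource": "signin.amazonaws.com",
      "sourceIPAddress": "1.2.3.4",
      "userAgent": "Mozilla/5.0",
      "eventTime": "2025-01-20T14:30:00Z",
      "awsRegion": "us-east-1",
      "userIdentity": {"type": "Root", "accountId": "123456789012"}
    }
  }' \
  response.json
cat response.json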
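For repeatable tests it may be easier to generate the payload in code than to hand-edit JSON. The sketch below builds an event with the same fields as the sample payload; `build_root_test_event` is a hypothetical helper name and is not part of the Lambda script.

```python
import json


def build_root_test_event(event_name: str = "ConsoleLogin",
                          event_source: str = "signin.amazonaws.com",
                          source_ip: str = "1.2.3.4") -> dict:
    """Return a minimal EventBridge-style event for a root action.

    Only the fields that lambda_handler reads are filled in.
    """
    return {
        "detail": {
            "eventName": event_name,
            "eventSource": event_source,
            "sourceIPAddress": source_ip,
            "userAgent": "Mozilla/5.0",
            "eventTime": "2025-01-20T14:30:00Z",
            "awsRegion": "us-east-1",
            "userIdentity": {"type": "Root", "accountId": "123456789012"},
        }
    }


# Serialize once so the same payload can be written to a file or passed to an SDK call.
payload = json.dumps(build_root_test_event("CreateUser", "iam.amazonaws.com"))
print(payload)
```

Varying `event_name` and `source_ip` makes it straightforward to exercise both the allowlist branches and the alerting branch of the handler.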
🔍 Line-by-Line Code Walkthrough
Module-Level Constants
AUTHORIZED_ROOT_EVENTS = {"GetBillingData", "ListBillingReports"}
AUTHORIZED_SOURCE_IPS = set() # e.g., {"203.0.113.10"}
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:security-critical-alerts"
| Line | Explanation |
|---|---|
| `AUTHORIZED_ROOT_EVENTS` | Python set for O(1) membership tests. Events in this set are known-safe root actions (e.g., automated billing scripts). We skip alerting for these |
| `AUTHORIZED_SOURCE_IPS` | IPs of trusted corporate networks. Root activity from these IPs (e.g., your office) is expected |
| `SNS_TOPIC_ARN` | The SNS topic that routes to PagerDuty, Slack, and email. Stored at module level so it's easy to find and change |
lambda_handler(event, context) — Entry Point
sns = boto3.client("sns")
detail = event.get("detail", {})
| Line | Explanation |
|---|---|
| `boto3.client("sns")` | Lambda automatically uses the execution role's credentials. No `region_name` needed; it defaults to the Lambda function's region |
| `event.get("detail", {})` | EventBridge wraps the CloudTrail event inside `event["detail"]`. Using `.get()` with a `{}` default prevents a `KeyError` if the event structure is unexpected |
event_name = detail.get("eventName", "Unknown")
event_source = detail.get("eventSource", "Unknown")
source_ip = detail.get("sourceIPAddress","Unknown")
user_agent = detail.get("userAgent", "Unknown")
event_time = detail.get("eventTime", "Unknown")
aws_region = detail.get("awsRegion", "Unknown")
account_id = detail.get("userIdentity", {}).get("accountId", "Unknown")
| Line | Explanation |
|---|---|
| `detail.get("eventName", "Unknown")` | The API action called (e.g., "ConsoleLogin", "CreateUser", "PutRolePolicy"). Default "Unknown" prevents crashes on malformed events |
| `detail.get("eventSource", "Unknown")` | Which AWS service received the call (e.g., "signin.amazonaws.com", "iam.amazonaws.com") |
| `detail.get("sourceIPAddress", "Unknown")` | IP address of whoever made the root API call. "AWS Internal" appears for service-initiated actions |
| `detail.get("userAgent", "Unknown")` | Browser or SDK info. A "Mozilla/5.0" prefix usually means a browser (console) session; "aws-cli" means the CLI. Helps identify the tool used |
| `detail.get("eventTime", "Unknown")` | ISO 8601 timestamp when the API call happened |
| `detail.get("userIdentity", {}).get("accountId", "Unknown")` | Chained `.get()`: first gets the `userIdentity` dict (or `{}` if absent), then gets `accountId` from it |
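The same defensive extraction can be grouped into one typed object, which may make the handler easier to unit-test. This is a minimal sketch under that assumption; `RootEvent` and `from_detail` are hypothetical names that do not appear in the Lambda script.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RootEvent:
    """The CloudTrail fields the monitor cares about, with safe defaults."""
    event_name: str
    event_source: str
    source_ip: str
    user_agent: str
    event_time: str
    aws_region: str
    account_id: str

    @classmethod
    def from_detail(cls, detail: dict) -> "RootEvent":
        # `or {}` also covers an explicit null userIdentity, not just a missing key.
        identity = detail.get("userIdentity") or {}
        return cls(
            event_name=detail.get("eventName", "Unknown"),
            event_source=detail.get("eventSource", "Unknown"),
            source_ip=detail.get("sourceIPAddress", "Unknown"),
            user_agent=detail.get("userAgent", "Unknown"),
            event_time=detail.get("eventTime", "Unknown"),
            aws_region=detail.get("awsRegion", "Unknown"),
            account_id=identity.get("accountId", "Unknown"),
        )


parsed = RootEvent.from_detail({"eventName": "ConsoleLogin", "userIdentity": None})
print(parsed)
```

A frozen dataclass keeps the parsed fields immutable, so later stages of the handler cannot accidentally overwrite them.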
Allowlist Filtering
if event_name in AUTHORIZED_ROOT_EVENTS:
    logger.info(f"Root API call {event_name} is in the authorized list, skipping alert")
    return {"statusCode": 200, "message": "Authorized root event, skipped"}
if source_ip in AUTHORIZED_SOURCE_IPS:
    logger.info(f"Root API call from authorized IP {source_ip}, skipping alert")
    return {"statusCode": 200, "message": "Authorized IP, skipped"}
| Line | Explanation |
|---|---|
| `event_name in AUTHORIZED_ROOT_EVENTS` | Set membership test, O(1). Returns True if this event name is in the approved list |
| `return {"statusCode": 200, ...}` | Early return: the handler stops and no SNS publish happens. EventBridge invokes Lambda asynchronously, so the return value is not sent back to it; the invocation counts as successful (and is not retried) because the handler returns without raising an exception |
| `source_ip in AUTHORIZED_SOURCE_IPS` | Checks if the source IP is a trusted corporate IP. Prevents false alerts from expected admin tasks |
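Exact-string matching works for a handful of addresses, but corporate egress is often a range. As a possible variation (not what the script above does), the allowlist could be expressed as CIDR blocks with the standard-library `ipaddress` module. `AUTHORIZED_NETWORKS` and `is_authorized_ip` are hypothetical names, and the range shown is a documentation-only address block.

```python
import ipaddress

# Hypothetical allowlist expressed as CIDR ranges (documentation addresses).
AUTHORIZED_NETWORKS = [
    ipaddress.ip_network("203.0.113.0/28"),
]


def is_authorized_ip(source_ip: str) -> bool:
    """Return True only if source_ip parses as an IP inside an allowlisted network."""
    try:
        address = ipaddress.ip_address(source_ip)
    except ValueError:
        # CloudTrail can record a service name instead of an IP (for example
        # "AWS Internal"); anything unparseable is treated as not authorized.
        return False
    return any(address in network for network in AUTHORIZED_NETWORKS)


for candidate in ("203.0.113.10", "203.0.113.200", "AWS Internal"):
    print(candidate, is_authorized_ip(candidate))
```

Failing closed on unparseable values means a malformed or unexpected `sourceIPAddress` still produces an alert.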
Logging the Incident
logger.critical(
    f"ROOT_ACCOUNT_USAGE | {event_name} | {source_ip} | {event_time} | {aws_region}"
)
| Line | Explanation |
|---|---|
| `logger.critical(...)` | CloudWatch Logs records this at CRITICAL severity. You can create a CloudWatch Metric Filter on the pattern ROOT_ACCOUNT_USAGE and trigger a separate alarm, which creates a second independent alerting path |
| Pipe-separated format | Makes the log line queryable with CloudWatch Logs Insights: `filter @message like "ROOT_ACCOUNT_USAGE"` |
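The second alerting path could be set up with a metric filter on the function's log group. The sketch below only builds the request for boto3's `put_metric_filter`; the filter name, metric name, and namespace are assumptions chosen for illustration, and the log group name assumes the default `/aws/lambda/<function-name>` convention.

```python
def build_metric_filter_request(log_group_name: str) -> dict:
    """Build keyword arguments for logs.put_metric_filter (hypothetical helper)."""
    return {
        "logGroupName": log_group_name,
        "filterName": "root-account-usage",        # assumed name
        # Quoted term: matches log events containing this exact string.
        "filterPattern": '"ROOT_ACCOUNT_USAGE"',
        "metricTransformations": [
            {
                "metricName": "RootAccountUsageCount",      # assumed name
                "metricNamespace": "Security/RootMonitor",  # assumed namespace
                "metricValue": "1",  # add 1 to the metric per matching log line
            }
        ],
    }


request = build_metric_filter_request("/aws/lambda/RootAccountMonitor")
print(request)

# To apply it (requires AWS credentials and logs:PutMetricFilter permission):
#   import boto3
#   boto3.client("logs").put_metric_filter(**request)
```

A CloudWatch alarm on the resulting metric would then fire independently of the SNS publish inside the handler.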
sns.publish(...) — The Alert
sns.publish(
    TopicArn=SNS_TOPIC_ARN,
    Subject=f"CRITICAL: Root account activity in {aws_region}: {event_name}"[:99],
    Message=alert_message,
    MessageAttributes={
        "severity": {"DataType": "String", "StringValue": "CRITICAL"},
        "service": {"DataType": "String", "StringValue": "root-monitor"},
        "event_name": {"DataType": "String", "StringValue": event_name},
    },
)
| Parameter | Explanation |
|---|---|
| `TopicArn=SNS_TOPIC_ARN` | The SNS topic to publish to. All subscribers receive the message |
| `Subject=...` | Email subject line (for email subscriptions). Must be ASCII text and shorter than 100 characters. SNS does not truncate: it rejects the publish call if the subject is too long or contains non-ASCII characters such as emoji, hence the plain text and the `[:99]` slice |
| `Message=alert_message` | The full alert body. For email subscribers, this is the email body |
| `MessageAttributes={"severity": ...}` | Key-value metadata attached to the message. SNS subscribers can create filter policies to only receive messages where severity = "CRITICAL", which is useful when multiple scripts publish to the same topic |
| `"DataType": "String"` | The attribute type, such as "String", "Number", or "Binary" |
| `"StringValue": "CRITICAL"` | The attribute value |
Return Value
return {
    "statusCode": 200,
    "message": "Root account alert published",
    "event_name": event_name,
    "source_ip": source_ip,
}
| Line | Explanation |
|---|---|
| `return {...}` | The return value is visible to synchronous callers, such as the `aws lambda invoke` test, where it lands in response.json. Returning structured data makes it easy to verify there that the alert was sent |
| `"statusCode": 200` | Convention borrowed from HTTP. EventBridge invokes the function asynchronously and does not read this value; the invocation is retried only if the handler raises an exception or the function otherwise fails |
EventBridge Rule Pattern (Explained)
{
  "detail-type": [
    "AWS API Call via CloudTrail",
    "AWS Console Sign In via CloudTrail"
  ],
  "detail": {
    "userIdentity": {
      "type": ["Root"]
    }
  }
}
| Field | Explanation |
|---|---|
| No `"source"` key | Events delivered via CloudTrail use the originating service as their source (for example `aws.iam` or `aws.signin`). Filtering on `aws.cloudtrail` would match only calls made to the CloudTrail service itself |
| `"detail-type": [...]` | API calls arrive as "AWS API Call via CloudTrail"; root console logins arrive as "AWS Console Sign In via CloudTrail". Listing both matches either |
| `"detail": {"userIdentity": {"type": ["Root"]}}` | EventBridge filters the event payload. Only events where userIdentity.type equals "Root" match, so Lambda is NOT invoked for regular IAM user/role actions |