Scenario Advanced Python Python AWS Scripting

CloudTrail Root Account Monitor — Alert on Root API Usage via Lambda

Lambda function triggered by EventBridge to detect any AWS root account API activity in CloudTrail and immediately send a critical security alert via SNS.

January 20, 2025 8 min read ~20 min to complete DB
The Situation

Security monitoring — root account usage is a high-severity security event. The root account has unrestricted access and no IAM policies apply to it.

6 Steps
5 Services Used
~20 min Duration
Advanced Difficulty

Problem Statement

The AWS root account bypasses all IAM policies and has unrestricted access to everything in the account — including billing, support plans, and account closure. Any API call made by root should be treated as a potential security incident (unless it’s a known, scheduled administrative task).


Architecture

CloudTrail (all API calls)
         ↓
EventBridge rule:
  event.detail.userIdentity.type = "Root"
         ↓
Lambda (this script)
         ↓
SNS → PagerDuty / Slack / Email

EventBridge Rule Setup

# Create EventBridge rule to detect root account usage
aws events put-rule \
  --name detect-root-account-usage \
  --event-pattern '{
    "source": ["aws.cloudtrail"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
      "userIdentity": {
        "type": ["Root"]
      }
    }
  }' \
  --state ENABLED

# Add Lambda as the target
aws events put-targets \
  --rule detect-root-account-usage \
  --targets "Id=RootMonitorLambda,Arn=arn:aws:lambda:us-east-1:123456789012:function:RootAccountMonitor"

Complete Lambda Script

import boto3
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Known root account actions that are acceptable (e.g., automated billing scripts)
# Add your authorized IPs or event names here
AUTHORIZED_ROOT_EVENTS = {
    "GetBillingData",         # Billing automation
    "ListBillingReports",
}
AUTHORIZED_SOURCE_IPS = {
    # Add your corporate IPs here
    # "203.0.113.10",
}

SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:security-critical-alerts"


def lambda_handler(event: dict, context) -> dict:
    """
    EventBridge delivers events in this structure when triggered
    by a CloudTrail API call:

    event = {
      "version": "0",
      "id": "...",
      "source": "aws.cloudtrail",
      "detail-type": "AWS API Call via CloudTrail",
      "detail": {
        "eventVersion": "1.08",
        "userIdentity": {
          "type": "Root",
          "principalId": "123456789012",
          "arn": "arn:aws:iam::123456789012:root",
          "accountId": "123456789012"
        },
        "eventTime": "2025-01-20T14:30:00Z",
        "eventName": "ConsoleLogin",
        "eventSource": "signin.amazonaws.com",
        "sourceIPAddress": "203.0.113.5",
        "userAgent": "Mozilla/5.0...",
        "awsRegion": "us-east-1",
        "requestParameters": null,
        "responseElements": {"ConsoleLogin": "Success"}
      }
    }

    event["detail"] contains the raw CloudTrail event fields.
    """
    sns   = boto3.client("sns")
    detail = event.get("detail", {})

    # ── Extract key fields ─────────────────────────────────────────
    event_name   = detail.get("eventName",      "Unknown")
    event_source = detail.get("eventSource",    "Unknown")
    source_ip    = detail.get("sourceIPAddress","Unknown")
    user_agent   = detail.get("userAgent",      "Unknown")
    event_time   = detail.get("eventTime",      "Unknown")
    aws_region   = detail.get("awsRegion",      "Unknown")
    account_id   = detail.get("userIdentity", {}).get("accountId", "Unknown")

    # ── Filter: skip authorized root actions ──────────────────────
    if event_name in AUTHORIZED_ROOT_EVENTS:
        logger.info(f"Root API call {event_name} is in the authorized list — skipping alert")
        return {"statusCode": 200, "message": "Authorized root event — skipped"}

    if source_ip in AUTHORIZED_SOURCE_IPS:
        logger.info(f"Root API call from authorized IP {source_ip} — skipping alert")
        return {"statusCode": 200, "message": "Authorized IP — skipped"}

    # ── Log the incident ──────────────────────────────────────────
    # Using CRITICAL level ensures this appears in CloudWatch Logs
    # with severity that can trigger a separate metric filter alarm
    logger.critical(
        f"ROOT_ACCOUNT_USAGE | {event_name} | {source_ip} | {event_time} | {aws_region}"
    )

    # ── Build alert message ───────────────────────────────────────
    alert_message = f"""
🚨 ROOT ACCOUNT ACTIVITY DETECTED 🚨
======================================
Account ID:   {account_id}
Event:        {event_name}
Service:      {event_source}
Region:       {aws_region}
Time:         {event_time}
Source IP:    {source_ip}
User Agent:   {user_agent}

IMMEDIATE ACTIONS REQUIRED:
1. Verify if this action was authorized
2. If unauthorized — rotate root credentials NOW:
   a. Change root password
   b. Regenerate root MFA device
   c. Check for new IAM users/roles created by root
3. Review CloudTrail for all root activity in the last 24h:
   aws cloudtrail lookup-events \\
     --lookup-attributes AttributeKey=Username,AttributeValue=root \\
     --start-time $(date -d '24 hours ago' --utc +%Y-%m-%dT%H:%M:%SZ)
4. Escalate to Security team immediately

Full CloudTrail Event:
{json.dumps(detail, indent=2, default=str)}
    """

    # ── Publish to SNS ────────────────────────────────────────────
    # publish() sends the message to all SNS subscribers.
    # MessageAttributes let subscribers filter on severity.
    # Subject must be ≤ 100 characters for email delivery.
    #
    # MessageAttributes with DataType="String" allow SNS filter policies:
    # subscribers can opt to receive only CRITICAL messages.
    sns.publish(
        TopicArn=SNS_TOPIC_ARN,
        Subject=f"🚨 CRITICAL: Root Account Activity in {aws_region}{event_name}",
        Message=alert_message,
        MessageAttributes={
            "severity": {
                "DataType":    "String",
                "StringValue": "CRITICAL",
            },
            "service": {
                "DataType":    "String",
                "StringValue": "root-monitor",
            },
            "event_name": {
                "DataType":    "String",
                "StringValue": event_name,
            },
        },
    )

    logger.info(f"Alert published to SNS for root event: {event_name} from {source_ip}")

    return {
        "statusCode": 200,
        "message":    "Root account alert published",
        "event_name": event_name,
        "source_ip":  source_ip,
    }

Automated Incident Response (Optional Extension)

def create_incident_snapshot(detail: dict) -> None:
    """
    When root activity is detected, automatically capture a snapshot of
    current IAM state for forensic analysis.
    """
    iam = boto3.client("iam")
    s3  = boto3.client("s3")

    # Get current IAM account summary
    summary = iam.get_account_summary()["SummaryMap"]

    # List recently created IAM users (last 30 minutes)
    from datetime import datetime, timezone, timedelta
    users = []
    paginator = iam.get_paginator("list_users")
    threshold = datetime.now(timezone.utc) - timedelta(minutes=30)
    for page in paginator.paginate():
        for user in page["Users"]:
            if user["CreateDate"] > threshold:
                users.append(user["UserName"])

    incident_data = {
        "timestamp":         detail.get("eventTime"),
        "event":             detail.get("eventName"),
        "source_ip":         detail.get("sourceIPAddress"),
        "recent_new_users":  users,
        "iam_summary":       {k: v for k, v in summary.items() if "Users" in k or "Roles" in k},
    }

    # Store forensic data in S3 for later analysis
    s3.put_object(
        Bucket="your-security-audit-bucket",
        Key=f"incidents/root-activity/{detail.get('eventTime', 'unknown')}.json",
        Body=json.dumps(incident_data, indent=2),
        ServerSideEncryption="aws:kms",
    )

Key Commands Explained

CommandWhat it does
event["detail"]The raw CloudTrail event delivered by EventBridge
detail["userIdentity"]["type"] == "Root"Confirms the API call was made by the root account
detail["eventName"]The specific API action performed (e.g., ConsoleLogin, CreateUser)
detail["sourceIPAddress"]IP that made the root API call
sns.publish(TopicArn, Subject, Message, MessageAttributes)Sends alert to all SNS subscribers
MessageAttributesKey-value metadata — allows SNS filter policies for routing
logger.critical(...)High-severity CloudWatch log — can trigger separate metric alarms

Testing the Monitor

# Simulate a CloudTrail root event in EventBridge (test mode)
aws lambda invoke \
  --function-name RootAccountMonitor \
  --payload '{
    "detail": {
      "eventName": "ConsoleLogin",
      "eventSource": "signin.amazonaws.com",
      "sourceIPAddress": "1.2.3.4",
      "userAgent": "Mozilla/5.0",
      "eventTime": "2025-01-20T14:30:00Z",
      "awsRegion": "us-east-1",
      "userIdentity": {"type": "Root", "accountId": "123456789012"}
    }
  }' \
  response.json

cat response.json

🔍 Line-by-Line Code Walkthrough

Module-Level Constants

AUTHORIZED_ROOT_EVENTS = {"GetBillingData", "ListBillingReports"}
AUTHORIZED_SOURCE_IPS  = set()  # e.g., {"203.0.113.10"}
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:security-critical-alerts"
LineExplanation
AUTHORIZED_ROOT_EVENTSPython set for O(1) membership tests. Events in this set are known-safe root actions (e.g., automated billing scripts). We skip alerting for these
AUTHORIZED_SOURCE_IPSIPs of trusted corporate networks. Root activity from these IPs (e.g., your office) is expected
SNS_TOPIC_ARNThe SNS topic that routes to PagerDuty, Slack, and email. Stored at module level so it’s easy to find and change

lambda_handler(event, context) — Entry Point

sns    = boto3.client("sns")
detail = event.get("detail", {})
LineExplanation
boto3.client("sns")Lambda automatically uses the execution role’s credentials. No region_name needed — it defaults to the Lambda function’s region
event.get("detail", {})EventBridge wraps the CloudTrail event inside event["detail"]. Using .get() with {} default prevents KeyError if the event structure is unexpected

Extracting CloudTrail Event Fields

event_name   = detail.get("eventName",      "Unknown")
event_source = detail.get("eventSource",    "Unknown")
source_ip    = detail.get("sourceIPAddress","Unknown")
user_agent   = detail.get("userAgent",      "Unknown")
event_time   = detail.get("eventTime",      "Unknown")
aws_region   = detail.get("awsRegion",      "Unknown")
account_id   = detail.get("userIdentity", {}).get("accountId", "Unknown")
LineExplanation
detail.get("eventName", "Unknown")The API action called (e.g., "ConsoleLogin", "CreateUser", "PutRolePolicy"). Default "Unknown" prevents crashes on malformed events
detail.get("eventSource", "Unknown")Which AWS service received the call (e.g., "signin.amazonaws.com", "iam.amazonaws.com")
detail.get("sourceIPAddress", "Unknown")IP address of whoever made the root API call. "AWS Internal" appears for service-initiated actions
detail.get("userAgent", "Unknown")Browser or SDK info. "Mozilla/5.0" = console login. "aws-cli" = CLI. Helps identify the tool used
detail.get("eventTime", "Unknown")ISO 8601 timestamp when the API call happened
detail.get("userIdentity", {}).get("accountId", "Unknown")Chained .get() — first gets the userIdentity dict (or {} if absent), then gets accountId from it

Allowlist Filtering

if event_name in AUTHORIZED_ROOT_EVENTS:
    logger.info(f"Root API call {event_name} is in the authorized list — skipping alert")
    return {"statusCode": 200, "message": "Authorized root event — skipped"}

if source_ip in AUTHORIZED_SOURCE_IPS:
    logger.info(f"Root API call from authorized IP {source_ip} — skipping alert")
    return {"statusCode": 200, "message": "Authorized IP — skipped"}
LineExplanation
event_name in AUTHORIZED_ROOT_EVENTSSet membership test — O(1). Returns True if this event name is in the approved list
return {"statusCode": 200, ...}Early return — stops Lambda execution. No SNS publish happens. Lambda still returns a success status so EventBridge doesn’t retry
source_ip in AUTHORIZED_SOURCE_IPSChecks if the source IP is a trusted corporate IP. Prevents false alerts from expected admin tasks

Logging the Incident

logger.critical(
    f"ROOT_ACCOUNT_USAGE | {event_name} | {source_ip} | {event_time} | {aws_region}"
)
LineExplanation
logger.critical(...)CloudWatch Logs records this at CRITICAL severity. You can create a CloudWatch Metric Filter on the pattern ROOT_ACCOUNT_USAGE and trigger a separate alarm — this creates a second independent alerting path
Pipe-separated formatMakes the log line queryable with CloudWatch Logs Insights: `filter @message like “ROOT_ACCOUNT_USAGE”

sns.publish(...) — The Alert

sns.publish(
    TopicArn=SNS_TOPIC_ARN,
    Subject=f"🚨 CRITICAL: Root Account Activity in {aws_region}{event_name}",
    Message=alert_message,
    MessageAttributes={
        "severity":   {"DataType": "String", "StringValue": "CRITICAL"},
        "service":    {"DataType": "String", "StringValue": "root-monitor"},
        "event_name": {"DataType": "String", "StringValue": event_name},
    },
)
ParameterExplanation
TopicArn=SNS_TOPIC_ARNThe SNS topic to publish to. All subscribers receive the message
Subject=...Email subject line (for email subscriptions). Maximum 100 characters. SNS truncates longer subjects
Message=alert_messageThe full alert body. For email subscribers, this is the email body
MessageAttributes={"severity": ...}Key-value metadata attached to the message. SNS subscribers can create filter policies to only receive messages where severity = "CRITICAL" — useful when multiple scripts publish to the same topic
"DataType": "String"The attribute type. Must be "String", "Number", or "Binary"
"StringValue": "CRITICAL"The attribute value

Return Value

return {
    "statusCode": 200,
    "message":    "Root account alert published",
    "event_name": event_name,
    "source_ip":  source_ip,
}
LineExplanation
return {...}Lambda return values are logged in CloudWatch and can be inspected. Returning structured data makes it easy to verify the alert was sent
"statusCode": 200Convention borrowed from HTTP. Tells EventBridge the invocation succeeded — it won’t retry

EventBridge Rule Pattern (Explained)

{
  "source": ["aws.cloudtrail"],
  "detail-type": ["AWS API Call via CloudTrail"],
  "detail": {
    "userIdentity": {
      "type": ["Root"]
    }
  }
}
FieldExplanation
"source": ["aws.cloudtrail"]Only CloudTrail events trigger this rule
"detail-type": ["AWS API Call via CloudTrail"]The event type for API calls. Console logins use "AWS Console Sign In via CloudTrail"
"detail.userIdentity.type": ["Root"]EventBridge filters the event payload. Only events where userIdentity.type equals "Root" match — Lambda is NOT invoked for regular IAM user/role actions
Services Used
CloudTrailEventBridgeLambdaSNSboto3
Prerequisites
  • Python 3.8+
  • CloudTrail enabled
  • EventBridge rule for root API calls
  • SNS topic for alerts
  • Lambda execution role with SNS publish permission
What You Learned
  • EventBridge rule pattern for CloudTrail root events
  • Lambda handler event structure
  • SNS publish with message attributes
  • Security incident response automation

Have a similar scenario to share?

Production incidents are the best teachers. Submit your real-world scenario and help others learn.

Open Google Form

Related Scenarios