
Find Untagged Long-Running EC2 Instances & Alert via SES

Python script to find EC2 instances running for more than 7 days without a Project tag and send a compliance alert email using AWS SES.

January 20, 2025 8 min read ~15 min to complete DB
The Situation

Cloud governance script for enforcing mandatory tagging policies — finds ghost instances and alerts the infra team before they inflate the bill.

Steps: 5
Services used: 4
Duration: ~15 min
Difficulty: Intermediate

Problem Statement

Your organization mandates every EC2 instance must carry a Project tag (for billing allocation). Engineers spin up dev instances and forget them for weeks. Without a governance script, you discover the oversight only on the monthly bill.

Goal: Write a Python script that:

  • Scans all running EC2 instances
  • Identifies any running more than 7 days that are missing the Project tag
  • Sends a formatted compliance alert email via AWS SES

Required IAM Permissions

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["ec2:DescribeInstances"],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": ["ses:SendEmail"],
      "Resource": "arn:aws:ses:us-east-1:123456789012:identity/[email protected]"
    }
  ]
}

SES prerequisite: Verify the sender email address (or its domain) in the SES console before running this script. While the account is still in the SES sandbox, every recipient address must be verified as well.
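
As a pre-flight check, the verification status can be read back from SES before any mail is sent. The following is a minimal sketch, assuming a boto3 SES client is passed in; the helper name unverified_identities is hypothetical and not part of the script below.

```python
def unverified_identities(ses_client, identities):
    """Return the identities whose SES verification status is not 'Success'."""
    response = ses_client.get_identity_verification_attributes(
        Identities=list(identities)
    )
    attributes = response.get("VerificationAttributes", {})
    # Identities that SES does not know about are absent from the response,
    # so a missing entry is treated as unverified.
    return [
        identity
        for identity in identities
        if attributes.get(identity, {}).get("VerificationStatus") != "Success"
    ]
```

With a real client this would be called as unverified_identities(boto3.client("ses", region_name="us-east-1"), [...]). The caller also needs the ses:GetIdentityVerificationAttributes permission, which the policy above does not grant.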


Complete Script

import boto3
from datetime import datetime, timezone, timedelta


def find_untagged_long_running_instances(region: str = "us-east-1") -> list[dict]:
    """
    Returns a list of violation dicts for instances that have been
    running > 7 days without a 'Project' tag.
    """
    ec2 = boto3.client("ec2", region_name=region)
    ses = boto3.client("ses", region_name=region)

    # datetime.now(timezone.utc) returns a timezone-aware UTC datetime.
    # EC2 instance["LaunchTime"] is also timezone-aware (UTC).
    # Both must be timezone-aware to subtract them without a TypeError.
    threshold = datetime.now(timezone.utc) - timedelta(days=7)
    violations = []

    # get_paginator ensures we get ALL instances even if there are > 1000
    paginator = ec2.get_paginator("describe_instances")
    for page in paginator.paginate(
        Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
    ):
        for reservation in page["Reservations"]:
            for instance in reservation["Instances"]:
                launch_time = instance["LaunchTime"]   # timezone-aware UTC datetime

                # Convert tags list to a dict for O(1) lookup
                # instance.get("Tags", []) returns [] if no tags — avoids KeyError
                tags = {t["Key"]: t["Value"] for t in instance.get("Tags", [])}

                # Check BOTH conditions: old enough AND missing required tag
                if launch_time < threshold and "Project" not in tags:
                    violations.append({
                        "InstanceId":    instance["InstanceId"],
                        "LaunchTime":    launch_time.strftime("%Y-%m-%d %H:%M UTC"),
                        "InstanceType":  instance["InstanceType"],
                        "Name":          tags.get("Name", "Unnamed"),
                        "Owner":         tags.get("Owner", "Unknown"),
                        "PrivateIp":     instance.get("PrivateIpAddress", "N/A"),
                        "RunningDays":   (datetime.now(timezone.utc) - launch_time).days,
                    })

    if violations:
        send_violation_email(ses, violations)
    else:
        print("All running instances are compliant.")

    return violations


def send_violation_email(ses_client, violations: list[dict]) -> None:
    """
    ses_client.send_email() requires:
    - Source: a verified sender email address or domain
    - Destination.ToAddresses: list of recipient emails
    - Message.Subject.Data: email subject string
    - Message.Body.Text.Data: plain-text body
      (Use Body.Html.Data for HTML-formatted emails)

    SES is a regional service — the client must match the region
    where your email identities are verified.
    """
    # Build a human-readable table for the email body
    rows = "\n".join([
        (f"  [{i+1}] {v['InstanceId']} | {v['Name']} | "
         f"{v['InstanceType']} | Running {v['RunningDays']} days | "
         f"Owner: {v['Owner']} | IP: {v['PrivateIp']}")
        for i, v in enumerate(violations)
    ])

    body = f"""
AWS Compliance Alert: Untagged Long-Running EC2 Instances
==========================================================

The following {len(violations)} EC2 instance(s) have been running for more
than 7 days without a required 'Project' tag:

{rows}

Required Action:
  1. Add the 'Project' tag to identify the billing owner.
  2. Terminate the instance if it is no longer needed.
  3. Instances without a Project tag will be stopped automatically
     after 14 days (policy enforcement pending).

This is an automated message from the Cloud Governance bot.
Run script: governance/find_untagged_instances.py
    """

    ses_client.send_email(
        Source="[email protected]",          # Must be SES-verified
        Destination={
            "ToAddresses": ["[email protected]"],
            "CcAddresses": ["[email protected]"],
        },
        Message={
            "Subject": {
                "Data": f"⚠️ {len(violations)} Untagged EC2 Instance(s) Detected",
                "Charset": "UTF-8",
            },
            "Body": {
                "Text": {
                    "Data": body,
                    "Charset": "UTF-8",
                }
            },
        },
        # Optional: ReplyToAddresses, ReturnPath, Tags for SES tracking
    )
    print(f"Alert email sent for {len(violations)} violation(s).")


# ── Lambda entry point (trigger via EventBridge daily) ────────────
def lambda_handler(event, context):
    violations = find_untagged_long_running_instances(region="us-east-1")
    return {
        "statusCode": 200,
        "violations_found": len(violations),
        "instances": [v["InstanceId"] for v in violations],
    }


# ── Local run ─────────────────────────────────────────────────────
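if __name__ == "__main__":
    # The same region is used for both the EC2 scan and the SES client, so the
    # SES identities (and the identity ARN in the IAM policy) must exist in
    # this region as well.
    result = find_untagged_long_running_instances(region="ap-south-1")
    print(f"\nTotal violations: {len(result)}")
    for v in result:
        print(f"  - {v['InstanceId']} ({v['Name']}) running {v['RunningDays']} days")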

Enhancement: Auto-Stop After 14 Days

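def enforce_stop_policy(
    violations: list[dict],
    stop_after_days: int = 14,
    region: str = "us-east-1",
) -> None:
    """
    Stop (not terminate) instances that have been running for at least
    stop_after_days without a Project tag.
    Termination should require a human decision.

    Requires the ec2:StopInstances permission, which the policy above
    does not grant. `region` must match the region that was scanned.
    """
    # Pass the scanned region explicitly; a bare boto3.client("ec2") falls
    # back to the default region and may not find these instance IDs.
    ec2 = boto3.client("ec2", region_name=region)
    to_stop = [v for v in violations if v["RunningDays"] >= stop_after_days]

    if not to_stop:
        return

    ids = [v["InstanceId"] for v in to_stop]
    ec2.stop_instances(InstanceIds=ids)
    print(f"Auto-stopped {len(ids)} instances: {ids}")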
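
Before letting a scheduled job stop anything, it can help to confirm that the role is actually allowed to do so. EC2 supports a DryRun flag on stop_instances: the call never stops the instance and instead raises an error whose code says whether the real request would have been authorized. The sketch below assumes an EC2 client is passed in; the helper name can_stop_instances is hypothetical.

```python
def can_stop_instances(ec2_client, instance_ids):
    """Return True if a real stop_instances call would be authorized."""
    try:
        # With DryRun=True, EC2 only checks permissions and then raises.
        ec2_client.stop_instances(InstanceIds=list(instance_ids), DryRun=True)
    except Exception as exc:  # botocore raises ClientError; inspected generically
        error = getattr(exc, "response", {}).get("Error", {})
        if error.get("Code") == "DryRunOperation":
            return True   # the request would have succeeded
        if error.get("Code") == "UnauthorizedOperation":
            return False  # the role lacks ec2:StopInstances
        raise             # anything else (bad ID, throttling) is a real error
    return False  # not expected: a dry run always raises
```

Catching the broad Exception keeps the sketch free of a botocore import; in the real script, catching botocore.exceptions.ClientError would be the more precise choice.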

Key Commands Explained

| Command | What it does |
| --- | --- |
| datetime.now(timezone.utc) | Current time as a UTC-aware datetime (required to compare with LaunchTime) |
| timedelta(days=7) | 7-day window; subtract it from now to get the threshold |
| {t["Key"]: t["Value"] for t in tags} | Converts a list of {"Key":...,"Value":...} dicts to a normal dict |
| "Project" not in tags | O(1) dict key lookup that checks tag existence |
| ses.send_email(Source=..., Destination=..., Message=...) | Sends an email via Amazon SES |
| Destination["CcAddresses"] | CC recipients; SES supports To, CC, and BCC |

Common Issues

MessageRejected (Email address is not verified): The sender identity must always be verified in SES. While the account is in sandbox mode, every recipient must be verified too. Go to SES console → Verified Identities.

SES sandbox limits: In the sandbox, you can only send to verified addresses. Request production access in the SES console to send to anyone.

Empty violations list: LaunchTime is a timezone-aware UTC datetime, so build the threshold with datetime.now(timezone.utc), not datetime.utcnow(), which returns a naive datetime and raises a TypeError when compared with LaunchTime. Also confirm that the script scans the region where the instances actually run; a client for one region does not see instances in another.
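
Whether the account is still in the sandbox can also be checked from code. The sketch below assumes a boto3 "sesv2" client (the SES v2 API, not the "ses" client used by the script) and reads the ProductionAccessEnabled field returned by get_account; the helper name is_in_ses_sandbox is hypothetical.

```python
def is_in_ses_sandbox(sesv2_client):
    """Return True while the account has not been granted production access."""
    account = sesv2_client.get_account()
    # ProductionAccessEnabled is False for accounts that are still sandboxed.
    return not account.get("ProductionAccessEnabled", False)
```

A call would look like is_in_ses_sandbox(boto3.client("sesv2", region_name="us-east-1")). Sandbox status is tracked per region, so the client should use the same region as the sending script, and the role needs permission for the SES v2 GetAccount action.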


🔍 Line-by-Line Code Walkthrough

Imports

| Line | Why It’s Used |
| --- | --- |
| import boto3 | AWS SDK. Needed for boto3.client("ec2") and boto3.client("ses") |
| from datetime import datetime, timezone, timedelta | datetime.now(timezone.utc): timezone-aware now. timedelta(days=7): 7-day duration. timezone.utc: UTC timezone object (makes datetimes comparable with AWS’s UTC timestamps) |

find_untagged_long_running_instances()

ec2 = boto3.client("ec2", region_name=region)
ses = boto3.client("ses", region_name=region)

| Line | Explanation |
| --- | --- |
| boto3.client("ec2", region_name=region) | EC2 client for the target region. EC2 instances exist in specific regions: a client for us-east-1 will not see instances in ap-south-1 |
| boto3.client("ses", region_name=region) | SES (Simple Email Service) client. SES is regional: email identities must be verified in the same region as the client |

threshold = datetime.now(timezone.utc) - timedelta(days=7)

| Line | Explanation |
| --- | --- |
| datetime.now(timezone.utc) | Returns the current UTC time as a timezone-aware datetime. The timezone.utc argument is what makes it aware |
| timedelta(days=7) | A duration of 7 days. Subtracting it from “now” gives the exact datetime 7 days ago |
| threshold | Any instance launched before this datetime has been running more than 7 days |
| Why not datetime.utcnow()? | datetime.utcnow() returns a naive datetime (no timezone info). Comparing a naive datetime with the timezone-aware instance["LaunchTime"] raises a TypeError at runtime |

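
The naive-versus-aware problem is easy to reproduce without any AWS calls. In this small sketch the two fixed datetimes stand in for instance["LaunchTime"] and for the value datetime.utcnow() would return; the dates themselves are made up for illustration.

```python
from datetime import datetime, timezone

aware_launch = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)  # like LaunchTime
naive_now = datetime(2025, 1, 20, 12, 0)                         # like utcnow()

try:
    naive_now - aware_launch   # mixing naive and aware datetimes
    mixed_ok = True
except TypeError:
    mixed_ok = False           # Python refuses to combine the two kinds

aware_now = datetime(2025, 1, 20, 12, 0, tzinfo=timezone.utc)
age = aware_now - aware_launch  # both aware, so subtraction works
print(mixed_ok, age.days)
```

The first subtraction fails with a TypeError, while the second yields a timedelta of 19 days.
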
paginator = ec2.get_paginator("describe_instances")
for page in paginator.paginate(
    Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
):

| Line | Explanation |
| --- | --- |
| get_paginator("describe_instances") | Creates a paginator that follows NextToken automatically. A single describe_instances() call returns only one page when the result is paginated, so a large fleet can be truncated if NextToken is ignored |
| paginator.paginate(Filters=[...]) | Streams pages. Each API call returns one page (up to 1,000 results when MaxResults is set). The paginator keeps calling until all pages are exhausted |
| "instance-state-name": ["running"] | Server-side filter that returns only running instances. We care about long-running instances, not stopped/terminated ones |

for reservation in page["Reservations"]:
    for instance in reservation["Instances"]:
        launch_time = instance["LaunchTime"]

| Line | Explanation |
| --- | --- |
| page["Reservations"] | EC2’s response nests instances inside Reservations (groupings from a single launch command) |
| reservation["Instances"] | The actual list of instance dicts within that reservation |
| instance["LaunchTime"] | A timezone-aware UTC datetime when this instance was started. boto3 parses the API’s ISO 8601 string automatically into a Python datetime object |

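
To make the pagination and the Reservations nesting concrete, the loop below is roughly what the paginator and the two for-loops do together. It is a sketch rather than a replacement for get_paginator; the helper name iter_running_instances and the page_size parameter are illustrative.

```python
def iter_running_instances(ec2_client, page_size=1000):
    """Yield every running instance dict, following NextToken by hand."""
    kwargs = {
        "Filters": [{"Name": "instance-state-name", "Values": ["running"]}],
        "MaxResults": page_size,
    }
    while True:
        page = ec2_client.describe_instances(**kwargs)
        # Flatten Reservations -> Instances into a single stream.
        for reservation in page.get("Reservations", []):
            yield from reservation.get("Instances", [])
        token = page.get("NextToken")
        if not token:
            break               # last page reached
        kwargs["NextToken"] = token
```

In the real script the paginator is the better choice because it hides this bookkeeping; the manual version is mainly useful for seeing where results would be lost if NextToken were ignored.
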
tags = {t["Key"]: t["Value"] for t in instance.get("Tags", [])}

| Line | Explanation |
| --- | --- |
| instance.get("Tags", []) | AWS returns Tags as a list of {"Key": ..., "Value": ...} dicts. .get("Tags", []) returns an empty list if the instance has no tags at all |
| {t["Key"]: t["Value"] for t in ...} | Dict comprehension that converts the list into a regular Python dict: {"Name": "web-server", "Owner": "alice", "Env": "prod"}. This enables O(1) key lookups |

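
One gap in a pure key check is that a Project tag with an empty value still passes. If your policy treats a blank value as missing (an assumption, since the script above only checks for the key), a stricter variant might look like this. Both helper names are hypothetical.

```python
def tags_to_dict(tag_list):
    """Convert EC2's [{'Key': ..., 'Value': ...}] list into a plain dict."""
    return {t["Key"]: t["Value"] for t in (tag_list or [])}


def has_required_tag(tags, key="Project"):
    """True only if the tag exists and its value is not blank."""
    # Tag keys are case-sensitive in AWS, so 'project' does not count as 'Project'.
    return bool(tags.get(key, "").strip())
```

Swapping "Project" not in tags for not has_required_tag(tags) would then flag instances whose Project tag was created but never filled in.
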
if launch_time < threshold and "Project" not in tags:
    violations.append({...})

| Line | Explanation |
| --- | --- |
| launch_time < threshold | Both are UTC-aware datetimes. If the launch time is earlier than 7 days ago, the instance has been running more than 7 days |
| "Project" not in tags | Dict key lookup (O(1)). Returns True if the Project key is absent from the tags dict |
| and | Both conditions must be true: the instance is only a violation if it’s old AND missing the tag |
| violations.append({...}) | Builds a list of dicts with human-readable info for the email body |

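
Pulling the two conditions into a small pure function makes the rule testable without AWS credentials. The sketch below mirrors the check in the script; the function name is_violation and its now parameter (passed in so tests can use a fixed clock) are illustrative additions.

```python
from datetime import datetime, timedelta, timezone


def is_violation(instance, now, max_age_days=7, required_tag="Project"):
    """True if the instance is older than max_age_days and lacks required_tag."""
    tags = {t["Key"]: t["Value"] for t in instance.get("Tags", [])}
    threshold = now - timedelta(days=max_age_days)
    return instance["LaunchTime"] < threshold and required_tag not in tags


# Example with a fixed clock and hand-written instance dicts:
now = datetime(2025, 1, 20, tzinfo=timezone.utc)
old_untagged = {"LaunchTime": datetime(2025, 1, 1, tzinfo=timezone.utc)}
old_tagged = {
    "LaunchTime": datetime(2025, 1, 1, tzinfo=timezone.utc),
    "Tags": [{"Key": "Project", "Value": "billing"}],
}
print(is_violation(old_untagged, now), is_violation(old_tagged, now))
```

Only the first instance is reported, because the second one carries the Project tag.
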
"RunningDays": (datetime.now(timezone.utc) - launch_time).days,

| Line | Explanation |
| --- | --- |
| datetime.now(timezone.utc) - launch_time | Subtracts two timezone-aware datetimes to get a timedelta object |
| .days | timedelta.days extracts the whole-day count (ignoring hours/minutes). A 7.9-day old instance returns 7 |
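
The truncation is easy to see with a hand-built timedelta. This sketch uses only the standard library; total_seconds() is shown as the alternative when fractional days matter.

```python
from datetime import timedelta

age = timedelta(days=7, hours=21, minutes=36)   # exactly 7.9 days
whole_days = age.days                           # whole days only
exact_days = age.total_seconds() / 86400        # fractional days
print(whole_days, round(exact_days, 1))
```

Here whole_days is 7 while exact_days is 7.9, which is why the report can show "Running 7 days" for an instance that is almost 8 days old.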

send_violation_email(ses_client, violations)

rows = "\n".join([
    f"  [{i+1}] {v['InstanceId']} | {v['Name']} | ..."
    for i, v in enumerate(violations)
])

| Line | Explanation |
| --- | --- |
| enumerate(violations) | Yields (index, item) pairs, which lets us number each row [1], [2], etc. |
| "\n".join([...]) | Joins all formatted row strings with newlines into a single multi-line string |

ses_client.send_email(
    Source="[email protected]",
    Destination={
        "ToAddresses": ["[email protected]"],
        "CcAddresses": ["[email protected]"],
    },
    Message={
        "Subject": {"Data": f"⚠️ {len(violations)} Untagged EC2 Instance(s)", "Charset": "UTF-8"},
        "Body": {"Text": {"Data": body, "Charset": "UTF-8"}},
    },
)

| Line | Explanation |
| --- | --- |
| Source= | Must be a verified SES email identity. SES rejects mail from unverified senders |
| Destination.ToAddresses | Primary recipients, given as a Python list of email strings |
| Destination.CcAddresses | CC recipients, who receive a copy but are not in the “To” field |
| Message.Subject.Data | The email subject line. Charset: "UTF-8" enables emoji and non-ASCII characters |
| Message.Body.Text.Data | Plain-text email body. Use Body.Html.Data for HTML-formatted emails |
| Charset: "UTF-8" | Required when body contains non-ASCII characters (emoji, accented chars) |
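
Since Body accepts both Text and Html parts, the same alert can carry an HTML table alongside the plain-text fallback. The following sketch only builds the Message dict; the helper name build_message and the table layout are illustrative, and html.escape is applied because tag values such as Name are user-controlled.

```python
from html import escape


def build_message(subject, text_body, violations):
    """Build an SES Message dict with plain-text and HTML bodies."""
    rows = "".join(
        f"<tr><td>{escape(v['InstanceId'])}</td><td>{escape(v['Name'])}</td>"
        f"<td>{v['RunningDays']}</td></tr>"
        for v in violations
    )
    html_body = (
        "<table><tr><th>Instance</th><th>Name</th><th>Days</th></tr>"
        f"{rows}</table>"
    )
    return {
        "Subject": {"Data": subject, "Charset": "UTF-8"},
        "Body": {
            "Text": {"Data": text_body, "Charset": "UTF-8"},
            "Html": {"Data": html_body, "Charset": "UTF-8"},
        },
    }
```

The result can be passed as Message=build_message(...) to ses_client.send_email. Mail clients that cannot render HTML fall back to the Text part.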

Lambda Entry Point

def lambda_handler(event, context):
    violations = find_untagged_long_running_instances(region="us-east-1")
    return {
        "statusCode": 200,
        "violations_found": len(violations),
        "instances": [v["InstanceId"] for v in violations],
    }
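
| Line | Explanation |
| --- | --- |
| lambda_handler(event, context) | AWS Lambda’s required function signature. event contains the EventBridge schedule payload. context exposes runtime information such as the remaining execution time |
| return {"statusCode": 200, ...} | Returning a structured dict makes the result easy to inspect in test invocations and usable by Lambda destinations. For asynchronous triggers such as EventBridge the return value is not written to CloudWatch Logs automatically, so print or log it if you need it there. statusCode follows the API Gateway convention even for non-HTTP triggers |
| [v["InstanceId"] for v in violations] | List comprehension extracting just the IDs for the return value |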
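
Two small refinements are common for scheduled Lambdas: reading the region from configuration instead of hard-coding it, and printing the summary so it shows up in CloudWatch Logs. The sketch below wraps any scan function with that behavior; make_handler and the SCAN_REGION environment variable are hypothetical names, not something Lambda defines.

```python
import json
import os


def make_handler(scan):
    """Build a Lambda handler around a scan(region=...) -> list[dict] callable."""
    def handler(event, context):
        # SCAN_REGION is a made-up variable name; set it in the function config.
        region = os.environ.get("SCAN_REGION", "us-east-1")
        violations = scan(region=region)
        summary = {
            "region": region,
            "violations_found": len(violations),
            "instances": [v["InstanceId"] for v in violations],
        }
        print(json.dumps(summary))  # stdout from Lambda goes to CloudWatch Logs
        return summary
    return handler
```

In the script above this would be wired up as lambda_handler = make_handler(find_untagged_long_running_instances).
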

Services Used

  • EC2
  • SES
  • boto3
  • IAM

Prerequisites

  • Python 3.9+ (the script uses built-in generic type hints such as list[dict], which fail on 3.8)
  • boto3
  • SES email verified
  • IAM: ec2:DescribeInstances, ses:SendEmail

What You Learned

  • Launch time comparison with UTC-aware datetime
  • Tag extraction from instance Tags list
  • SES send_email API
  • Pagination with describe_instances
