
EBS Snapshot Manager — Auto Backup & Retention Cleanup

Write a Python boto3 script to snapshot all EBS volumes attached to running EC2 instances and automatically delete snapshots older than 30 days.

January 20, 2025 9 min read ~20 min to complete DB
The Situation

Production backup automation — ensures every EBS volume is snapshotted daily and old snapshots are pruned to control costs.

6 Steps
4 Services Used
~20 min Duration
Intermediate Difficulty

Problem Statement

Your team’s EC2 instances hold critical application data on EBS volumes. Without automated snapshots, a failed deployment or accidental rm -rf could mean permanent data loss. At the same time, snapshots kept indefinitely inflate storage costs; a 30-day retention window gives you a full month to recover while keeping the bill reasonable.

Goal: Write a Python script that:

  • Discovers every EBS volume attached to running EC2 instances
  • Creates a tagged snapshot for each volume
  • Deletes any AutoBackup=true snapshot older than 30 days
  • Is safe to run as a daily Lambda or cron job

Required IAM Permissions

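{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": [
      "ec2:DescribeInstances",
      "ec2:CreateSnapshot",
      "ec2:CreateTags",
      "ec2:DescribeSnapshots",
      "ec2:DeleteSnapshot"
    ],
    "Resource": "*"
  }]
}

ec2:CreateTags is required because the script tags each snapshot at creation time through TagSpecifications; without it, create_snapshot is rejected as unauthorized.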
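One way to confirm the role really has the snapshot permission before the first scheduled run is EC2's DryRun flag. The sketch below assumes an already-created boto3 EC2 client is passed in; the helper name can_create_snapshot is hypothetical. It reads the error code from the exception's response attribute (the shape botocore's ClientError uses), so it does not need to import botocore itself. It only checks ec2:CreateSnapshot, not the tagging permission.

```python
def can_create_snapshot(ec2, volume_id: str) -> bool:
    """Return True if the caller is allowed to snapshot volume_id.

    With DryRun=True, EC2 only performs the permission check and answers
    with an error: code "DryRunOperation" means the real call would have
    been authorized, "UnauthorizedOperation" means it would not.
    """
    try:
        ec2.create_snapshot(VolumeId=volume_id, DryRun=True)
    except Exception as exc:
        # botocore's ClientError exposes the parsed error under .response
        code = getattr(exc, "response", {}).get("Error", {}).get("Code")
        if code == "DryRunOperation":
            return True
        if code == "UnauthorizedOperation":
            return False
        raise  # e.g. an invalid volume ID: not a permission answer
    return True  # not expected with DryRun=True; kept for completeness
```

Typical use would be can_create_snapshot(boto3.client("ec2"), "vol-...") with a real volume ID from your account.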

Complete Script

import boto3
from datetime import datetime, timezone, timedelta
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")


class EBSSnapshotManager:
    def __init__(self, region="us-east-1", retention_days=30):
        """
        boto3.client("ec2") creates a low-level EC2 service client.
        region_name targets the correct AWS region.
        retention_days controls how long snapshots are kept.
        """
        self.ec2 = boto3.client("ec2", region_name=region)
        self.retention_days = retention_days

    # ── Step 1: Discover volumes ─────────────────────────────────────
    def get_running_instance_volumes(self) -> list[dict]:
        """
        get_paginator("describe_instances") returns a paginator object.
        Paginators automatically handle the NextToken loop so you never
        miss instances when the results span more than one page.

        paginator.paginate() yields one page dict at a time.
        Each page["Reservations"] is a list of Reservation objects.
        Each Reservation["Instances"] is a list of Instance objects.

        BlockDeviceMappings lists every EBS volume attached to the instance.
        Each mapping has "DeviceName" (e.g., /dev/xvda) and "Ebs.VolumeId".
        """
        volumes = []
        paginator = self.ec2.get_paginator("describe_instances")

        for page in paginator.paginate(
            Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
        ):
            for reservation in page["Reservations"]:
                for instance in reservation["Instances"]:
                    instance_id = instance["InstanceId"]

                    # Extract the Name tag — default to instance ID if absent
                    instance_name = next(
                        (tag["Value"] for tag in instance.get("Tags", [])
                         if tag["Key"] == "Name"),
                        instance_id,
                    )

                    for mapping in instance.get("BlockDeviceMappings", []):
                        ebs = mapping.get("Ebs")
                        if not ebs:
                            continue  # defensive: skip a mapping without EBS details
                        volumes.append({
                            "volume_id":     ebs["VolumeId"],
                            "instance_id":   instance_id,
                            "instance_name": instance_name,
                            "device":        mapping["DeviceName"],  # e.g. /dev/xvda
                        })
        return volumes

    # ── Step 2: Create snapshots ─────────────────────────────────────
    def create_snapshots(self, volumes: list[dict]) -> list[str]:
        """
        create_snapshot() initiates an async snapshot.
        The snapshot state starts as "pending" and transitions to "completed".

        TagSpecifications lets you tag the snapshot at creation time
        (atomic — avoids a separate create_tags call).

        ResourceType: "snapshot" tells AWS these tags belong to the snapshot,
        not the source volume.
        """
        created = []
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H%M")

        for vol in volumes:
            try:
                response = self.ec2.create_snapshot(
                    VolumeId=vol["volume_id"],
                    Description=(
                        f"Auto-backup {vol['instance_name']} "
                        f"{vol['device']} {timestamp}"
                    ),
                    TagSpecifications=[{
                        "ResourceType": "snapshot",
                        "Tags": [
                            {"Key": "Name",       "Value": f"auto-snap-{vol['instance_name']}-{timestamp}"},
                            {"Key": "AutoBackup", "Value": "true"},   # Used by cleanup filter
                            {"Key": "InstanceId", "Value": vol["instance_id"]},
                            {"Key": "CreatedAt",  "Value": timestamp},
                        ],
                    }],
                )
                snapshot_id = response["SnapshotId"]
                created.append(snapshot_id)
                logger.info(
                    f"Created snapshot {snapshot_id} for volume "
                    f"{vol['volume_id']} ({vol['instance_name']} {vol['device']})"
                )
            except Exception as e:
                logger.error(f"Failed to snapshot {vol['volume_id']}: {e}")

        return created

    # ── Step 3: Delete old snapshots ─────────────────────────────────
    def delete_old_snapshots(self) -> list[str]:
        """
        describe_snapshots with OwnerIds=["self"] returns only snapshots owned
        by your account, keeping public and shared snapshots out of the listing.

        The tag filter Filters=[{"Name":"tag:AutoBackup","Values":["true"]}]
        ensures we only touch snapshots this script created, never manual ones.

        snapshot["StartTime"] is timezone-aware (UTC) so we compare against
        datetime.now(timezone.utc) — mixing naive/aware datetimes raises TypeError.
        """
        cutoff = datetime.now(timezone.utc) - timedelta(days=self.retention_days)
        deleted = []
        paginator = self.ec2.get_paginator("describe_snapshots")

        for page in paginator.paginate(
            Filters=[{"Name": "tag:AutoBackup", "Values": ["true"]}],
            OwnerIds=["self"],
        ):
            for snapshot in page["Snapshots"]:
                if snapshot["StartTime"] < cutoff:
                    try:
                        self.ec2.delete_snapshot(SnapshotId=snapshot["SnapshotId"])
                        deleted.append(snapshot["SnapshotId"])
                        age_days = (datetime.now(timezone.utc) - snapshot["StartTime"]).days
                        logger.info(
                            f"Deleted snapshot {snapshot['SnapshotId']} "
                            f"(age: {age_days} days)"
                        )
                    except Exception as e:
                        # Snapshot may be in use by an AMI — skip gracefully
                        logger.error(
                            f"Could not delete {snapshot['SnapshotId']}: {e}"
                        )

        return deleted

    # ── Orchestrator ──────────────────────────────────────────────────
    def run(self) -> dict:
        logger.info("Starting EBS snapshot backup cycle...")
        volumes = self.get_running_instance_volumes()
        logger.info(f"Found {len(volumes)} volumes across running instances")

        created = self.create_snapshots(volumes)
        deleted = self.delete_old_snapshots()

        result = {
            "volumes_found":      len(volumes),
            "snapshots_created":  len(created),
            "snapshots_deleted":  len(deleted),
            "created_ids":        created,
            "deleted_ids":        deleted,
        }
        logger.info(f"Backup cycle complete: {result}")
        return result


# ── Lambda entry point ────────────────────────────────────────────
def lambda_handler(event, context):
    """Deploy as Lambda + EventBridge cron(0 1 * * ? *) to run at 1 AM UTC daily."""
    manager = EBSSnapshotManager(retention_days=30)
    return manager.run()


# ── Local / CLI entry point ───────────────────────────────────────
if __name__ == "__main__":
    manager = EBSSnapshotManager(region="ap-south-1", retention_days=30)
    result = manager.run()
    print(result)

Key Commands Explained

Command | What it does
get_paginator("describe_instances") | Returns a paginator that follows NextToken automatically, so multi-page result sets are read in full
paginator.paginate(Filters=[...]) | Iterates pages; each page is a full API response dict
BlockDeviceMappings | List of EBS volumes currently attached to the instance, including any attached after launch
create_snapshot(VolumeId=..., TagSpecifications=[...]) | Starts an asynchronous snapshot and tags it in the same API call
response["SnapshotId"] | The new snapshot’s ID (e.g., snap-0abc123)
get_paginator("describe_snapshots") | Paginates through all snapshots that match the request
OwnerIds=["self"] | Only return snapshots owned by this AWS account
delete_snapshot(SnapshotId=...) | Permanently deletes a snapshot; this cannot be undone
snapshot["StartTime"] | UTC datetime when the snapshot was initiated

Deployment as a Daily Lambda

# 1. Zip the script
zip ebs_backup.zip ebs_snapshot_manager.py

# 2. Create Lambda function
aws lambda create-function \
  --function-name EBSSnapshotManager \
  --runtime python3.12 \
  --role arn:aws:iam::123456789012:role/LambdaEC2BackupRole \
  --handler ebs_snapshot_manager.lambda_handler \
  --zip-file fileb://ebs_backup.zip \
  --timeout 300

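# 3. Add daily EventBridge rule (1 AM UTC)
aws events put-rule \
  --name DailyEBSBackup \
  --schedule-expression "cron(0 1 * * ? *)" \
  --state ENABLED

# 4. Allow EventBridge to invoke the function
#    (region us-east-1 is assumed here; use the region you deployed to)
aws lambda add-permission \
  --function-name EBSSnapshotManager \
  --statement-id DailyEBSBackupInvoke \
  --action lambda:InvokeFunction \
  --principal events.amazonaws.com \
  --source-arn arn:aws:events:us-east-1:123456789012:rule/DailyEBSBackup

# 5. Attach the function to the rule; without a target the rule fires nothing
aws events put-targets \
  --rule DailyEBSBackup \
  --targets "Id"="1","Arn"="arn:aws:lambda:us-east-1:123456789012:function:EBSSnapshotManager"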

Common Issues

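SnapshotCreationPerVolumeRateExceeded: AWS limits how quickly snapshots can be started for the same volume, and its documentation advises waiting at least 15 seconds between snapshots of one volume. A short time.sleep(1) between create_snapshot calls for different volumes does not address this error; it helps only with general API throttling (RequestLimitExceeded). If the error appears, check whether the job is running twice or the same volume is listed more than once, and retry after a delay.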
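A retry wrapper is one way to handle these rate errors without failing the whole run. The sketch below is an assumption-laden example: the helper name create_snapshot_with_retry, the 15-second base delay, and the choice of retryable codes are illustrative, and the error code is read from the exception's response attribute as botocore's ClientError provides it.

```python
import time

# Error codes treated as "try again later" in this sketch
RETRYABLE = {"SnapshotCreationPerVolumeRateExceeded", "RequestLimitExceeded"}


def create_snapshot_with_retry(ec2, request, attempts=3, delay=15.0, sleep=time.sleep):
    """Call ec2.create_snapshot(**request), retrying on rate-limit errors.

    Returns the new SnapshotId. Non-retryable errors, and the final failed
    attempt, are re-raised to the caller.
    """
    for attempt in range(1, attempts + 1):
        try:
            return ec2.create_snapshot(**request)["SnapshotId"]
        except Exception as exc:
            code = getattr(exc, "response", {}).get("Error", {}).get("Code")
            if code not in RETRYABLE or attempt == attempts:
                raise
            sleep(delay * attempt)  # linear backoff: delay, 2 * delay, ...
```

The sleep parameter exists only so the wait can be replaced in tests; in normal use the default time.sleep applies.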

InvalidSnapshot.InUse: The snapshot backs a registered AMI. Deregister the AMI first, then delete the snapshot.
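To see which AMI is holding a snapshot, describe_images can be filtered by the snapshot ID in its block device mapping. This is a minimal sketch with a hypothetical helper name, amis_using_snapshot; note that it needs ec2:DescribeImages, which is not part of the policy shown earlier.

```python
def amis_using_snapshot(ec2, snapshot_id):
    """Return the IDs of AMIs owned by this account that reference snapshot_id."""
    response = ec2.describe_images(
        Owners=["self"],
        Filters=[{
            "Name": "block-device-mapping.snapshot-id",
            "Values": [snapshot_id],
        }],
    )
    return [image["ImageId"] for image in response.get("Images", [])]
```

An empty list suggests the snapshot is not tied to any of your own AMIs, so the deletion failure has another cause.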

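Snapshot stays “pending”: Large volumes take longer. Don’t wait synchronously inside the Lambda; the function can return and the snapshot will finish independently. If you need to confirm completion, EBS sends a createSnapshot notification event to EventBridge when a snapshot finishes, and a rule on that event can trigger a check.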
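For a local or CLI run where blocking is acceptable, boto3's snapshot_completed waiter can poll until the snapshots finish. The helper name wait_for_snapshots and the delay and attempt values below are assumptions for illustration; with these values the waiter gives up after roughly ten minutes and raises an error.

```python
def wait_for_snapshots(ec2, snapshot_ids, delay=15, max_attempts=40):
    """Block until every snapshot in snapshot_ids reaches the completed state.

    Polls describe_snapshots every `delay` seconds, up to `max_attempts` times.
    """
    if not snapshot_ids:
        return  # nothing to wait for
    waiter = ec2.get_waiter("snapshot_completed")
    waiter.wait(
        SnapshotIds=snapshot_ids,
        WaiterConfig={"Delay": delay, "MaxAttempts": max_attempts},
    )
```

In the script's CLI path this could be called with result["created_ids"] after run() returns; it is best left out of the Lambda path, where the timeout is limited.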


🔍 Line-by-Line Code Walkthrough

Imports

Line | Why It’s Used
import boto3 | AWS SDK for Python, needed to call EC2 APIs for snapshots and instance discovery
from datetime import datetime, timezone, timedelta | datetime.now(timezone.utc) gives a timezone-aware UTC timestamp. timedelta(days=30) computes the cutoff date. timezone.utc makes the datetime UTC-aware (required to compare with boto3’s UTC timestamps)
import logging | Python standard library for structured log output with timestamps and severity levels

logging.basicConfig(...)

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
Part | Explanation
level=logging.INFO | Only log messages at INFO level and above (INFO, WARNING, ERROR, CRITICAL). DEBUG messages are suppressed
format="%(asctime)s %(levelname)s %(message)s" | Each log line shows: timestamp, severity level, and the actual message
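One caveat when the same file runs in Lambda: logging.basicConfig() does nothing if the root logger already has a handler, and the Lambda Python runtime typically installs one before your code loads. In that situation the root level stays at its default of WARNING and the INFO lines would not appear. The sketch below uses a hypothetical helper, configure_logging, that sets the level explicitly in that case.

```python
import logging


def configure_logging(level: int = logging.INFO) -> logging.Logger:
    """Make INFO logging work both locally and under a pre-configured runtime."""
    root = logging.getLogger()
    if root.handlers:
        # A handler already exists (for example one installed by the hosting
        # runtime), so basicConfig() would be a no-op: only adjust the level.
        root.setLevel(level)
    else:
        logging.basicConfig(
            level=level,
            format="%(asctime)s %(levelname)s %(message)s",
        )
    return logging.getLogger(__name__)


logger = configure_logging()
logger.info("logging configured")
```

Passing force=True to basicConfig() (Python 3.8+) is an alternative, at the cost of replacing whatever handler the runtime installed.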

EBSSnapshotManager.__init__

def __init__(self, region="us-east-1", retention_days=30):
    self.ec2 = boto3.client("ec2", region_name=region)
    self.retention_days = retention_days
Line | Explanation
boto3.client("ec2", region_name=region) | Creates an EC2 API client. EBS snapshots and instances are regional, so you need the correct region
self.ec2 = ... | Stores the client on the instance so all methods share one connection pool
self.retention_days = retention_days | Stored for use in delete_old_snapshots(). Default 30 means snapshots older than 30 days are deleted

get_running_instance_volumes()

paginator = self.ec2.get_paginator("describe_instances")
Line | Explanation
get_paginator("describe_instances") | Returns a Paginator object that automatically handles the NextToken loop. Without it, a single describe_instances() call can return a truncated page plus a NextToken (a page holds at most 1,000 results), and ignoring that token silently drops the remaining instances
for page in paginator.paginate(
    Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
):
Line | Explanation
paginator.paginate(Filters=[...]) | Iterates pages. Each page is one API response dict. The paginator calls the API repeatedly, adding NextToken automatically until no more pages exist
Filters=[{"Name": "instance-state-name", "Values": ["running"]}] | Server-side filter that only returns running instances. We don’t want snapshots of stopped or terminated instances
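To make the paginator's behaviour concrete, here is roughly what it does for you, written as a plain loop. This is an illustrative sketch, not boto3's implementation; iter_pages is a hypothetical name, and api_call stands for a bound client method such as ec2.describe_instances.

```python
from typing import Any, Callable, Iterator


def iter_pages(api_call: Callable[..., dict], **kwargs: Any) -> Iterator[dict]:
    """Yield each response page, following NextToken until it is absent."""
    token = None
    while True:
        if token:
            kwargs["NextToken"] = token  # ask for the page after the last one
        page = api_call(**kwargs)
        yield page
        token = page.get("NextToken")
        if not token:
            break  # last page reached
```

With a real client this would be used as: for page in iter_pages(ec2.describe_instances, Filters=[...]). The built-in paginator remains the better choice in production code; the loop is only meant to show what it saves you from writing.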
for reservation in page["Reservations"]:
    for instance in reservation["Instances"]:
        instance_id = instance["InstanceId"]
Line | Explanation
page["Reservations"] | EC2 groups instances into Reservations. A Reservation is one launch request that may have launched multiple instances
reservation["Instances"] | Each Reservation contains 1 or more Instance dicts
instance["InstanceId"] | The unique ID of this EC2 instance (e.g., i-0abc123def456789)
instance_name = next(
    (tag["Value"] for tag in instance.get("Tags", []) if tag["Key"] == "Name"),
    instance_id,
)
Line | Explanation
instance.get("Tags", []) | Returns [] if the instance has no tags, which avoids a KeyError
tag["Key"] == "Name" | Tags are stored as a list of dicts. We search for the tag whose Key is “Name”
next(..., instance_id) | Returns the first match. Falls back to the InstanceId string if there’s no Name tag
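The next() idiom with a default is easy to try in isolation. The instance dicts below are made-up samples shaped like describe_instances output, and name_or_id is a hypothetical wrapper around the same expression the script uses.

```python
def name_or_id(instance: dict) -> str:
    """Return the Name tag value, or the instance ID when no Name tag exists."""
    return next(
        (tag["Value"] for tag in instance.get("Tags", []) if tag["Key"] == "Name"),
        instance["InstanceId"],
    )


tagged = {
    "InstanceId": "i-0aaa",
    "Tags": [{"Key": "Env", "Value": "prod"}, {"Key": "Name", "Value": "web-1"}],
}
untagged = {"InstanceId": "i-0bbb"}  # no "Tags" key at all

print(name_or_id(tagged))    # web-1
print(name_or_id(untagged))  # i-0bbb
```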
for mapping in instance.get("BlockDeviceMappings", []):
    volumes.append({
        "volume_id": mapping["Ebs"]["VolumeId"],
        "device":    mapping["DeviceName"],
    })
Line | Explanation
BlockDeviceMappings | Lists all EBS volumes attached to this instance. Each entry has DeviceName (e.g., /dev/xvda) and Ebs.VolumeId
mapping["Ebs"]["VolumeId"] | The EBS Volume ID (e.g., vol-0abc123). This is what create_snapshot() needs
mapping["DeviceName"] | The device path inside the OS (e.g., /dev/xvda for root, /dev/xvdb for data). Used in the snapshot description for human readability
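Pulling the extraction into a pure function makes it testable without AWS access. The sketch below works on one response page; volumes_from_page is a hypothetical name, the sample page is invented, and the guard for a mapping without an "Ebs" key is a defensive assumption rather than something the original script does.

```python
def volumes_from_page(page: dict) -> list:
    """Collect volume info from one describe_instances response page."""
    volumes = []
    for reservation in page.get("Reservations", []):
        for instance in reservation.get("Instances", []):
            for mapping in instance.get("BlockDeviceMappings", []):
                ebs = mapping.get("Ebs")
                if not ebs:
                    continue  # defensive: nothing to snapshot for this mapping
                volumes.append({
                    "volume_id": ebs["VolumeId"],
                    "instance_id": instance["InstanceId"],
                    "device": mapping["DeviceName"],
                })
    return volumes


sample_page = {
    "Reservations": [{
        "Instances": [{
            "InstanceId": "i-0aaa",
            "BlockDeviceMappings": [
                {"DeviceName": "/dev/xvda", "Ebs": {"VolumeId": "vol-0root"}},
                {"DeviceName": "/dev/xvdb", "Ebs": {"VolumeId": "vol-0data"}},
            ],
        }],
    }],
}

for vol in volumes_from_page(sample_page):
    print(vol["volume_id"], vol["device"])
```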

create_snapshots(volumes)

response = self.ec2.create_snapshot(
    VolumeId=vol["volume_id"],
    Description=f"Auto-backup {vol['instance_name']} {vol['device']} {timestamp}",
    TagSpecifications=[{
        "ResourceType": "snapshot",
        "Tags": [
            {"Key": "AutoBackup", "Value": "true"},
            ...
        ],
    }],
)
Line | Explanation
create_snapshot(VolumeId=...) | Initiates an EBS snapshot. Snapshots are asynchronous: the API returns immediately with a snap-xxx ID while the actual data copy continues in the background
Description=... | A human-readable label stored with the snapshot. Not used programmatically, just for humans browsing the console
TagSpecifications=[{"ResourceType": "snapshot", "Tags": [...]}] | Tags the snapshot at creation time in one atomic call. ResourceType: "snapshot" tells EC2 these tags belong to the snapshot, not the source volume
{"Key": "AutoBackup", "Value": "true"} | This tag is the filter key used by the cleanup function. Only snapshots with this tag will ever be auto-deleted
response["SnapshotId"] | The new snapshot’s ID (e.g., snap-0abc123def456789). Used for logging and returned to the caller
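Building the request arguments separately from the API call is one way to check the tags without creating a real snapshot. In this sketch build_snapshot_request is a hypothetical helper; it mirrors the arguments the script passes and expects the same volume dict keys (volume_id, instance_id, instance_name, device).

```python
def build_snapshot_request(vol: dict, timestamp: str) -> dict:
    """Return the keyword arguments for ec2.create_snapshot()."""
    return {
        "VolumeId": vol["volume_id"],
        "Description": (
            f"Auto-backup {vol['instance_name']} {vol['device']} {timestamp}"
        ),
        "TagSpecifications": [{
            "ResourceType": "snapshot",  # tags go on the snapshot itself
            "Tags": [
                {"Key": "Name", "Value": f"auto-snap-{vol['instance_name']}-{timestamp}"},
                {"Key": "AutoBackup", "Value": "true"},  # cleanup filter key
                {"Key": "InstanceId", "Value": vol["instance_id"]},
                {"Key": "CreatedAt", "Value": timestamp},
            ],
        }],
    }


example_volume = {
    "volume_id": "vol-0abc",
    "instance_id": "i-0aaa",
    "instance_name": "web-1",
    "device": "/dev/xvda",
}
request = build_snapshot_request(example_volume, "2025-01-20-0100")
print(request["Description"])
```

The real call would then be ec2.create_snapshot(**request).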

delete_old_snapshots()

cutoff = datetime.now(timezone.utc) - timedelta(days=self.retention_days)
Line | Explanation
datetime.now(timezone.utc) | Current time as a timezone-aware UTC datetime. This is critical: if you use datetime.utcnow() (naive), subtracting it from snapshot["StartTime"] (timezone-aware) raises TypeError
timedelta(days=self.retention_days) | A duration object. Subtracting 30 days from “now” gives the cutoff date. Any snapshot created before this date is expired
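The cutoff arithmetic and the naive/aware pitfall can both be seen with fixed dates. The helper is_expired and the sample dates below are illustrative only; passing "now" in as an argument keeps the check deterministic.

```python
from datetime import datetime, timedelta, timezone


def is_expired(start_time: datetime, now: datetime, retention_days: int = 30) -> bool:
    """True when start_time is earlier than now minus retention_days."""
    return start_time < now - timedelta(days=retention_days)


now = datetime(2025, 1, 20, 1, 0, tzinfo=timezone.utc)   # cutoff: 2024-12-21 01:00 UTC
old = datetime(2024, 12, 1, tzinfo=timezone.utc)
recent = datetime(2025, 1, 10, tzinfo=timezone.utc)

print(is_expired(old, now))     # True
print(is_expired(recent, now))  # False

# Mixing an aware StartTime with a naive "now" fails instead of guessing:
try:
    is_expired(old, datetime(2025, 1, 20, 1, 0))
except TypeError as exc:
    print(f"TypeError: {exc}")
```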
paginator = self.ec2.get_paginator("describe_snapshots")
for page in paginator.paginate(
    Filters=[{"Name": "tag:AutoBackup", "Values": ["true"]}],
    OwnerIds=["self"],
):
Line | Explanation
get_paginator("describe_snapshots") | Paginates through all snapshots. An account can have thousands of snapshots, and pagination ensures none are missed
Filters=[{"Name": "tag:AutoBackup", "Values": ["true"]}] | Only returns snapshots tagged AutoBackup=true. This is the safety fence: we never accidentally delete manually-created snapshots
OwnerIds=["self"] | Restricts the listing to snapshots owned by this AWS account. Without it, the call also considers public and shared snapshots, which slows the listing; those could not be deleted from this account anyway
if snapshot["StartTime"] < cutoff:
    self.ec2.delete_snapshot(SnapshotId=snapshot["SnapshotId"])
Line | Explanation
snapshot["StartTime"] | A timezone-aware datetime object (UTC) representing when the snapshot started. boto3 automatically converts the API’s ISO 8601 string to a Python datetime
< cutoff | Compares two timezone-aware datetimes. If the snapshot is older than the cutoff, it’s expired
delete_snapshot(SnapshotId=...) | Permanently and irreversibly deletes the snapshot and frees the storage. This cannot be undone
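Because deletion cannot be undone, a report-only mode is worth having for the first few runs. The sketch below assumes a hypothetical script-level dry_run flag (separate from EC2's own DryRun parameter) and a hypothetical helper name, delete_expired; error handling is left out to keep it short.

```python
from datetime import datetime, timezone


def delete_expired(ec2, snapshots, cutoff, dry_run=True):
    """Return IDs of snapshots older than cutoff; delete only if dry_run is False."""
    expired = [s["SnapshotId"] for s in snapshots if s["StartTime"] < cutoff]
    if not dry_run:
        for snapshot_id in expired:
            ec2.delete_snapshot(SnapshotId=snapshot_id)
    return expired


class RecordingClient:
    """Stand-in for the EC2 client that records deletions instead of making them."""
    def __init__(self):
        self.deleted = []

    def delete_snapshot(self, SnapshotId):
        self.deleted.append(SnapshotId)


cutoff = datetime(2024, 12, 21, tzinfo=timezone.utc)
snapshots = [
    {"SnapshotId": "snap-old", "StartTime": datetime(2024, 12, 1, tzinfo=timezone.utc)},
    {"SnapshotId": "snap-new", "StartTime": datetime(2025, 1, 10, tzinfo=timezone.utc)},
]
client = RecordingClient()
print(delete_expired(client, snapshots, cutoff))  # ['snap-old']
print(client.deleted)                             # []
```

Defaulting dry_run to True means a forgotten argument results in a report rather than a deletion.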

run() — Orchestrator

volumes = self.get_running_instance_volumes()
created = self.create_snapshots(volumes)
deleted = self.delete_old_snapshots()
Line | Explanation
Order matters | We create new snapshots before deleting old ones, so a fresh backup has always been started before any expired one is removed
Returns a dict | {"volumes_found": N, "snapshots_created": M, "snapshots_deleted": K, ...} is useful for Lambda return values and CloudWatch metric publishing

Lambda & Local Entry Points

def lambda_handler(event, context):
    manager = EBSSnapshotManager(retention_days=30)
    return manager.run()

if __name__ == "__main__":
    manager = EBSSnapshotManager(region="ap-south-1", retention_days=30)
Line | Explanation
lambda_handler(event, context) | AWS Lambda calls this function when triggered. event contains EventBridge payload (mostly unused here). context has Lambda metadata (timeout remaining, etc.)
if __name__ == "__main__": | Python runs this block only when the script is executed directly (not when imported). This lets the same file work both as a module and as a runnable script
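Note that lambda_handler relies on the constructor's default region, us-east-1, regardless of where the function is deployed. One possible adjustment is to read the settings from environment variables. In the sketch below, AWS_REGION is the variable the Lambda runtime sets for the function's region, while RETENTION_DAYS and the helper name resolve_config are assumptions introduced for this example.

```python
import os


def resolve_config(env=None):
    """Return (region, retention_days) from environment variables.

    AWS_REGION is provided by the Lambda runtime; RETENTION_DAYS is a
    hypothetical setting you would define on the function yourself.
    """
    env = os.environ if env is None else env
    region = env.get("AWS_REGION", "us-east-1")
    retention_days = int(env.get("RETENTION_DAYS", "30"))
    if retention_days < 1:
        raise ValueError("RETENTION_DAYS must be at least 1")
    return region, retention_days


print(resolve_config({"AWS_REGION": "ap-south-1", "RETENTION_DAYS": "7"}))
```

The handler could then build the manager with EBSSnapshotManager(region=region, retention_days=retention_days).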
Services Used
EC2, EBS, boto3, IAM
Prerequisites
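  • Python 3.9+ (the script uses built-in generic annotations such as list[dict], which fail at import time on 3.8)
  • boto3
  • IAM permissions: ec2:DescribeInstances, ec2:CreateSnapshot, ec2:CreateTags, ec2:DeleteSnapshot, ec2:DescribeSnapshots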
What You Learned
  • boto3 paginator pattern
  • EC2 BlockDeviceMappings
  • Snapshot lifecycle management
  • TagSpecifications
  • Waiter usage
