The Situation
Production backup automation — ensures every EBS volume is snapshotted daily and old snapshots are pruned to control costs.
Problem Statement
Your team’s EC2 instances hold critical application data on EBS volumes. Without automated snapshots, a failed deployment or accidental rm -rf could mean permanent data loss. Snapshots stored indefinitely inflate storage costs; a 30-day retention window gives you a full month to recover while keeping the bill reasonable.
Goal: Write a Python script that:
- Discovers every EBS volume attached to running EC2 instances
- Creates a tagged snapshot for each volume
- Deletes any AutoBackup=true snapshot older than 30 days
- Is safe to run as a daily Lambda or cron job
Required IAM Permissions
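{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": [
      "ec2:DescribeInstances",
      "ec2:CreateSnapshot",
      "ec2:CreateTags",
      "ec2:DescribeSnapshots",
      "ec2:DeleteSnapshot"
    ],
    "Resource": "*"
  }]
}
Note: ec2:CreateTags is needed because the script tags each snapshot at creation time through TagSpecifications.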
Complete Script
import boto3
from datetime import datetime, timezone, timedelta
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")


class EBSSnapshotManager:
    def __init__(self, region="us-east-1", retention_days=30):
        """
        boto3.client("ec2") creates a low-level EC2 service client.
        region_name targets the correct AWS region.
        retention_days controls how long snapshots are kept.
        """
        self.ec2 = boto3.client("ec2", region_name=region)
        self.retention_days = retention_days

    # ── Step 1: Discover volumes ─────────────────────────────────────
    def get_running_instance_volumes(self) -> list[dict]:
        """
        get_paginator("describe_instances") returns a paginator object.
        Paginators automatically handle the NextToken loop so you never
        miss instances when you have more than 1,000 results.
        paginator.paginate() yields one page dict at a time.
        Each page["Reservations"] is a list of Reservation objects.
        Each Reservation["Instances"] is a list of Instance objects.
        BlockDeviceMappings lists every EBS volume attached to the instance.
        Each mapping has "DeviceName" (e.g., /dev/xvda) and "Ebs.VolumeId".
        """
        volumes = []
        paginator = self.ec2.get_paginator("describe_instances")
        for page in paginator.paginate(
            Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
        ):
            for reservation in page["Reservations"]:
                for instance in reservation["Instances"]:
                    instance_id = instance["InstanceId"]
                    # Extract the Name tag; default to the instance ID if absent
                    instance_name = next(
                        (tag["Value"] for tag in instance.get("Tags", [])
                         if tag["Key"] == "Name"),
                        instance_id,
                    )
                    for mapping in instance.get("BlockDeviceMappings", []):
                        volumes.append({
                            "volume_id": mapping["Ebs"]["VolumeId"],
                            "instance_id": instance_id,
                            "instance_name": instance_name,
                            "device": mapping["DeviceName"],  # e.g. /dev/xvda
                        })
        return volumes

    # ── Step 2: Create snapshots ─────────────────────────────────────
    def create_snapshots(self, volumes: list[dict]) -> list[str]:
        """
        create_snapshot() initiates an async snapshot.
        The snapshot state starts as "pending" and transitions to "completed".
        TagSpecifications lets you tag the snapshot at creation time
        (atomic, so no separate create_tags call is needed).
        ResourceType: "snapshot" tells AWS these tags belong to the snapshot,
        not the source volume.
        """
        created = []
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H%M")
        for vol in volumes:
            try:
                response = self.ec2.create_snapshot(
                    VolumeId=vol["volume_id"],
                    Description=(
                        f"Auto-backup {vol['instance_name']} "
                        f"{vol['device']} {timestamp}"
                    ),
                    TagSpecifications=[{
                        "ResourceType": "snapshot",
                        "Tags": [
                            {"Key": "Name", "Value": f"auto-snap-{vol['instance_name']}-{timestamp}"},
                            {"Key": "AutoBackup", "Value": "true"},  # Used by cleanup filter
                            {"Key": "InstanceId", "Value": vol["instance_id"]},
                            {"Key": "CreatedAt", "Value": timestamp},
                        ],
                    }],
                )
                snapshot_id = response["SnapshotId"]
                created.append(snapshot_id)
                logger.info(
                    f"Created snapshot {snapshot_id} for volume "
                    f"{vol['volume_id']} ({vol['instance_name']} {vol['device']})"
                )
            except Exception as e:
                logger.error(f"Failed to snapshot {vol['volume_id']}: {e}")
        return created

    # ── Step 3: Delete old snapshots ─────────────────────────────────
    def delete_old_snapshots(self) -> list[str]:
        """
        describe_snapshots with OwnerIds=["self"] only returns YOUR snapshots
        (not public ones, which is important to avoid accidental deletions).
        The tag filter Filters=[{"Name":"tag:AutoBackup","Values":["true"]}]
        ensures we only touch snapshots this script created, never manual ones.
        snapshot["StartTime"] is timezone-aware (UTC) so we compare against
        datetime.now(timezone.utc); mixing naive/aware datetimes raises TypeError.
        """
        cutoff = datetime.now(timezone.utc) - timedelta(days=self.retention_days)
        deleted = []
        paginator = self.ec2.get_paginator("describe_snapshots")
        for page in paginator.paginate(
            Filters=[{"Name": "tag:AutoBackup", "Values": ["true"]}],
            OwnerIds=["self"],
        ):
            for snapshot in page["Snapshots"]:
                if snapshot["StartTime"] < cutoff:
                    try:
                        self.ec2.delete_snapshot(SnapshotId=snapshot["SnapshotId"])
                        deleted.append(snapshot["SnapshotId"])
                        age_days = (datetime.now(timezone.utc) - snapshot["StartTime"]).days
                        logger.info(
                            f"Deleted snapshot {snapshot['SnapshotId']} "
                            f"(age: {age_days} days)"
                        )
                    except Exception as e:
                        # Snapshot may be in use by an AMI; log it and move on
                        logger.error(
                            f"Could not delete {snapshot['SnapshotId']}: {e}"
                        )
        return deleted

    # ── Orchestrator ──────────────────────────────────────────────────
    def run(self) -> dict:
        logger.info("Starting EBS snapshot backup cycle...")
        volumes = self.get_running_instance_volumes()
        logger.info(f"Found {len(volumes)} volumes across running instances")
        created = self.create_snapshots(volumes)
        deleted = self.delete_old_snapshots()
        result = {
            "volumes_found": len(volumes),
            "snapshots_created": len(created),
            "snapshots_deleted": len(deleted),
            "created_ids": created,
            "deleted_ids": deleted,
        }
        logger.info(f"Backup cycle complete: {result}")
        return result


# ── Lambda entry point ────────────────────────────────────────────
def lambda_handler(event, context):
    """Deploy as Lambda + EventBridge cron(0 1 * * ? *) to run at 1 AM UTC daily."""
    manager = EBSSnapshotManager(retention_days=30)
    return manager.run()


# ── Local / CLI entry point ───────────────────────────────────────
if __name__ == "__main__":
    manager = EBSSnapshotManager(region="ap-south-1", retention_days=30)
    result = manager.run()
    print(result)
Key Commands Explained
| Command | What it does |
|---|---|
| `get_paginator("describe_instances")` | Returns a paginator that follows NextToken for you, so large result sets are read in full |
| `paginator.paginate(Filters=[...])` | Iterates pages; each page is a full API response dict |
| `BlockDeviceMappings` | List of EBS volumes currently attached to the instance |
| `create_snapshot(VolumeId=..., TagSpecifications=[...])` | Starts an asynchronous snapshot and tags it in the same API call |
| `response["SnapshotId"]` | The new snapshot’s ID (e.g., snap-0abc123) |
| `get_paginator("describe_snapshots")` | Paginates through all snapshots that match the request |
| `OwnerIds=["self"]` | Only return snapshots owned by this AWS account |
| `delete_snapshot(SnapshotId=...)` | Permanently deletes a snapshot; this cannot be undone |
| `snapshot["StartTime"]` | UTC datetime when the snapshot was initiated |
Deployment as a Daily Lambda
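# 1. Zip the script
zip ebs_backup.zip ebs_snapshot_manager.py
# 2. Create Lambda function
aws lambda create-function \
  --function-name EBSSnapshotManager \
  --runtime python3.12 \
  --role arn:aws:iam::123456789012:role/LambdaEC2BackupRole \
  --handler ebs_snapshot_manager.lambda_handler \
  --zip-file fileb://ebs_backup.zip \
  --timeout 300
# 3. Create the daily EventBridge rule (1 AM UTC)
aws events put-rule \
  --name DailyEBSBackup \
  --schedule-expression "cron(0 1 * * ? *)" \
  --state ENABLED
# 4. Allow EventBridge to invoke the function
#    (the ARNs below assume us-east-1, the script's default region; adjust as needed)
aws lambda add-permission \
  --function-name EBSSnapshotManager \
  --statement-id DailyEBSBackupInvoke \
  --action lambda:InvokeFunction \
  --principal events.amazonaws.com \
  --source-arn arn:aws:events:us-east-1:123456789012:rule/DailyEBSBackup
# 5. Point the rule at the function; without a target the rule fires but invokes nothing
aws events put-targets \
  --rule DailyEBSBackup \
  --targets "Id"="1","Arn"="arn:aws:lambda:us-east-1:123456789012:function:EBSSnapshotManager"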
Common Issues
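SnapshotCreationPerVolumeRateExceeded: AWS limits how quickly snapshots of the same volume can be created. This can happen when the job is triggered twice in quick succession or when the same volume appears more than once in the list. Space out the create_snapshot calls for that volume (a short time.sleep() or a retry with a delay) to stay within the limit.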
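A retry with a growing delay is one way to handle a rate-limit error without slowing down every call. The following is a minimal sketch under stated assumptions: `call_with_backoff`, `FakeClientError`, and the delay values are hypothetical, and the fake exception only mimics the `response["Error"]["Code"]` shape of botocore's `ClientError` so the example runs without AWS access.

```python
import time

# Error codes treated as retryable in this sketch (an assumption; tune for your workload).
RETRYABLE_CODES = {"SnapshotCreationPerVolumeRateExceeded", "RequestLimitExceeded"}


class FakeClientError(Exception):
    """Stand-in for botocore.exceptions.ClientError (same response shape)."""

    def __init__(self, code):
        super().__init__(code)
        self.response = {"Error": {"Code": code}}


def call_with_backoff(func, max_attempts=4, base_delay=1.0, sleep=time.sleep):
    """Call func(); on a retryable error code, wait and try again."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as exc:
            code = getattr(exc, "response", {}).get("Error", {}).get("Code")
            if code not in RETRYABLE_CODES or attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...


# Demo: fail twice with a rate-limit error, then succeed.
attempts = {"count": 0}
delays = []  # records the requested delays instead of actually sleeping


def flaky_create_snapshot():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise FakeClientError("SnapshotCreationPerVolumeRateExceeded")
    return {"SnapshotId": "snap-0abc123"}


result = call_with_backoff(flaky_create_snapshot, sleep=delays.append)
print(result["SnapshotId"], delays)
```

In the real script the wrapped callable would be a `lambda` around `self.ec2.create_snapshot(...)`, and the `except` clause would catch `ClientError` specifically.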
InvalidSnapshot.InUse: The snapshot backs a registered AMI. Deregister the AMI first, then delete the snapshot.
Snapshot stays “pending”: Large volumes take longer to finish. Don’t wait synchronously; the Lambda can return while the snapshot completes in the background. If you need confirmation, use an EventBridge rule to check completion.
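A later completion check can be a small function that asks for the state of the snapshot IDs returned by an earlier run. This is a sketch only: `pending_snapshots` and `FakeEC2` are hypothetical names, and the fake client stands in for `boto3.client("ec2")` so the example runs offline. The `SnapshotIds` parameter and the `State` field are part of the real `describe_snapshots` response.

```python
def pending_snapshots(ec2_client, snapshot_ids):
    """Return the IDs whose snapshot has not reached the 'completed' state."""
    if not snapshot_ids:
        return []
    response = ec2_client.describe_snapshots(SnapshotIds=snapshot_ids)
    return [
        snap["SnapshotId"]
        for snap in response["Snapshots"]
        if snap["State"] != "completed"
    ]


class FakeEC2:
    """Offline stand-in for the EC2 client, returning canned states."""

    def describe_snapshots(self, SnapshotIds):
        states = {"snap-1": "completed", "snap-2": "pending"}
        return {
            "Snapshots": [{"SnapshotId": sid, "State": states[sid]} for sid in SnapshotIds]
        }


still_pending = pending_snapshots(FakeEC2(), ["snap-1", "snap-2"])
print(still_pending)
```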
🔍 Line-by-Line Code Walkthrough
Imports
| Line | Why It’s Used |
|---|---|
| `import boto3` | AWS SDK for Python; needed to call the EC2 APIs for instance discovery and snapshots |
| `from datetime import datetime, timezone, timedelta` | `datetime.now(timezone.utc)` gives a timezone-aware UTC timestamp, which is required to compare against the timezone-aware timestamps boto3 returns. `timedelta(days=30)` computes the cutoff date |
| `import logging` | Python standard library module for log output with timestamps and severity levels |
logging.basicConfig(...)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
| Part | Explanation |
|---|---|
| `level=logging.INFO` | Only log messages at INFO level and above (INFO, WARNING, ERROR, CRITICAL). DEBUG messages are suppressed |
| `format="%(asctime)s %(levelname)s %(message)s"` | Each log line shows the timestamp, the severity level, and the message |
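One caveat worth sketching: `logging.basicConfig()` does nothing when the root logger already has a handler, which is typically the case inside a hosted runtime such as AWS Lambda. The helper below (`configure_logging` is a hypothetical name, not part of the original script) sets the level directly in that situation and falls back to `basicConfig()` otherwise.

```python
import logging


def configure_logging(level=logging.INFO):
    """Make INFO logs visible whether or not a handler is already installed."""
    root = logging.getLogger()
    if root.handlers:
        # A handler already exists (e.g. installed by the hosting runtime),
        # so basicConfig() would be a no-op; set the level directly instead.
        root.setLevel(level)
    else:
        logging.basicConfig(
            level=level,
            format="%(asctime)s %(levelname)s %(message)s",
        )
    return root


root = configure_logging()
logging.getLogger(__name__).info("logging configured")
```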
EBSSnapshotManager.__init__
def __init__(self, region="us-east-1", retention_days=30):
    self.ec2 = boto3.client("ec2", region_name=region)
    self.retention_days = retention_days

| Line | Explanation |
|---|---|
| `boto3.client("ec2", region_name=region)` | Creates an EC2 API client. EBS snapshots and instances are regional, so you need the correct region |
| `self.ec2 = ...` | Stores the client on the instance so all methods share one connection pool |
| `self.retention_days = retention_days` | Stored for use in `delete_old_snapshots()`. The default of 30 means snapshots older than 30 days are deleted |
get_running_instance_volumes()
paginator = self.ec2.get_paginator("describe_instances")
| Line | Explanation |
|---|---|
| `get_paginator("describe_instances")` | Returns a Paginator object that follows the NextToken loop for you. A single `describe_instances()` call may return only one page of results (at most 1,000 per page when MaxResults is set), so code that ignores NextToken can miss instances |
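To make the NextToken loop concrete, here is roughly what a paginator does on your behalf. This is a sketch, not boto3's implementation: `iter_pages` and `fake_describe_instances` are hypothetical, and the fake two-page API replaces the real client so the loop can run without AWS access.

```python
def iter_pages(describe_call, **kwargs):
    """Yield each response page, following NextToken until it is absent."""
    token = None
    while True:
        params = dict(kwargs)
        if token:
            params["NextToken"] = token
        page = describe_call(**params)
        yield page
        token = page.get("NextToken")
        if not token:
            break


# Fake two-page API keyed by the token that requests each page.
FAKE_PAGES = {
    None: {"Reservations": [{"Instances": [{"InstanceId": "i-1"}]}], "NextToken": "t1"},
    "t1": {"Reservations": [{"Instances": [{"InstanceId": "i-2"}]}]},
}


def fake_describe_instances(NextToken=None, **_ignored):
    return FAKE_PAGES[NextToken]


ids = [
    instance["InstanceId"]
    for page in iter_pages(fake_describe_instances, Filters=[])
    for reservation in page["Reservations"]
    for instance in reservation["Instances"]
]
print(ids)
```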
for page in paginator.paginate(
    Filters=[{"Name": "instance-state-name", "Values": ["running"]}]
):

| Line | Explanation |
|---|---|
| `paginator.paginate(Filters=[...])` | Iterates pages. Each page is one API response dict. The paginator calls the API repeatedly, adding NextToken automatically until no more pages exist |
| `Filters=[{"Name": "instance-state-name", "Values": ["running"]}]` | Server-side filter that returns only running instances. We don’t want snapshots of stopped or terminated instances |
for reservation in page["Reservations"]:
    for instance in reservation["Instances"]:
        instance_id = instance["InstanceId"]

| Line | Explanation |
|---|---|
| `page["Reservations"]` | EC2 groups instances into Reservations. A Reservation is one launch request that may have launched multiple instances |
| `reservation["Instances"]` | Each Reservation contains one or more Instance dicts |
| `instance["InstanceId"]` | The unique ID of this EC2 instance (e.g., i-0abc123def456789) |
instance_name = next(
    (tag["Value"] for tag in instance.get("Tags", []) if tag["Key"] == "Name"),
    instance_id,
)

| Line | Explanation |
|---|---|
| `instance.get("Tags", [])` | Returns `[]` if the instance has no tags, which avoids a KeyError |
| `tag["Key"] == "Name"` | Tags are stored as a list of dicts. We search for the tag whose Key is “Name” |
| `next(..., instance_id)` | Returns the first match. Falls back to the InstanceId string if there’s no Name tag |
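The `next(generator, default)` idiom can be tried in isolation on plain dicts. In this small sketch, `name_or_id` is a hypothetical helper name and the two sample instances are made up.

```python
def name_or_id(instance):
    """Return the Name tag value, or the instance ID when no Name tag exists."""
    return next(
        (tag["Value"] for tag in instance.get("Tags", []) if tag["Key"] == "Name"),
        instance["InstanceId"],
    )


tagged = {
    "InstanceId": "i-0abc",
    "Tags": [{"Key": "env", "Value": "prod"}, {"Key": "Name", "Value": "web-1"}],
}
untagged = {"InstanceId": "i-0def"}  # no "Tags" key at all

print(name_or_id(tagged), name_or_id(untagged))
```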
for mapping in instance.get("BlockDeviceMappings", []):
    volumes.append({
        "volume_id": mapping["Ebs"]["VolumeId"],
        "device": mapping["DeviceName"],
    })

| Line | Explanation |
|---|---|
| `BlockDeviceMappings` | Lists all EBS volumes attached to this instance. Each entry has DeviceName (e.g., /dev/xvda) and Ebs.VolumeId |
| `mapping["Ebs"]["VolumeId"]` | The EBS Volume ID (e.g., vol-0abc123). This is what `create_snapshot()` needs |
| `mapping["DeviceName"]` | The device name the volume is attached as (e.g., /dev/xvda for root, /dev/xvdb for data). Used in the snapshot description for human readability |
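The nested Reservations, Instances, and BlockDeviceMappings structure is easier to follow on a hand-written sample. The sketch below flattens one page into the volume records the script builds. `volumes_from_page` and the sample data are hypothetical, and the `mapping.get("Ebs")` guard is a defensive addition that the original script does not include.

```python
def volumes_from_page(page):
    """Flatten one describe_instances page into a list of volume records."""
    volumes = []
    for reservation in page.get("Reservations", []):
        for instance in reservation.get("Instances", []):
            for mapping in instance.get("BlockDeviceMappings", []):
                ebs = mapping.get("Ebs")
                if not ebs:  # defensive: skip a mapping with no EBS details
                    continue
                volumes.append({
                    "volume_id": ebs["VolumeId"],
                    "instance_id": instance["InstanceId"],
                    "device": mapping["DeviceName"],
                })
    return volumes


sample_page = {
    "Reservations": [{
        "Instances": [{
            "InstanceId": "i-0abc",
            "BlockDeviceMappings": [
                {"DeviceName": "/dev/xvda", "Ebs": {"VolumeId": "vol-root"}},
                {"DeviceName": "/dev/xvdb", "Ebs": {"VolumeId": "vol-data"}},
            ],
        }],
    }],
}

volumes = volumes_from_page(sample_page)
print([v["volume_id"] for v in volumes])
```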
create_snapshots(volumes)
response = self.ec2.create_snapshot(
    VolumeId=vol["volume_id"],
    Description=f"Auto-backup {vol['instance_name']} {vol['device']} {timestamp}",
    TagSpecifications=[{
        "ResourceType": "snapshot",
        "Tags": [
            {"Key": "AutoBackup", "Value": "true"},
            ...
        ],
    }],
)

| Line | Explanation |
|---|---|
| `create_snapshot(VolumeId=...)` | Initiates an EBS snapshot. Snapshots are asynchronous: the API returns immediately with a snap-xxx ID while the actual data copy continues in the background |
| `Description=...` | A human-readable label stored with the snapshot. Not used programmatically; it is there for people browsing the console |
| `TagSpecifications=[{"ResourceType": "snapshot", "Tags": [...]}]` | Tags the snapshot at creation time in one atomic call. `ResourceType: "snapshot"` tells EC2 these tags belong to the snapshot, not the source volume |
| `{"Key": "AutoBackup", "Value": "true"}` | This tag is the filter key used by the cleanup function. Only snapshots with this tag will ever be auto-deleted |
| `response["SnapshotId"]` | The new snapshot’s ID (e.g., snap-0abc123def456789). Used for logging and returned to the caller |
delete_old_snapshots()
cutoff = datetime.now(timezone.utc) - timedelta(days=self.retention_days)
| Line | Explanation |
|---|---|
| `datetime.now(timezone.utc)` | Current time as a timezone-aware UTC datetime. This is critical: if you use `datetime.utcnow()` (naive), comparing it with or subtracting it from `snapshot["StartTime"]` (timezone-aware) raises TypeError |
| `timedelta(days=self.retention_days)` | A duration object. Subtracting 30 days from “now” gives the cutoff date. Any snapshot started before this date is expired |
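The cutoff arithmetic and the naive-versus-aware pitfall can both be checked with the standard library alone. The dates below are arbitrary, fixed values chosen so the demo is repeatable.

```python
from datetime import datetime, timedelta, timezone

# Fixed "now" so the result does not depend on when this runs.
now = datetime(2024, 3, 31, 1, 0, tzinfo=timezone.utc)
cutoff = now - timedelta(days=30)
print(cutoff.isoformat())  # 2024-03-01T01:00:00+00:00

# A timezone-aware start time, like the ones boto3 returns.
start_time = datetime(2024, 2, 15, 1, 0, tzinfo=timezone.utc)
print(start_time < cutoff)  # True: started before the cutoff, so it is expired

# Mixing a naive datetime with an aware one fails.
naive_now = datetime(2024, 3, 31, 1, 0)
raised = False
try:
    naive_now - start_time
except TypeError as exc:
    raised = True
    print("TypeError:", exc)
```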
paginator = self.ec2.get_paginator("describe_snapshots")
for page in paginator.paginate(
    Filters=[{"Name": "tag:AutoBackup", "Values": ["true"]}],
    OwnerIds=["self"],
):

| Line | Explanation |
|---|---|
| `get_paginator("describe_snapshots")` | Paginates through all matching snapshots. An account can have thousands of snapshots, and pagination ensures none are missed |
| `Filters=[{"Name": "tag:AutoBackup", "Values": ["true"]}]` | Only returns snapshots tagged AutoBackup=true. This is the safety fence: manually created snapshots are never selected for deletion |
| `OwnerIds=["self"]` | Restricts results to snapshots owned by this AWS account. Without it, the call also considers public snapshots and snapshots shared by other accounts, which this script cannot delete and should never touch |
if snapshot["StartTime"] < cutoff:
    self.ec2.delete_snapshot(SnapshotId=snapshot["SnapshotId"])

| Line | Explanation |
|---|---|
| `snapshot["StartTime"]` | A timezone-aware datetime object (UTC) representing when the snapshot started. boto3 automatically converts the API’s ISO 8601 string to a Python datetime |
| `< cutoff` | Compares two timezone-aware datetimes. If the snapshot started before the cutoff, it’s expired |
| `delete_snapshot(SnapshotId=...)` | Permanently deletes the snapshot and frees the storage. This cannot be undone |
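Because deletion is irreversible, it can help to separate the selection of expired snapshots from the delete call, so the selection can be reviewed or tested on its own. A minimal sketch, assuming a hypothetical `select_expired` helper and made-up snapshot records:

```python
from datetime import datetime, timedelta, timezone


def select_expired(snapshots, retention_days, now=None):
    """Return the IDs of snapshots that started before the retention cutoff."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=retention_days)
    return [s["SnapshotId"] for s in snapshots if s["StartTime"] < cutoff]


now = datetime(2024, 6, 30, tzinfo=timezone.utc)
snapshots = [
    {"SnapshotId": "snap-old", "StartTime": now - timedelta(days=45)},
    {"SnapshotId": "snap-edge", "StartTime": now - timedelta(days=30)},  # exactly at the cutoff
    {"SnapshotId": "snap-new", "StartTime": now - timedelta(days=2)},
]

expired = select_expired(snapshots, retention_days=30, now=now)
print(expired)
```

Note that the strict `<` comparison keeps a snapshot that sits exactly on the cutoff; it is picked up on the next run.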
run() — Orchestrator
volumes = self.get_running_instance_volumes()
created = self.create_snapshots(volumes)
deleted = self.delete_old_snapshots()
| Line | Explanation |
|---|---|
| Order matters | New snapshots are created before old ones are deleted, so the number of available restore points never drops below the retention window during a run |
| Returns a dict | `{"volumes_found": N, "snapshots_created": M, "snapshots_deleted": K, ...}`, which is useful for Lambda return values and CloudWatch metric publishing |
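As an illustration of the metric-publishing idea, the count fields of the result dict can be mapped to the `MetricData` structure that CloudWatch's `put_metric_data` accepts. This is a sketch under assumptions: `to_metric_data`, the metric names, and the `Custom/EBSBackup` namespace are hypothetical choices, and the sample result is made up. Only the payload is built here, so no AWS call is made.

```python
def to_metric_data(result):
    """Map the count fields of run()'s result to CloudWatch MetricData entries."""
    names = {
        "volumes_found": "VolumesFound",
        "snapshots_created": "SnapshotsCreated",
        "snapshots_deleted": "SnapshotsDeleted",
    }
    return [
        {"MetricName": metric, "Value": float(result[key]), "Unit": "Count"}
        for key, metric in names.items()
    ]


sample_result = {
    "volumes_found": 4,
    "snapshots_created": 4,
    "snapshots_deleted": 1,
    "created_ids": ["snap-a", "snap-b", "snap-c", "snap-d"],
    "deleted_ids": ["snap-z"],
}

metric_data = to_metric_data(sample_result)
print(metric_data)

# With real credentials the payload could be sent like this (namespace is an assumption):
#   boto3.client("cloudwatch").put_metric_data(
#       Namespace="Custom/EBSBackup", MetricData=metric_data
#   )
```

A gap between `VolumesFound` and `SnapshotsCreated` is a natural alarm condition, since it means at least one `create_snapshot` call failed.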
Lambda & Local Entry Points
def lambda_handler(event, context):
    manager = EBSSnapshotManager(retention_days=30)
    return manager.run()

if __name__ == "__main__":
    manager = EBSSnapshotManager(region="ap-south-1", retention_days=30)

| Line | Explanation |
|---|---|
| `lambda_handler(event, context)` | AWS Lambda calls this function when triggered. `event` contains the EventBridge payload (mostly unused here). `context` has Lambda metadata (time remaining before timeout, etc.) |
| `if __name__ == "__main__":` | Python runs this block only when the script is executed directly (not when imported). This lets the same file work both as a module and as a runnable script |