RDS Snapshot Before Deployment & Point-in-Time Restore
Python script to create an RDS snapshot before every deployment and restore the database from that snapshot (or to a specific point in time) if a rollback is required.
Database safety net — never deploy without a pre-deployment snapshot so you can roll back the database to match a code rollback within minutes.
Problem Statement
Your team deployed a schema migration that introduced a breaking change. The code rollback was instant, but the database was already migrated — no snapshot meant a 4-hour manual recovery. A pre-deployment snapshot takes 5-10 minutes and costs only storage. This script automates it as part of your deployment pipeline.
Deployment Integration Pattern
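# In your CI/CD pipeline (GitHub Actions, Jenkins, etc.)
# These subcommands assume a small CLI wrapper around RDSSnapshotManager;
# the entry point in the script below hardcodes its arguments instead.
python rds_snapshot.py pre-deploy --db prod-postgres --version v2.4.1
# Run your deployment...
# If deployment fails:
# (snapshot IDs allow only letters, digits, and hyphens, so v2.4.1 appears as v2-4-1)
python rds_snapshot.py restore --snapshot pre-deploy-prod-postgres-v2-4-1-20250120-103045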
Complete Script
import boto3
import time
import sys
from datetime import datetime, timezone


class RDSSnapshotManager:
    def __init__(self, region: str = "us-east-1"):
        """
        boto3.client("rds") is the RDS service client.
        RDS operations are long-running (snapshot: 5-30 min,
        restore: 10-45 min), so we use boto3 Waiters to poll for completion
        instead of writing manual sleep loops.
        """
        self.rds = boto3.client("rds", region_name=region)
    # ── Pre-deployment snapshot ───────────────────────────────────
    def create_pre_deploy_snapshot(
        self, db_identifier: str, deploy_version: str
    ) -> str:
        """
        create_db_snapshot() creates a manual snapshot of the RDS instance.
        Manual snapshots are retained until you explicitly delete them
        (unlike automated backups, which expire based on the retention window).
        DBSnapshotIdentifier must be unique, 1-255 chars, and contain only
        letters, digits, and hyphens, so dots in the version (v2.4.1) are
        replaced with hyphens before it is embedded.
        We embed the instance name + deploy version + timestamp for traceability.
        Unlike EC2, which takes TagSpecifications, RDS accepts a flat Tags
        list directly in the API call.
        """
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
        # "v2.4.1" -> "v2-4-1": dots are not valid in a snapshot identifier
        safe_version = deploy_version.replace(".", "-")
        snapshot_id = f"pre-deploy-{db_identifier}-{safe_version}-{timestamp}"
        print(f"Creating pre-deployment snapshot: {snapshot_id}")
        self.rds.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_id,
            DBInstanceIdentifier=db_identifier,
            Tags=[
                {"Key": "Type", "Value": "pre-deployment"},
                {"Key": "DeployVersion", "Value": deploy_version},
                {"Key": "CreatedAt", "Value": timestamp},
                {"Key": "AutoDelete", "Value": "false"},  # Keep until manually removed
            ],
        )
        # ── Wait for snapshot to be available ─────────────────────
        # get_waiter("db_snapshot_available") polls describe_db_snapshots()
        # every 30 seconds (default) until Status = "available".
        # Raises WaiterError if it doesn't complete within max_attempts.
        print("Waiting for snapshot to complete (5-20 min for large databases)...")
        waiter = self.rds.get_waiter("db_snapshot_available")
        waiter.wait(
            DBSnapshotIdentifier=snapshot_id,
            WaiterConfig={
                "Delay": 30,        # Poll every 30 seconds
                "MaxAttempts": 60,  # Give up after 30 min (60 × 30s)
            },
        )
        print(f"✅ Snapshot ready: {snapshot_id}")
        return snapshot_id
    # ── Restore from snapshot ─────────────────────────────────────
    def restore_from_snapshot(
        self,
        snapshot_id: str,
        new_db_identifier: str,
        db_subnet_group: str,
        vpc_sg_ids: list[str],
    ) -> str:
        """
        restore_db_instance_from_db_snapshot() creates a NEW RDS instance
        from the snapshot. It does NOT overwrite the existing database.
        This is intentional. You end up with TWO databases:
            prod-postgres          → current (potentially broken) DB
            prod-postgres-rollback → restored from pre-deploy snapshot
        After verifying the restore, you either:
            a) Swap the app config to point to the restored DB, OR
            b) Use pg_dump/pg_restore to copy specific tables back
        DBInstanceClass is not passed: the snapshot metadata does not
        document that field, and RDS defaults to the instance class of
        the original DB instance when it is omitted.
        """
        print(f"Restoring {new_db_identifier} from snapshot: {snapshot_id}")
        # Look up the snapshot first. A lookup by an unknown ID raises
        # DBSnapshotNotFoundFault instead of returning an empty list.
        try:
            snapshots = self.rds.describe_db_snapshots(
                DBSnapshotIdentifier=snapshot_id
            )["DBSnapshots"]
        except self.rds.exceptions.DBSnapshotNotFoundFault:
            snapshots = []
        if not snapshots:
            raise ValueError(f"Snapshot not found: {snapshot_id}")
        snapshot = snapshots[0]
        if snapshot["Status"] != "available":
            raise ValueError(
                f"Snapshot {snapshot_id} is not restorable yet "
                f"(status: {snapshot['Status']})"
            )
        self.rds.restore_db_instance_from_db_snapshot(
            DBInstanceIdentifier=new_db_identifier,
            DBSnapshotIdentifier=snapshot_id,
            DBSubnetGroupName=db_subnet_group,
            VpcSecurityGroupIds=vpc_sg_ids,
            MultiAZ=True,
            AutoMinorVersionUpgrade=True,
            DeletionProtection=True,  # Prevent accidental deletion
            Tags=[
                {"Key": "RestoredFrom", "Value": snapshot_id},
                {
                    "Key": "RestoredAt",
                    "Value": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
                },
                {"Key": "Temporary", "Value": "true"},
            ],
        )
        print("Waiting for restored instance to be available (10-45 min)...")
        waiter = self.rds.get_waiter("db_instance_available")
        waiter.wait(
            DBInstanceIdentifier=new_db_identifier,
            WaiterConfig={"Delay": 30, "MaxAttempts": 120},  # Up to 60 min
        )
        endpoint = self.rds.describe_db_instances(
            DBInstanceIdentifier=new_db_identifier
        )["DBInstances"][0]["Endpoint"]["Address"]
        print(f"✅ Restored instance available at: {endpoint}")
        return endpoint
    # ── Point-in-time restore ─────────────────────────────────────
    def point_in_time_restore(
        self,
        source_db: str,
        target_db: str,
        restore_time: datetime,
        db_subnet_group: str,
        vpc_sg_ids: list[str],
    ) -> str:
        """
        restore_db_instance_to_point_in_time() restores to any second
        within the automated backup retention window (7 days by default for
        instances created in the console, 1 day via the API or CLI).
        restore_time must be a timezone-aware datetime in UTC.
        Useful when you know exactly when the problem was introduced
        (e.g., "restore to 2025-01-20 14:29:59 UTC, 1 minute before deploy").
        Unlike snapshot restore, this uses the continuous transaction logs
        stored during automated backups to achieve precise RPO.
        """
        print(
            f"Restoring {source_db} to point-in-time: "
            f"{restore_time.strftime('%Y-%m-%d %H:%M:%S UTC')}"
        )
        self.rds.restore_db_instance_to_point_in_time(
            SourceDBInstanceIdentifier=source_db,
            TargetDBInstanceIdentifier=target_db,
            RestoreTime=restore_time,
            DBSubnetGroupName=db_subnet_group,
            VpcSecurityGroupIds=vpc_sg_ids,
            MultiAZ=True,
            DeletionProtection=True,
        )
        waiter = self.rds.get_waiter("db_instance_available")
        waiter.wait(
            DBInstanceIdentifier=target_db,
            WaiterConfig={"Delay": 30, "MaxAttempts": 120},  # Up to 60 min
        )
        print(f"✅ Point-in-time restore complete: {target_db}")
        endpoint = self.rds.describe_db_instances(
            DBInstanceIdentifier=target_db
        )["DBInstances"][0]["Endpoint"]["Address"]
        return endpoint
    # ── List pre-deploy snapshots ─────────────────────────────────
    def list_pre_deploy_snapshots(self, db_identifier: str) -> list[dict]:
        """List all pre-deployment snapshots for a given DB, newest first."""
        paginator = self.rds.get_paginator("describe_db_snapshots")
        snapshots = []
        for page in paginator.paginate(
            DBInstanceIdentifier=db_identifier,
            SnapshotType="manual",
        ):
            for snap in page["DBSnapshots"]:
                if snap["DBSnapshotIdentifier"].startswith("pre-deploy-"):
                    snapshots.append({
                        "id": snap["DBSnapshotIdentifier"],
                        "status": snap["Status"],
                        "created": snap["SnapshotCreateTime"].strftime("%Y-%m-%d %H:%M UTC"),
                        "size_gb": snap.get("AllocatedStorage", 0),
                    })
        # The zero-padded "created" strings sort chronologically
        return sorted(snapshots, key=lambda x: x["created"], reverse=True)
# ── CLI entry point ───────────────────────────────────────────────
if __name__ == "__main__":
    manager = RDSSnapshotManager(region="ap-south-1")
    # Before deployment, called from CI/CD
    snapshot_id = manager.create_pre_deploy_snapshot(
        db_identifier="prod-postgres",
        deploy_version="v2.4.1",
    )
    print(f"\nSave this snapshot ID for rollback: {snapshot_id}")
    # List recent snapshots
    print("\nRecent pre-deploy snapshots:")
    for snap in manager.list_pre_deploy_snapshots("prod-postgres")[:5]:
        print(f"  {snap['id']}  [{snap['status']}]  {snap['created']}")
    # Rollback (uncomment if deployment fails).
    # Snapshot IDs may contain only letters, digits, and hyphens:
    # manager.restore_from_snapshot(
    #     snapshot_id="pre-deploy-prod-postgres-v2-4-1-20250120-103045",
    #     new_db_identifier="prod-postgres-rollback",
    #     db_subnet_group="prod-db-subnet-group",
    #     vpc_sg_ids=["sg-0abc1234"],
    # )
    # Point-in-time restore (uncomment if needed):
    # restore_to = datetime(2025, 1, 20, 14, 29, 59, tzinfo=timezone.utc)
    # manager.point_in_time_restore(
    #     source_db="prod-postgres",
    #     target_db="prod-postgres-pitr",
    #     restore_time=restore_to,
    #     db_subnet_group="prod-db-subnet-group",
    #     vpc_sg_ids=["sg-0abc1234"],
    # )
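The pipeline commands in the Deployment Integration Pattern section call the script with `pre-deploy` and `restore` subcommands, while the entry point above hardcodes its arguments. One way to bridge the two is an `argparse` front end. This is a minimal sketch, not the author's implementation: only `--db`, `--version`, and `--snapshot` appear in the original commands, and the `--region`, `--new-db`, `--subnet-group`, and `--sg` flags are assumptions introduced here for illustration.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Build a CLI with `pre-deploy` and `restore` subcommands."""
    parser = argparse.ArgumentParser(prog="rds_snapshot.py")
    parser.add_argument("--region", default="us-east-1")  # hypothetical flag
    sub = parser.add_subparsers(dest="command", required=True)

    pre = sub.add_parser("pre-deploy", help="snapshot the DB before deploying")
    pre.add_argument("--db", required=True)
    pre.add_argument("--version", required=True)

    restore = sub.add_parser("restore", help="restore a new instance from a snapshot")
    restore.add_argument("--snapshot", required=True)
    # The flags below are hypothetical; restore_from_snapshot() needs these values
    restore.add_argument("--new-db", required=True)
    restore.add_argument("--subnet-group", required=True)
    restore.add_argument("--sg", action="append", required=True)
    return parser


def dispatch(args: argparse.Namespace, manager):
    """Route parsed arguments to the matching manager method."""
    if args.command == "pre-deploy":
        return manager.create_pre_deploy_snapshot(args.db, args.version)
    return manager.restore_from_snapshot(
        args.snapshot, args.new_db, args.subnet_group, args.sg
    )
```

In the script, the entry point would then reduce to parsing `sys.argv` with `build_parser().parse_args()`, constructing `RDSSnapshotManager(region=args.region)`, and calling `dispatch(args, manager)`. Passing `--sg` more than once collects several security group IDs.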
Key Commands Explained
| Command | What it does |
|---|---|
| `create_db_snapshot(DBSnapshotIdentifier, DBInstanceIdentifier)` | Creates a manual RDS snapshot |
| `get_waiter("db_snapshot_available")` | Polls until snapshot Status = "available" |
| `waiter.wait(WaiterConfig={"Delay": 30, "MaxAttempts": 60})` | Override poll interval and max attempts |
| `describe_db_snapshots(DBSnapshotIdentifier)["DBSnapshots"][0]` | Get metadata of a specific snapshot |
| `restore_db_instance_from_db_snapshot(...)` | Creates a new DB instance from a snapshot |
| `restore_db_instance_to_point_in_time(RestoreTime=...)` | Restores to a specific UTC second |
| `get_waiter("db_instance_available")` | Polls until DB instance Status = "available" |
| `describe_db_instances()["DBInstances"][0]["Endpoint"]["Address"]` | Get the restored DB hostname |
Common Issues
SnapshotQuotaExceeded: By default, AWS allows up to 100 manual DB snapshots per account per region (an adjustable quota). Clean up old pre-deploy snapshots after a successful deployment.
Restore creates new endpoint: The restored instance has a NEW hostname. You must update your app config / connection string to point to it.
Point-in-time restore outside retention window: If your automated backup retention is 7 days, you can only restore to within the last 7 days. Increase the retention period in RDS settings (up to 35 days) for a wider recovery window.
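The cleanup advice for SnapshotQuotaExceeded can be sketched as a small retention helper. This is an illustration under stated assumptions rather than part of the original script: the function names and the `keep` parameter are hypothetical, and the input is assumed to have the shape returned by `list_pre_deploy_snapshots()` (dicts with `id` and a sortable `created` value). `delete_db_snapshot` is the boto3 RDS call that removes a manual snapshot.

```python
def select_snapshots_to_delete(snapshots: list[dict], keep: int = 5) -> list[str]:
    """Return the IDs of every snapshot older than the newest `keep`."""
    if keep < 0:
        raise ValueError("keep must be zero or positive")
    # "created" may be a datetime or the zero-padded string the script emits;
    # both sort chronologically
    newest_first = sorted(snapshots, key=lambda s: s["created"], reverse=True)
    return [s["id"] for s in newest_first[keep:]]


def delete_snapshots(rds_client, snapshot_ids: list[str]) -> list[str]:
    """Delete each snapshot through the given RDS client; return what was deleted."""
    deleted = []
    for snapshot_id in snapshot_ids:
        rds_client.delete_db_snapshot(DBSnapshotIdentifier=snapshot_id)
        deleted.append(snapshot_id)
    return deleted
```

Keeping selection separate from deletion lets a pipeline log or review the candidate list before anything is removed, which matters because the script tags its snapshots `AutoDelete=false`.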
🔍 Line-by-Line Code Walkthrough
Imports
| Line | Why It’s Used |
|---|---|
| `import boto3` | AWS SDK; provides the RDS client |
| `import time` | Standard library. Could be used for manual sleep loops, but the script relies on Waiters instead, so this import is unused |
| `import sys` | Intended for reading `sys.argv` in a CLI mode. The script as shown hardcodes its arguments and never reads `sys.argv` |
| `from datetime import datetime, timezone` | `datetime.now(timezone.utc)` for UTC-aware timestamps. `timezone.utc` makes the datetime timezone-aware, which is required for `RestoreTime` in point-in-time restore |
RDSSnapshotManager.__init__
self.rds = boto3.client("rds", region_name=region)
| Line | Explanation |
|---|---|
| `boto3.client("rds", region_name=region)` | Creates an RDS API client. RDS is regional, so specify the region where your database lives |
| Why store as `self.rds`? | Shared across all methods so we don't create a new client on every call |
create_pre_deploy_snapshot(db_identifier, deploy_version)
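timestamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
safe_version = deploy_version.replace(".", "-")
snapshot_id = f"pre-deploy-{db_identifier}-{safe_version}-{timestamp}"
| Line | Explanation |
|---|---|
| `datetime.now(timezone.utc)` | Current UTC time, timezone-aware. `strftime` converts it to a string |
| `"%Y%m%d-%H%M%S"` | Format: 20250120-103045. Used in the snapshot ID for uniqueness and easy sorting |
| `deploy_version.replace(".", "-")` | Dots are not allowed in a snapshot ID, so a version such as v2.4.1 becomes v2-4-1 before it is embedded. Passing the dots through makes `create_db_snapshot` reject the identifier |
| `f"pre-deploy-{db_identifier}-..."` | Snapshot IDs must be 1–255 characters, contain only letters, digits, and hyphens, start with a letter, and be unique per account+region. Embedding the DB name and version makes the ID self-documenting |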
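The identifier rules in the table above can be enforced before the API call rather than discovered through a rejected request. The helper below is a sketch under stated assumptions: `build_snapshot_id` and its `now` parameter are hypothetical names, and the rules it encodes (letters, digits, and hyphens only; first character a letter; no trailing or doubled hyphens; at most 255 characters) reflect the documented RDS constraints as I understand them.

```python
import re
from datetime import datetime, timezone

# First character a letter, then up to 254 letters, digits, or hyphens
_VALID_ID = re.compile(r"^[A-Za-z][A-Za-z0-9-]{0,254}$")


def build_snapshot_id(db_identifier: str, deploy_version: str, now=None) -> str:
    """Build a pre-deploy snapshot ID that satisfies the RDS naming rules."""
    now = now or datetime.now(timezone.utc)
    timestamp = now.strftime("%Y%m%d-%H%M%S")
    raw = f"pre-deploy-{db_identifier}-{deploy_version}-{timestamp}"
    # Collapse every run of non-alphanumeric characters (dots, slashes,
    # underscores, repeated hyphens) into a single hyphen
    cleaned = re.sub(r"[^A-Za-z0-9]+", "-", raw).strip("-")
    if not _VALID_ID.match(cleaned):
        raise ValueError(f"Cannot build a valid snapshot ID from: {raw!r}")
    return cleaned
```

Accepting `now` as a parameter keeps the function deterministic in tests; in the pipeline it can be left at its default.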
self.rds.create_db_snapshot(
    DBSnapshotIdentifier=snapshot_id,
    DBInstanceIdentifier=db_identifier,
    Tags=[
        {"Key": "Type", "Value": "pre-deployment"},
        {"Key": "DeployVersion", "Value": deploy_version},
    ],
)
| Line | Explanation |
|---|---|
| `DBSnapshotIdentifier=snapshot_id` | The unique name for this snapshot. Used to reference it later in restore or describe calls |
| `DBInstanceIdentifier=db_identifier` | The source RDS instance to snapshot (e.g., prod-postgres) |
| `Tags=[{"Key": "Type", ...}]` | RDS tags are passed directly in the API call (unlike EC2, which has a separate TagSpecifications key) |
| `"AutoDelete": "false"` | A custom tag set in the full script (omitted from the excerpt above). It has no effect on AWS behavior, but reminds operators that automated cleanup scripts should not delete this snapshot |
waiter = self.rds.get_waiter("db_snapshot_available")
waiter.wait(
    DBSnapshotIdentifier=snapshot_id,
    WaiterConfig={"Delay": 30, "MaxAttempts": 60},
)
| Line | Explanation |
|---|---|
| `get_waiter("db_snapshot_available")` | Returns a pre-built Waiter that polls `describe_db_snapshots()` until `Status == "available"` |
| `WaiterConfig.Delay: 30` | Polls every 30 seconds; boto3's default is also 30 seconds for this waiter |
| `WaiterConfig.MaxAttempts: 60` | Tries 60 times × 30 seconds = 30 minutes maximum wait. A large database (2 TB) can take this long |
| Why use Waiter instead of `time.sleep`? | Waiters know the exact API to poll and the terminal conditions (available, failed). A manual loop would need to replicate this logic |
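The relationship between `Delay`, `MaxAttempts`, and the total wait is simple arithmetic, so the config can be derived from a target timeout instead of hand-computed. A minimal sketch follows; `waiter_config` is a hypothetical helper name, not part of boto3.

```python
import math


def waiter_config(timeout_minutes: float, delay_seconds: int = 30) -> dict:
    """Translate a desired maximum wait into a boto3 WaiterConfig dict."""
    if timeout_minutes <= 0 or delay_seconds <= 0:
        raise ValueError("timeout_minutes and delay_seconds must be positive")
    # Round up so the waiter never gives up before the requested timeout
    max_attempts = math.ceil(timeout_minutes * 60 / delay_seconds)
    return {"Delay": delay_seconds, "MaxAttempts": max_attempts}
```

For example, `waiter.wait(DBSnapshotIdentifier=snapshot_id, WaiterConfig=waiter_config(30))` reproduces the 30-minute limit used in the script. If the limit is reached, `waiter.wait` raises `botocore.exceptions.WaiterError`, which a pipeline can catch to abort the deployment.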
restore_from_snapshot(...)
snapshots = self.rds.describe_db_snapshots(
    DBSnapshotIdentifier=snapshot_id
)["DBSnapshots"]
snapshot = snapshots[0]
| Line | Explanation |
|---|---|
| `describe_db_snapshots(DBSnapshotIdentifier=snapshot_id)` | Retrieves metadata about this specific snapshot, such as its Status, Engine, and source DBInstanceIdentifier. The documented response does not include a DBInstanceClass field |
| `["DBSnapshots"]` | The list of matching snapshots. Even with a specific ID, the API returns a list |
| `snapshots[0]` | Takes the first (and only) result. A lookup by an ID that does not exist raises `DBSnapshotNotFoundFault` rather than returning an empty list, so an `if not snapshots` check alone does not cover the not-found case; the exception has to be caught as well |
self.rds.restore_db_instance_from_db_snapshot(
    DBInstanceIdentifier=new_db_identifier,
    DBSnapshotIdentifier=snapshot_id,
    DBSubnetGroupName=db_subnet_group,
    VpcSecurityGroupIds=vpc_sg_ids,
    MultiAZ=True,
    DeletionProtection=True,
)
| Line | Explanation |
|---|---|
| `DBInstanceIdentifier=new_db_identifier` | The name of the new instance. Restore always creates a NEW instance; it does NOT overwrite the existing database |
| `DBSnapshotIdentifier=snapshot_id` | Which snapshot to restore from |
| `DBInstanceClass` (omitted) | When this parameter is not passed, RDS restores with the same instance class (e.g., db.t3.medium) as the original DB instance, so the restored DB has the same capacity. Reading `snapshot["DBInstanceClass"]` is avoided because the snapshot metadata does not document that field and the lookup risks a KeyError |
| `DBSubnetGroupName=db_subnet_group` | The subnet group determines which VPC subnets the DB can use. Must exist in your VPC |
| `VpcSecurityGroupIds=vpc_sg_ids` | The security groups that control network access. Must allow connections from your application |
| `MultiAZ=True` | Creates a standby replica in another AZ for high availability |
| `DeletionProtection=True` | Prevents accidental deletion via CLI or API. Must be explicitly disabled before deleting |
endpoint = self.rds.describe_db_instances(
    DBInstanceIdentifier=new_db_identifier
)["DBInstances"][0]["Endpoint"]["Address"]
| Line | Explanation |
|---|---|
| `describe_db_instances(DBInstanceIdentifier=...)` | Fetches the instance details after the waiter confirms it's available |
| `["DBInstances"][0]` | Returns a list; we take the first (only) result |
| `["Endpoint"]["Address"]` | The database hostname (e.g., prod-postgres-rollback.abc123.us-east-1.rds.amazonaws.com). This is what goes in your connection string |
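To show how the endpoint feeds a connection string, the sketch below builds a PostgreSQL URI from one entry of the `DBInstances` list. It is illustrative only: `postgres_dsn`, the database name, and the user are hypothetical, and the password is deliberately left out on the assumption that it comes from a secrets store rather than from code.

```python
def postgres_dsn(instance: dict, db_name: str, user: str) -> str:
    """Build a PostgreSQL URI from a describe_db_instances() entry."""
    endpoint = instance["Endpoint"]  # carries both "Address" and "Port"
    host, port = endpoint["Address"], endpoint["Port"]
    return f"postgresql://{user}@{host}:{port}/{db_name}"
```

With the restored instance from the script, `postgres_dsn(response["DBInstances"][0], "appdb", "app_user")` would yield the value to place in the application config when switching over to the rollback database.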
point_in_time_restore(source_db, target_db, restore_time, ...)
self.rds.restore_db_instance_to_point_in_time(
    SourceDBInstanceIdentifier=source_db,
    TargetDBInstanceIdentifier=target_db,
    RestoreTime=restore_time,
    ...
)
| Line | Explanation |
|---|---|
| `SourceDBInstanceIdentifier=source_db` | The existing database to restore FROM (not a snapshot: PITR works from the automated backups kept within the automated backup retention window) |
| `TargetDBInstanceIdentifier=target_db` | Name of the new instance to create |
| `RestoreTime=restore_time` | A timezone-aware UTC datetime specifying the exact second to restore to. RDS uses transaction logs to replay changes up to this second. The datetime must be within the automated backup retention window |
| Why PITR vs snapshot? | PITR lets you restore to ANY second within your retention window. A pre-deploy snapshot only covers the moment you explicitly took it |
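Both requirements on `RestoreTime` (timezone-aware, inside the restorable window) can be checked locally before starting a restore that may run for tens of minutes. The following is a sketch, not the author's code: `validate_restore_time` is a hypothetical helper, and it assumes the caller supplies the window bounds as timezone-aware datetimes, for instance taking the upper bound from the `LatestRestorableTime` field that `describe_db_instances` returns.

```python
from datetime import datetime, timedelta, timezone


def validate_restore_time(
    restore_time: datetime, earliest: datetime, latest: datetime
) -> datetime:
    """Return restore_time normalized to UTC, or raise if it is unusable."""
    # A naive datetime has no offset, so its meaning in UTC would be ambiguous
    if restore_time.tzinfo is None or restore_time.utcoffset() is None:
        raise ValueError("restore_time must be timezone-aware")
    restore_utc = restore_time.astimezone(timezone.utc)
    if not (earliest <= restore_utc <= latest):
        raise ValueError(
            f"{restore_utc.isoformat()} is outside the restorable window "
            f"[{earliest.isoformat()}, {latest.isoformat()}]"
        )
    return restore_utc
```

The returned value can be passed straight to `point_in_time_restore(restore_time=...)`. Normalizing to UTC also means a timestamp written in a local zone, such as a deploy time recorded in IST, is converted rather than rejected.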
list_pre_deploy_snapshots(db_identifier)
for page in paginator.paginate(
    DBInstanceIdentifier=db_identifier,
    SnapshotType="manual",
):
    for snap in page["DBSnapshots"]:
        if snap["DBSnapshotIdentifier"].startswith("pre-deploy-"):
            ...
| Line | Explanation |
|---|---|
| `SnapshotType="manual"` | Filters to manually created snapshots only. Excludes automated backups (which have `SnapshotType="automated"`) |
| `.startswith("pre-deploy-")` | Secondary filter to only return snapshots created by this script (vs other manual snapshots the team might have created) |
| `sorted(..., key=lambda x: x["created"], reverse=True)` | Returns snapshots newest-first so the most recent rollback option is at index 0 |
- RDS snapshot lifecycle
- RDS waiters for async operations
- Point-in-time restore
- Pre-deployment automation pattern