RDS Snapshot Before Deployment & Point-in-Time Restore

Python script to create an RDS snapshot before every deployment and restore the database from that snapshot (or to a specific point in time) if a rollback is required.

January 20, 2025 · 9 min read · ~25 min to complete

The Situation

Database safety net — never deploy without a pre-deployment snapshot so you can roll back the database to match a code rollback within minutes.

7 Steps
3 Services Used
~25 min Duration
Advanced Difficulty

Problem Statement

Your team deployed a schema migration that introduced a breaking change. The code rollback was instant, but the database was already migrated — no snapshot meant a 4-hour manual recovery. A pre-deployment snapshot takes 5-10 minutes and costs only storage. This script automates it as part of your deployment pipeline.


Deployment Integration Pattern

# In your CI/CD pipeline (GitHub Actions, Jenkins, etc.)
python rds_snapshot.py pre-deploy --db prod-postgres --version v2.4.1

# Run your deployment...

# If deployment fails:
# Snapshot IDs allow only letters, digits, and hyphens, so v2.4.1 appears as v2-4-1
python rds_snapshot.py restore --snapshot pre-deploy-prod-postgres-v2-4-1-20250120-103045
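
The commands above assume that rds_snapshot.py accepts pre-deploy and restore subcommands, which the script in the next section does not parse itself. One possible way to wire them up with argparse is sketched here; the --db, --version, and --snapshot flags come from the pattern above, while build_parser and the --target flag are hypothetical names chosen for illustration.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Parser for the two subcommands used in the pipeline pattern."""
    parser = argparse.ArgumentParser(prog="rds_snapshot.py")
    sub = parser.add_subparsers(dest="command", required=True)

    pre = sub.add_parser("pre-deploy", help="snapshot the DB before a deployment")
    pre.add_argument("--db", required=True, help="RDS instance identifier")
    pre.add_argument("--version", required=True, help="deploy version, e.g. v2.4.1")

    res = sub.add_parser("restore", help="restore a new instance from a snapshot")
    res.add_argument("--snapshot", required=True, help="snapshot identifier")
    # Hypothetical flag: name of the restored instance (not part of the original pattern).
    res.add_argument("--target", default="prod-postgres-rollback")
    return parser


# Parse an explicit argument list so the sketch runs without a real command line.
args = build_parser().parse_args(
    ["pre-deploy", "--db", "prod-postgres", "--version", "v2.4.1"]
)
print(args.command, args.db, args.version)
```

In a real entry point, args.command would then decide whether to call create_pre_deploy_snapshot() or restore_from_snapshot().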

Complete Script

import boto3
import time
import sys
from datetime import datetime, timezone


class RDSSnapshotManager:
    def __init__(self, region: str = "us-east-1"):
        """
        boto3.client("rds") is the RDS service client.
        RDS operations are long-running (snapshot: 5-30 min,
        restore: 10-45 min) — we use boto3 Waiters to poll for completion
        instead of writing manual sleep loops.
        """
        self.rds = boto3.client("rds", region_name=region)

    # ── Pre-deployment snapshot ───────────────────────────────────
    def create_pre_deploy_snapshot(
        self, db_identifier: str, deploy_version: str
    ) -> str:
        """
        create_db_snapshot() creates a manual snapshot of the RDS instance.
        Manual snapshots are retained until you explicitly delete them
        (unlike automated backups which expire based on the retention window).

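        DBSnapshotIdentifier must be unique per account and region: 1-255
        letters, digits, or hyphens, starting with a letter, with no trailing
        or consecutive hyphens. Periods are not allowed, so a version such as
        "v2.4.1" is written as "v2-4-1" inside the identifier.
        We embed the instance name + deploy version + timestamp for traceability.

        RDS takes tags as a plain Tags list on the API call, unlike EC2,
        which uses a TagSpecifications structure.
        """
        timestamp    = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
        # Only periods are handled here; other special characters would still be rejected.
        safe_version = deploy_version.replace(".", "-")
        snapshot_id  = f"pre-deploy-{db_identifier}-{safe_version}-{timestamp}"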

        print(f"Creating pre-deployment snapshot: {snapshot_id}")

        self.rds.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_id,
            DBInstanceIdentifier=db_identifier,
            Tags=[
                {"Key": "Type",          "Value": "pre-deployment"},
                {"Key": "DeployVersion", "Value": deploy_version},
                {"Key": "CreatedAt",     "Value": timestamp},
                {"Key": "AutoDelete",    "Value": "false"},   # Keep until manually removed
            ],
        )

        # ── Wait for snapshot to be available ─────────────────────
        # get_waiter("db_snapshot_available") polls describe_db_snapshots()
        # every 30 seconds (default) until Status = "available".
        # Raises WaiterError if it doesn't complete within max_attempts.
        print("Waiting for snapshot to complete (5-20 min for large databases)...")
        waiter = self.rds.get_waiter("db_snapshot_available")
        waiter.wait(
            DBSnapshotIdentifier=snapshot_id,
            WaiterConfig={
                "Delay":       30,    # Poll every 30 seconds
                "MaxAttempts": 60,    # Give up after 30 min (60 × 30s)
            },
        )
        print(f"✅ Snapshot ready: {snapshot_id}")
        return snapshot_id

    # ── Restore from snapshot ─────────────────────────────────────
    def restore_from_snapshot(
        self,
        snapshot_id: str,
        new_db_identifier: str,
        db_subnet_group: str,
        vpc_sg_ids: list[str],
    ) -> str:
        """
        restore_db_instance_from_db_snapshot() creates a NEW RDS instance
        from the snapshot. It does NOT overwrite the existing database.

        This is intentional — you end up with TWO databases:
          prod-postgres         → current (potentially broken) DB
          prod-postgres-rollback → restored from pre-deploy snapshot

        After verifying the restore, you either:
          a) Swap the app config to point to the restored DB, OR
          b) Use pg_dump/restore to merge specific tables back

        DBInstanceClass is not passed: the snapshot metadata returned by
        describe_db_snapshots() has no DBInstanceClass field, and RDS
        defaults to the instance class of the original DB instance.
        """
        print(f"Restoring {new_db_identifier} from snapshot: {snapshot_id}")

        # Confirm the snapshot exists and is usable before starting the restore.
        # describe_db_snapshots() raises DBSnapshotNotFoundFault for an unknown ID.
        snapshots = self.rds.describe_db_snapshots(
            DBSnapshotIdentifier=snapshot_id
        )["DBSnapshots"]
        if not snapshots:
            raise ValueError(f"Snapshot not found: {snapshot_id}")
        snapshot = snapshots[0]
        if snapshot["Status"] != "available":
            raise ValueError(
                f"Snapshot {snapshot_id} is {snapshot['Status']}, not available"
            )

        self.rds.restore_db_instance_from_db_snapshot(
            DBInstanceIdentifier=new_db_identifier,
            DBSnapshotIdentifier=snapshot_id,
            # DBInstanceClass omitted: defaults to the original instance class
            DBSubnetGroupName=db_subnet_group,
            VpcSecurityGroupIds=vpc_sg_ids,
            MultiAZ=True,
            AutoMinorVersionUpgrade=True,
            DeletionProtection=True,    # Prevent accidental deletion
            Tags=[
                {"Key": "RestoredFrom", "Value": snapshot_id},
                {"Key": "RestoredAt",   "Value": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")},
                {"Key": "Temporary",    "Value": "true"},
            ],
        )

        print("Waiting for restored instance to be available (10-45 min)...")
        waiter = self.rds.get_waiter("db_instance_available")
        waiter.wait(
            DBInstanceIdentifier=new_db_identifier,
            WaiterConfig={"Delay": 30, "MaxAttempts": 120},   # Up to 60 min
        )

        endpoint = self.rds.describe_db_instances(
            DBInstanceIdentifier=new_db_identifier
        )["DBInstances"][0]["Endpoint"]["Address"]

        print(f"✅ Restored instance available at: {endpoint}")
        return endpoint

    # ── Point-in-time restore ─────────────────────────────────────
    def point_in_time_restore(
        self,
        source_db: str,
        target_db: str,
        restore_time: datetime,
        db_subnet_group: str,
        vpc_sg_ids: list[str],
    ) -> str:
        """
        restore_db_instance_to_point_in_time() restores to any second
        within the automated backup retention window (configurable, 1-35
        days), up to the instance's LatestRestorableTime.

        restore_time must be a timezone-aware datetime in UTC.
        Useful when you know exactly when the problem was introduced
        (e.g., "restore to 2025-01-20 14:29:59 UTC — 1 minute before deploy").

        Unlike snapshot restore, this uses the continuous transaction logs
        stored during automated backups to achieve precise RPO.
        """
        print(
            f"Restoring {source_db} to point-in-time: "
            f"{restore_time.strftime('%Y-%m-%d %H:%M:%S UTC')}"
        )

        self.rds.restore_db_instance_to_point_in_time(
            SourceDBInstanceIdentifier=source_db,
            TargetDBInstanceIdentifier=target_db,
            RestoreTime=restore_time,
            DBSubnetGroupName=db_subnet_group,
            VpcSecurityGroupIds=vpc_sg_ids,
            MultiAZ=True,
            DeletionProtection=True,
        )

        waiter = self.rds.get_waiter("db_instance_available")
        waiter.wait(
            DBInstanceIdentifier=target_db,
            WaiterConfig={"Delay": 30, "MaxAttempts": 120},
        )
        print(f"✅ Point-in-time restore complete: {target_db}")

        endpoint = self.rds.describe_db_instances(
            DBInstanceIdentifier=target_db
        )["DBInstances"][0]["Endpoint"]["Address"]
        return endpoint

    # ── List pre-deploy snapshots ─────────────────────────────────
    def list_pre_deploy_snapshots(self, db_identifier: str) -> list[dict]:
        """List all pre-deployment snapshots for a given DB, newest first."""
        paginator = self.rds.get_paginator("describe_db_snapshots")
        snapshots = []
        for page in paginator.paginate(
            DBInstanceIdentifier=db_identifier,
            SnapshotType="manual",
        ):
            for snap in page["DBSnapshots"]:
                if snap["DBSnapshotIdentifier"].startswith("pre-deploy-"):
                    snapshots.append({
                        "id":      snap["DBSnapshotIdentifier"],
                        "status":  snap["Status"],
                        "created": snap["SnapshotCreateTime"].strftime("%Y-%m-%d %H:%M UTC"),
                        "size_gb": snap.get("AllocatedStorage", 0),
                    })

        return sorted(snapshots, key=lambda x: x["created"], reverse=True)


# ── CLI entry point ───────────────────────────────────────────────
if __name__ == "__main__":
    manager = RDSSnapshotManager(region="ap-south-1")

    # Before deployment — called from CI/CD
    snapshot_id = manager.create_pre_deploy_snapshot(
        db_identifier="prod-postgres",
        deploy_version="v2.4.1",
    )
    print(f"\nSave this snapshot ID for rollback: {snapshot_id}")

    # List recent snapshots
    print("\nRecent pre-deploy snapshots:")
    for snap in manager.list_pre_deploy_snapshots("prod-postgres")[:5]:
        print(f"  {snap['id']}  [{snap['status']}]  {snap['created']}")

    # Rollback (uncomment if deployment fails):
    # manager.restore_from_snapshot(
    #     snapshot_id="pre-deploy-prod-postgres-v2-4-1-20250120-103045",
    #     new_db_identifier="prod-postgres-rollback",
    #     db_subnet_group="prod-db-subnet-group",
    #     vpc_sg_ids=["sg-0abc1234"],
    # )

    # Point-in-time restore (uncomment if needed; timezone is already imported above):
    # restore_to = datetime(2025, 1, 20, 14, 29, 59, tzinfo=timezone.utc)
    # manager.point_in_time_restore(
    #     source_db="prod-postgres",
    #     target_db="prod-postgres-pitr",
    #     restore_time=restore_to,
    #     db_subnet_group="prod-db-subnet-group",
    #     vpc_sg_ids=["sg-0abc1234"],
    # )

Key Commands Explained

| Command | What it does |
| --- | --- |
| create_db_snapshot(DBSnapshotIdentifier, DBInstanceIdentifier) | Creates a manual RDS snapshot |
| get_waiter("db_snapshot_available") | Polls until snapshot Status = “available” |
| waiter.wait(WaiterConfig={"Delay": 30, "MaxAttempts": 60}) | Override poll interval and max attempts |
| describe_db_snapshots(DBSnapshotIdentifier)["DBSnapshots"][0] | Get metadata of a specific snapshot |
| restore_db_instance_from_db_snapshot(...) | Creates a new DB instance from a snapshot |
| restore_db_instance_to_point_in_time(RestoreTime=...) | Restores to a specific UTC second |
| get_waiter("db_instance_available") | Polls until DB instance Status = “available” |
| describe_db_instances()["DBInstances"][0]["Endpoint"]["Address"] | Get the restored DB hostname |

Common Issues

SnapshotQuotaExceeded: AWS allows up to 100 manual snapshots per account per region by default (the quota can be raised on request). Clean up old pre-deploy snapshots once a deployment has proven stable.

Restore creates new endpoint: The restored instance has a NEW hostname. You must update your app config / connection string to point to it.

Point-in-time restore outside retention window: If your automated backup retention is 7 days, you can only restore to within the last 7 days. Increase the backup retention period (up to 35 days) in RDS settings for a wider recovery window.
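
For the retention-window issue, a pre-flight check can reject an out-of-range restore time before the API call is made. This is a minimal sketch under stated assumptions: check_restore_window is a hypothetical helper, the earliest restorable time is approximated as "now minus the retention period", and the inputs are expected to come from the BackupRetentionPeriod and LatestRestorableTime fields that describe_db_instances() returns for the source instance.

```python
from datetime import datetime, timedelta, timezone


def check_restore_window(
    restore_time: datetime,
    latest_restorable: datetime,
    retention_days: int,
    now: datetime,
) -> None:
    """Raise ValueError if restore_time falls outside the approximate PITR window."""
    # Approximation: the true lower bound depends on when backups were enabled.
    earliest = now - timedelta(days=retention_days)
    if restore_time < earliest:
        raise ValueError(
            f"{restore_time} is older than the {retention_days}-day retention window"
        )
    if restore_time > latest_restorable:
        raise ValueError(
            f"{restore_time} is later than LatestRestorableTime {latest_restorable}"
        )


now = datetime(2025, 1, 20, 15, 0, tzinfo=timezone.utc)
check_restore_window(
    restore_time=datetime(2025, 1, 20, 14, 29, 59, tzinfo=timezone.utc),
    latest_restorable=now - timedelta(minutes=5),  # illustrative value
    retention_days=7,
    now=now,
)
print("restore time is inside the window")
```

If the window is too narrow, the retention period can be raised with modify_db_instance(DBInstanceIdentifier=..., BackupRetentionPeriod=...), which only helps for future incidents because backups that have already expired are gone.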


🔍 Line-by-Line Code Walkthrough

Imports

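| Line | Why It’s Used |
| --- | --- |
| import boto3 | AWS SDK for Python; provides the RDS client |
| import time | Standard library. Not used by the script as shown, because Waiters replace manual sleep loops; it can be removed |
| import sys | Standard library. Would supply sys.argv for a CLI mode, but the script as shown hard-codes its arguments, so it is unused |
| from datetime import datetime, timezone | datetime.now(timezone.utc) yields UTC-aware timestamps. Passing a timezone-aware datetime as RestoreTime in point-in-time restore removes any ambiguity about which instant is meant |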

RDSSnapshotManager.__init__

self.rds = boto3.client("rds", region_name=region)

| Line | Explanation |
| --- | --- |
| boto3.client("rds", region_name=region) | Creates an RDS API client. RDS is regional, so specify the region where your database lives |
| Why store as self.rds? | Shared across all methods so we don’t create a new client on every call |

create_pre_deploy_snapshot(db_identifier, deploy_version)

timestamp   = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
snapshot_id = f"pre-deploy-{db_identifier}-{deploy_version}-{timestamp}"
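
| Line | Explanation |
| --- | --- |
| datetime.now(timezone.utc) | Current UTC time, timezone-aware. strftime converts it to a string |
| "%Y%m%d-%H%M%S" | Format: 20250120-103045. Used in the snapshot ID for uniqueness and easy sorting |
| f"pre-deploy-{db_identifier}-..." | Snapshot IDs must be 1–255 letters, digits, or hyphens, start with a letter, avoid trailing or consecutive hyphens, and be unique per account and region. A version such as v2.4.1 contains periods, so it has to be sanitized (for example to v2-4-1) before it goes into the ID. Embedding the DB name and version makes the ID self-documenting |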
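
The naming rules can be enforced in one place with a small builder function. The following is a sketch rather than the script's own code: make_snapshot_id is a hypothetical helper, and it assumes that collapsing every run of disallowed characters into a single hyphen is an acceptable way to sanitize a version string.

```python
import re
from datetime import datetime, timezone


def make_snapshot_id(db_identifier: str, deploy_version: str, now: datetime) -> str:
    """Build an identifier containing only letters, digits, and single hyphens."""
    timestamp = now.astimezone(timezone.utc).strftime("%Y%m%d-%H%M%S")
    raw = f"pre-deploy-{db_identifier}-{deploy_version}-{timestamp}"
    # Replace each run of disallowed characters (or repeated hyphens) with one hyphen.
    cleaned = re.sub(r"[^A-Za-z0-9]+", "-", raw).strip("-")
    if len(cleaned) > 255:
        raise ValueError(f"Snapshot identifier too long: {len(cleaned)} chars")
    return cleaned


example = make_snapshot_id(
    "prod-postgres", "v2.4.1", datetime(2025, 1, 20, 10, 30, 45, tzinfo=timezone.utc)
)
print(example)  # pre-deploy-prod-postgres-v2-4-1-20250120-103045
```

The fixed "pre-deploy-" prefix also guarantees that the identifier starts with a letter.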
self.rds.create_db_snapshot(
    DBSnapshotIdentifier=snapshot_id,
    DBInstanceIdentifier=db_identifier,
    Tags=[
        {"Key": "Type",          "Value": "pre-deployment"},
        {"Key": "DeployVersion", "Value": deploy_version},
    ],
)

| Line | Explanation |
| --- | --- |
| DBSnapshotIdentifier=snapshot_id | The unique name for this snapshot. Used to reference it later in restore or describe calls |
| DBInstanceIdentifier=db_identifier | The source RDS instance to snapshot (e.g., prod-postgres) |
| Tags=[{"Key": "Type", ...}] | RDS tags are passed as a plain Tags list in the API call (unlike EC2, which uses a separate TagSpecifications structure) |
| "AutoDelete": "false" | A custom tag set in the full script. It has no effect on AWS behavior, but it tells operators and cleanup scripts that this snapshot should not be deleted automatically |
waiter = self.rds.get_waiter("db_snapshot_available")
waiter.wait(
    DBSnapshotIdentifier=snapshot_id,
    WaiterConfig={"Delay": 30, "MaxAttempts": 60},
)

| Line | Explanation |
| --- | --- |
| get_waiter("db_snapshot_available") | Returns a pre-built Waiter that polls describe_db_snapshots() until Status == "available" |
| WaiterConfig.Delay: 30 | Polls every 30 seconds; boto3’s default is also 30 seconds for this waiter |
| WaiterConfig.MaxAttempts: 60 | Tries 60 times × 30 seconds = 30 minutes maximum wait. A large database (2 TB) can take this long |
| Why use Waiter instead of time.sleep? | Waiters know the exact API to poll and the terminal conditions (available, failed). A manual loop would need to replicate this logic |
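
To make that comparison concrete, here is roughly the loop a waiter saves you from writing. It is an illustration only: wait_until_available and fetch_status are hypothetical names, fetch_status stands in for a call to describe_db_snapshots(), and the real waiter handles more failure states than the single "failed" status checked here.

```python
import time
from typing import Callable


def wait_until_available(
    fetch_status: Callable[[], str],
    delay: float = 30.0,
    max_attempts: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll fetch_status() until it returns "available"; return the attempts used."""
    for attempt in range(1, max_attempts + 1):
        status = fetch_status()
        if status == "available":
            return attempt
        if status == "failed":
            raise RuntimeError("Snapshot entered a failed state")
        if attempt < max_attempts:
            sleep(delay)  # wait before the next poll
    raise TimeoutError(f"Not available after {max_attempts} attempts")


# Simulated status sequence; a real caller would query describe_db_snapshots().
statuses = iter(["creating", "creating", "available"])
attempts = wait_until_available(lambda: next(statuses), sleep=lambda _s: None)
print(attempts)  # 3
```

With the boto3 waiter, the equivalent timeout surfaces as botocore.exceptions.WaiterError, which a pipeline can catch to fail the deployment step cleanly.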

restore_from_snapshot(...)

snapshots = self.rds.describe_db_snapshots(
    DBSnapshotIdentifier=snapshot_id
)["DBSnapshots"]
snapshot = snapshots[0]

| Line | Explanation |
| --- | --- |
| describe_db_snapshots(DBSnapshotIdentifier=snapshot_id) | Retrieves metadata about this specific snapshot, such as Status, Engine, and AllocatedStorage. The returned DBSnapshot structure does not include a DBInstanceClass field |
| ["DBSnapshots"] | The list of matching snapshots. Even with a specific ID, the API returns a list |
| snapshots[0] | Takes the first (and only) result. The full script checks `if not snapshots` first; in practice boto3 raises DBSnapshotNotFoundFault for an unknown ID, so that check is a defensive fallback |
self.rds.restore_db_instance_from_db_snapshot(
    DBInstanceIdentifier=new_db_identifier,
    DBSnapshotIdentifier=snapshot_id,
    DBSubnetGroupName=db_subnet_group,
    VpcSecurityGroupIds=vpc_sg_ids,
    MultiAZ=True,
    DeletionProtection=True,
)

| Line | Explanation |
| --- | --- |
| DBInstanceIdentifier=new_db_identifier | The name of the new instance. Restore always creates a NEW instance; it does NOT overwrite the existing database |
| DBSnapshotIdentifier=snapshot_id | Which snapshot to restore from |
| DBInstanceClass (not passed) | The DBSnapshot metadata has no DBInstanceClass field, so reading snapshot["DBInstanceClass"] raises KeyError. When the parameter is omitted, RDS defaults to the instance class of the original DB instance (e.g., db.t3.medium), which keeps the restored DB at the same capacity |
| DBSubnetGroupName=db_subnet_group | The subnet group determines which VPC subnets the DB can use. Must exist in your VPC |
| VpcSecurityGroupIds=vpc_sg_ids | The security groups that control network access. Must allow connections from your application |
| MultiAZ=True | Creates a standby replica in another AZ for high availability |
| DeletionProtection=True | Prevents accidental deletion via CLI or API. Must be explicitly disabled before deleting |
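
One way to keep these arguments manageable is to assemble them in a helper that sends DBInstanceClass only when the caller explicitly wants a different size; when it is left out, RDS falls back to the original instance's class. This is a sketch, and build_restore_params and its instance_class argument are hypothetical names.

```python
from typing import Any, Optional


def build_restore_params(
    snapshot_id: str,
    new_db_identifier: str,
    db_subnet_group: str,
    vpc_sg_ids: list[str],
    instance_class: Optional[str] = None,
) -> dict[str, Any]:
    """Keyword arguments for restore_db_instance_from_db_snapshot()."""
    params: dict[str, Any] = {
        "DBInstanceIdentifier": new_db_identifier,
        "DBSnapshotIdentifier": snapshot_id,
        "DBSubnetGroupName": db_subnet_group,
        "VpcSecurityGroupIds": list(vpc_sg_ids),
        "MultiAZ": True,
        "DeletionProtection": True,
    }
    # Only send DBInstanceClass when the caller wants to override the default.
    if instance_class is not None:
        params["DBInstanceClass"] = instance_class
    return params


params = build_restore_params(
    "pre-deploy-prod-postgres-v2-4-1-20250120-103045",
    "prod-postgres-rollback",
    "prod-db-subnet-group",
    ["sg-0abc1234"],
)
print(sorted(params))
# A real call would then be: rds.restore_db_instance_from_db_snapshot(**params)
```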
endpoint = self.rds.describe_db_instances(
    DBInstanceIdentifier=new_db_identifier
)["DBInstances"][0]["Endpoint"]["Address"]

| Line | Explanation |
| --- | --- |
| describe_db_instances(DBInstanceIdentifier=...) | Fetches the instance details after the waiter confirms it’s available |
| ["DBInstances"][0] | Returns a list; we take the first (only) result |
| ["Endpoint"]["Address"] | The database hostname (e.g., prod-postgres-rollback.abc123.us-east-1.rds.amazonaws.com). This is what goes in your connection string |
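
Since the restored instance gets a new hostname, the application's connection string has to change. The sketch below swaps only the host portion of a PostgreSQL-style URL; swap_db_host is a hypothetical helper, the credentials and database name are placeholders, and IPv6 literal hosts are not handled.

```python
from urllib.parse import urlsplit, urlunsplit


def swap_db_host(connection_url: str, new_host: str) -> str:
    """Return connection_url with its hostname replaced, keeping user, port, and path."""
    parts = urlsplit(connection_url)
    userinfo, at, hostport = parts.netloc.rpartition("@")
    _old_host, colon, port = hostport.partition(":")
    new_netloc = f"{userinfo}{at}{new_host}{colon}{port}"
    return urlunsplit(parts._replace(netloc=new_netloc))


old_url = "postgresql://app:secret@prod-postgres.abc123.us-east-1.rds.amazonaws.com:5432/appdb"
new_url = swap_db_host(
    old_url, "prod-postgres-rollback.abc123.us-east-1.rds.amazonaws.com"
)
print(new_url)
```

How the new URL reaches the application (environment variable, secrets store, config file) depends on your deployment setup and is outside the scope of this script.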

point_in_time_restore(source_db, target_db, restore_time, ...)

self.rds.restore_db_instance_to_point_in_time(
    SourceDBInstanceIdentifier=source_db,
    TargetDBInstanceIdentifier=target_db,
    RestoreTime=restore_time,
    ...
)

| Line | Explanation |
| --- | --- |
| SourceDBInstanceIdentifier=source_db | The existing database to restore FROM (not a snapshot: PITR works from the automated backups kept within the backup retention window) |
| TargetDBInstanceIdentifier=target_db | Name of the new instance to create |
| RestoreTime=restore_time | A timezone-aware UTC datetime specifying the exact second to restore to. RDS uses transaction logs to replay changes up to this second. The datetime must be within the automated backup retention window |
| Why PITR vs snapshot? | PITR lets you restore to ANY second within your retention window. A pre-deploy snapshot only covers the moment you explicitly took it |
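
A common way to pick the restore second is "just before the deployment started". The helper below is a small sketch of that calculation; restore_time_before is a hypothetical name, and it assumes the pipeline records the deployment start as a timezone-aware datetime.

```python
from datetime import datetime, timedelta, timezone


def restore_time_before(deploy_started_at: datetime, margin_seconds: int = 1) -> datetime:
    """Return a UTC timestamp margin_seconds before the deployment began."""
    if deploy_started_at.tzinfo is None:
        raise ValueError("deploy_started_at must be timezone-aware")
    return deploy_started_at.astimezone(timezone.utc) - timedelta(seconds=margin_seconds)


# Deployment started at 20:00:00 in UTC+05:30, which is 14:30:00 UTC.
ist = timezone(timedelta(hours=5, minutes=30))
deploy_start = datetime(2025, 1, 20, 20, 0, 0, tzinfo=ist)
print(restore_time_before(deploy_start).isoformat())  # 2025-01-20T14:29:59+00:00
```

The result can be passed straight to point_in_time_restore() as restore_time.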

list_pre_deploy_snapshots(db_identifier)

for page in paginator.paginate(
    DBInstanceIdentifier=db_identifier,
    SnapshotType="manual",
):
    for snap in page["DBSnapshots"]:
        if snap["DBSnapshotIdentifier"].startswith("pre-deploy-"):

| Line | Explanation |
| --- | --- |
| SnapshotType="manual" | Filters to manually created snapshots only. Excludes automated backups (which have SnapshotType="automated") |
| .startswith("pre-deploy-") | Secondary filter to only return snapshots created by this script (vs other manual snapshots the team might have created) |
| sorted(..., key=lambda x: x["created"], reverse=True) | Returns snapshots newest-first so the most recent rollback option is at index 0 |
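
The same listing can drive cleanup so the manual snapshot quota is not exhausted. This sketch only selects candidates and deletes nothing; snapshots_to_delete is a hypothetical helper, the keep count of 5 is an arbitrary default, and the input is assumed to have the dictionary shape returned by list_pre_deploy_snapshots().

```python
def snapshots_to_delete(snapshots: list[dict], keep: int = 5) -> list[str]:
    """Return IDs of available pre-deploy snapshots beyond the newest `keep`."""
    available = [s for s in snapshots if s["status"] == "available"]
    newest_first = sorted(available, key=lambda s: s["created"], reverse=True)
    return [s["id"] for s in newest_first[keep:]]


sample = [
    {"id": "pre-deploy-db-v1-20250101-000000", "status": "available", "created": "2025-01-01 00:00 UTC"},
    {"id": "pre-deploy-db-v2-20250110-000000", "status": "available", "created": "2025-01-10 00:00 UTC"},
    {"id": "pre-deploy-db-v3-20250120-000000", "status": "creating", "created": "2025-01-20 00:00 UTC"},
]
print(snapshots_to_delete(sample, keep=1))  # ['pre-deploy-db-v1-20250101-000000']
```

Each returned ID could then be passed to delete_db_snapshot(DBSnapshotIdentifier=...), ideally after a human or a tag check (such as the AutoDelete tag) confirms the snapshot is no longer needed.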

Services Used

RDS, boto3, IAM

Prerequisites
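  • Python 3.9+ (the script uses list[str] type hints)
  • boto3
  • IAM: rds:CreateDBSnapshot, rds:DescribeDBSnapshots, rds:DescribeDBInstances, rds:RestoreDBInstanceFromDBSnapshot, rds:RestoreDBInstanceToPointInTime, plus rds:AddTagsToResource because the script tags resources at creation time
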
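
The IAM permissions can be captured as a policy document. The sketch below builds one as a Python dictionary and prints it as JSON; it assumes the script also needs rds:DescribeDBInstances (for the endpoint lookup and instance waiter) and rds:AddTagsToResource (for the Tags it passes), and it uses a wildcard resource only to stay short.

```python
import json

ACTIONS = [
    "rds:CreateDBSnapshot",
    "rds:DescribeDBSnapshots",
    "rds:DescribeDBInstances",
    "rds:RestoreDBInstanceFromDBSnapshot",
    "rds:RestoreDBInstanceToPointInTime",
    "rds:AddTagsToResource",
]

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "RdsSnapshotAndRestore",
            "Effect": "Allow",
            "Action": ACTIONS,
            # Assumption: "*" keeps the sketch short; scope this to specific ARNs in practice.
            "Resource": "*",
        }
    ],
}

print(json.dumps(policy, indent=2))
```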
What You Learned
  • RDS snapshot lifecycle
  • RDS waiters for async operations
  • Point-in-time restore
  • Pre-deployment automation pattern
