The Situation
Security automation — enforcing the AWS CIS Benchmark control that access keys must be rotated every 90 days to reduce credential exposure risk.
Problem Statement
CIS AWS Benchmark 1.14 requires IAM access keys to be rotated every 90 days. Long-lived keys are a top attack vector — if leaked, an attacker has months of access. Manual rotation across 30+ users is slow and inconsistent. This script automates discovery, rotation, and notification.
Safe Rotation Pattern
Step 1 → Create new key (user now has 2 active keys — both work)
Step 2 → Notify user (give them time to update their config)
Step 3 → Deactivate old key (wait 7 days — old key stops working)
Step 4 → Delete old key (run a cleanup pass 7 days later)
Never delete an old key immediately — the user may be using it in a CI/CD pipeline that hasn’t been updated yet.
Complete Script
import boto3
from datetime import datetime, timezone, timedelta
import logging
import json
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
def rotate_old_access_keys(
dry_run: bool = True,
max_age_days: int = 90,
region: str = "us-east-1",
) -> list[dict]:
"""
Scans all IAM users and rotates access keys older than max_age_days.
dry_run=True (default) — prints what would happen but makes no changes.
Always test with dry_run=True before running against production.
"""
iam = boto3.client("iam")
ses = boto3.client("ses", region_name=region)
sm = boto3.client("secretsmanager", region_name=region)
report = []
# ── List all IAM users (paginated) ────────────────────────────
# list_users() is paginated — get_paginator handles NextToken automatically.
# IAM is a global service (no region needed) but the client still works.
users = []
paginator = iam.get_paginator("list_users")
for page in paginator.paginate():
users.extend(page["Users"])
logger.info(f"Found {len(users)} IAM users to check")
threshold = datetime.now(timezone.utc) - timedelta(days=max_age_days)
for user in users:
username = user["UserName"]
# ── List access keys for this user ─────────────────────────
# list_access_keys() returns AccessKeyMetadata — the list of
# key IDs, statuses, and creation dates. It does NOT return
# the secret — secrets are shown only at creation time.
keys = iam.list_access_keys(UserName=username)["AccessKeyMetadata"]
for key in keys:
if key["Status"] != "Active":
continue # Skip already-inactive keys
key_age = datetime.now(timezone.utc) - key["CreateDate"]
if key["CreateDate"] < threshold:
logger.info(
f"Key {key['AccessKeyId']} for {username} is "
f"{key_age.days} days old (> {max_age_days}) → rotating"
)
action_taken = "DRY_RUN"
if not dry_run:
action_taken = _rotate_key(iam, ses, sm, username, key)
report.append({
"username": username,
"old_key_id": key["AccessKeyId"],
"key_age_days": key_age.days,
"action": action_taken,
})
# Summary
logger.info(f"Rotation complete. {len(report)} key(s) processed.")
return report
def _rotate_key(iam, ses, sm, username: str, old_key: dict) -> str:
"""
Performs the actual key rotation:
1. Create new key (AWS allows max 2 active keys per user)
2. Store new secret in Secrets Manager (NOT in email)
3. Deactivate old key (don't delete yet)
4. Notify user via SES
"""
old_key_id = old_key["AccessKeyId"]
try:
# Step 1 — Create new key
# create_access_key() is the ONLY time the SecretAccessKey is visible.
# Store it immediately in Secrets Manager — it cannot be retrieved later.
new_key_response = iam.create_access_key(UserName=username)
new_key = new_key_response["AccessKey"]
new_key_id = new_key["AccessKeyId"]
new_key_secret = new_key["SecretAccessKey"]
logger.info(f" Created new key {new_key_id} for {username}")
# Step 2 — Store in Secrets Manager (never send raw secret in email)
secret_name = f"/iam/keys/{username}"
secret_value = json.dumps({
"AccessKeyId": new_key_id,
"SecretAccessKey": new_key_secret,
"RotatedAt": datetime.utcnow().isoformat() + "Z",
"OldKeyId": old_key_id,
})
try:
# update_secret if it already exists
sm.put_secret_value(SecretId=secret_name, SecretString=secret_value)
except sm.exceptions.ResourceNotFoundException:
# create_secret on first rotation for this user
sm.create_secret(Name=secret_name, SecretString=secret_value)
logger.info(f" New credentials stored at Secrets Manager: {secret_name}")
# Step 3 — Deactivate (not delete) the old key
# update_access_key() with Status="Inactive" disables the key
# without permanently removing it — giving a grace period to update.
iam.update_access_key(
UserName=username,
AccessKeyId=old_key_id,
Status="Inactive",
)
logger.info(f" Deactivated old key {old_key_id} for {username}")
# Step 4 — Notify user via SES
_notify_user(ses, username, old_key_id, new_key_id, secret_name)
return "ROTATED"
except Exception as e:
logger.error(f"Failed to rotate key for {username}: {e}")
return f"FAILED: {e}"
def _notify_user(ses, username: str, old_key_id: str, new_key_id: str, secret_name: str) -> None:
"""
Send the user a notification. We send the SECRET MANAGER PATH,
not the raw secret, so it never appears in email logs or archives.
"""
ses.send_email(
Source="[email protected]",
Destination={"ToAddresses": [f"{username}@company.com"]},
Message={
"Subject": {"Data": "🔑 Your AWS Access Key Has Been Rotated"},
"Body": {
"Text": {
"Data": f"""
Hi {username},
Your AWS access key {old_key_id} was {90}+ days old and has been rotated
as per our security policy (CIS AWS Benchmark 1.14).
Your new Access Key ID: {new_key_id}
Your new Secret: Stored in AWS Secrets Manager at {secret_name}
Steps to update your credentials:
1. aws secretsmanager get-secret-value --secret-id {secret_name}
2. Update ~/.aws/credentials with the new key
3. Update any CI/CD pipelines that use the old key
4. The old key ({old_key_id}) will be DELETED in 7 days
Need help? Contact [email protected]
"""
}
},
},
)
def delete_inactive_keys(max_age_days: int = 7, dry_run: bool = True) -> None:
"""
Second-pass script: run 7 days after rotation to delete the old inactive keys.
delete_access_key() permanently and irreversibly removes the key.
"""
iam = boto3.client("iam")
threshold = datetime.now(timezone.utc) - timedelta(days=max_age_days)
paginator = iam.get_paginator("list_users")
for page in paginator.paginate():
for user in page["Users"]:
username = user["UserName"]
keys = iam.list_access_keys(UserName=username)["AccessKeyMetadata"]
for key in keys:
if key["Status"] == "Inactive" and key["CreateDate"] < threshold:
if dry_run:
print(f"[DRY-RUN] Would delete {key['AccessKeyId']} for {username}")
else:
iam.delete_access_key(
UserName=username, AccessKeyId=key["AccessKeyId"]
)
logger.info(f"Deleted inactive key {key['AccessKeyId']} for {username}")
if __name__ == "__main__":
# Phase 1 — Preview
report = rotate_old_access_keys(dry_run=True, max_age_days=90)
print(f"\nDry-run complete: {len(report)} key(s) would be rotated")
# Phase 2 — Apply (uncomment when ready)
# rotate_old_access_keys(dry_run=False, max_age_days=90)
# Phase 3 — Delete old inactive keys (run 7 days after Phase 2)
# delete_inactive_keys(max_age_days=7, dry_run=False)
Key Commands Explained
| Command | What it does |
|---|
get_paginator("list_users") | Paginates through all IAM users (default page size: 100) |
list_access_keys(UserName=username) | Returns key metadata — NOT the secret |
key["CreateDate"] | Timezone-aware UTC datetime of key creation |
create_access_key(UserName=username) | Creates a new key — secret is visible ONLY here |
new_key["SecretAccessKey"] | The raw secret — store immediately, never log |
sm.put_secret_value(SecretId=..., SecretString=...) | Stores/updates a secret in Secrets Manager |
update_access_key(UserName, AccessKeyId, Status="Inactive") | Disables a key without deleting it |
delete_access_key(UserName, AccessKeyId) | Permanently deletes a key — irreversible |
Common Issues
LimitExceeded on CreateAccessKey — Each IAM user can have at most 2 access keys. If the user already has 2 active keys, you must delete one before creating a new one. Check and handle this edge case.
NoSuchEntity on send_email — The email address must be verified in SES. In sandbox mode, both sender and recipient need verification.
Key used in multiple places — Before rotating, consider querying CloudTrail to see which services and IPs last used the old key. This helps identify all places that need updating.
🔍 Line-by-Line Code Walkthrough
Imports
| Line | Why It’s Used |
|---|
import boto3 | AWS SDK — needed for IAM, SES, and Secrets Manager clients |
from datetime import datetime, timezone, timedelta | Compute key age by subtracting key["CreateDate"] (UTC-aware) from now |
import logging | Structured log output with timestamps |
import json | Serialize the new credentials dict into a JSON string for Secrets Manager |
rotate_old_access_keys(dry_run, max_age_days, region)
iam = boto3.client("iam")
ses = boto3.client("ses", region_name=region)
sm = boto3.client("secretsmanager", region_name=region)
| Line | Explanation |
|---|
boto3.client("iam") | IAM is a global service — no region_name is needed. The same IAM users and roles exist in all regions |
boto3.client("ses", region_name=region) | SES is regional. Email identities must be verified in the same region as this client |
boto3.client("secretsmanager", region_name=region) | Secrets Manager is regional. We store new key credentials here — the script and the IAM user must be in the same region |
paginator = iam.get_paginator("list_users")
for page in paginator.paginate():
users.extend(page["Users"])
| Line | Explanation |
|---|
get_paginator("list_users") | list_users() returns a max of 100 users per call. The paginator handles IsTruncated / Marker tokens automatically |
paginator.paginate() | Iterates all pages. Each page["Users"] is a list of user dicts |
users.extend(page["Users"]) | Appends users from this page to the main list. Unlike append, extend adds each user individually (not as a nested list) |
threshold = datetime.now(timezone.utc) - timedelta(days=max_age_days)
| Line | Explanation |
|---|
datetime.now(timezone.utc) | Current UTC time as a timezone-aware datetime — required to compare with key["CreateDate"] which is also UTC-aware |
timedelta(days=max_age_days) | Subtracted from now to get the cutoff date. A key created before this date is too old |
keys = iam.list_access_keys(UserName=username)["AccessKeyMetadata"]
| Line | Explanation |
|---|
list_access_keys(UserName=username) | Returns metadata about all access keys for this user: ID, status, and creation date. Never returns the secret — AWS only shows the secret once at creation time |
["AccessKeyMetadata"] | The list of key metadata dicts. Each has AccessKeyId, Status ("Active" or "Inactive"), and CreateDate |
if key["Status"] != "Active":
continue
| Line | Explanation |
|---|
key["Status"] != "Active" | We skip Inactive keys — they’re either already rotated or being phased out. We only rotate currently-active keys |
continue | Skips to the next key in the loop without executing the rotation logic |
key_age = datetime.now(timezone.utc) - key["CreateDate"]
if key["CreateDate"] < threshold:
| Line | Explanation |
|---|
key["CreateDate"] | A timezone-aware UTC datetime of when this key was created. boto3 parses the API’s ISO 8601 string automatically |
datetime.now(timezone.utc) - key["CreateDate"] | Produces a timedelta object. .days gives the key age in whole days |
key["CreateDate"] < threshold | “Was this key created before 90 days ago?” If yes, the key is expired |
_rotate_key(iam, ses, sm, username, old_key) — Step by Step
new_key_response = iam.create_access_key(UserName=username)
new_key = new_key_response["AccessKey"]
new_key_id = new_key["AccessKeyId"]
new_key_secret = new_key["SecretAccessKey"]
| Line | Explanation |
|---|
create_access_key(UserName=username) | Creates a new IAM access key pair. Each IAM user can have at most 2 access keys — if the user already has 2 active keys, this raises LimitExceeded |
new_key_response["AccessKey"] | The response contains the full key including the secret |
new_key["AccessKeyId"] | The public key ID (e.g., AKIAIOSFODNN7EXAMPLE). This is what goes into ~/.aws/credentials |
new_key["SecretAccessKey"] | The private secret. This is the ONLY time AWS shows this value. If you don’t save it now, you must create a new key |
secret_value = json.dumps({
"AccessKeyId": new_key_id,
"SecretAccessKey": new_key_secret,
"RotatedAt": datetime.utcnow().isoformat() + "Z",
"OldKeyId": old_key_id,
})
sm.put_secret_value(SecretId=secret_name, SecretString=secret_value)
| Line | Explanation |
|---|
json.dumps({...}) | Secrets Manager stores strings — we serialize the credentials dict to a JSON string |
datetime.utcnow().isoformat() + "Z" | ISO 8601 timestamp with Z suffix indicating UTC. Used for audit purposes |
sm.put_secret_value(SecretId=secret_name, SecretString=secret_value) | Creates or updates the secret. If the secret already exists at this path, it creates a new version |
except ResourceNotFoundException: sm.create_secret(...) | put_secret_value fails if the secret doesn’t exist yet — we catch this and create it on first rotation |
iam.update_access_key(
UserName=username,
AccessKeyId=old_key_id,
Status="Inactive",
)
| Line | Explanation |
|---|
update_access_key(Status="Inactive") | Deactivates the old key — API calls using it will receive InvalidClientTokenId errors. The key record still exists (for 7 days) so the user can retrieve the new key and update their pipelines |
Why not delete_access_key immediately? | CI/CD pipelines may be using the old key. Deactivating gives a grace period. Deleting is irreversible — if something breaks, you can reactivate the old key during the grace period |
delete_inactive_keys(max_age_days, dry_run) — Phase 3
for key in keys:
if key["Status"] == "Inactive" and key["CreateDate"] < threshold:
iam.delete_access_key(UserName=username, AccessKeyId=key["AccessKeyId"])
| Line | Explanation |
|---|
key["Status"] == "Inactive" | Only touches keys that were deactivated (by our Phase 2 script) |
key["CreateDate"] < threshold | With max_age_days=7, this finds keys deactivated more than 7 days ago — old enough that all pipelines should have been updated |
delete_access_key(UserName=username, AccessKeyId=...) | Permanently removes the key. No recovery possible. API calls using this key immediately get InvalidClientTokenId |