Production-Grade Python Scripts for AWS — Best Practices & Patterns
Code examples covering structured logging, credential management, idempotent operations, environment configuration, Secrets Manager integration, and graceful error handling.
Senior engineer code quality — scripts that pass a production code review: structured logging, no hardcoded secrets, dry-run mode, idempotent operations, and meaningful exit codes.
The 8 Production Best Practices
| # | Practice | Why it matters |
|---|---|---|
| 1 | Structured JSON logging | CloudWatch Logs Insights can query JSON fields |
| 2 | botocore Config with timeouts | Hangs are worse than errors |
| 3 | Credentials via IAM role or Secrets Manager | Hardcoded keys are a breach waiting to happen |
| 4 | Idempotent operations | Script should be safe to run twice |
| 5 | Environment-driven configuration | Same code runs in dev/staging/prod |
| 6 | Dry-run mode | Destructive scripts must be testable safely |
| 7 | Graceful main() with typed exit codes | CI/CD pipelines read exit codes |
| 8 | Input validation at boundaries | Catch bad input before making AWS API calls |
Complete Reference Script
"""
production_aws_script.py
Template for production-grade AWS automation scripts.
Demonstrates all 8 best practices with annotated examples.
"""
import boto3
import logging
import os
import sys
import json
import time
from botocore.config import Config
from botocore.exceptions import ClientError, EndpointConnectionError, NoCredentialsError
from typing import Optional
from dataclasses import dataclass, field

# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 1: Structured JSON Logging
# ══════════════════════════════════════════════════════════════════
def setup_logger(name: str) -> logging.Logger:
    """
    Structured JSON logging is essential for CloudWatch Logs Insights
    and log aggregation tools (Datadog, Splunk, ELK).

    CloudWatch Logs Insights query example:
        fields @timestamp, level, message
        | filter level = "ERROR"
        | sort @timestamp desc

    We write to stdout (not stderr) so logs appear in Lambda/ECS
    container logs without mixing with error streams.
    """
    logger = logging.getLogger(name)
    handler = logging.StreamHandler(sys.stdout)

    class JsonFormatter(logging.Formatter):
        # formatTime() uses local time by default; switch to UTC so the
        # trailing "Z" in the timestamp is accurate.
        converter = time.gmtime

        def format(self, record: logging.LogRecord) -> str:
            return json.dumps({
                "time": self.formatTime(record, "%Y-%m-%dT%H:%M:%SZ"),
                "level": record.levelname,
                "logger": record.name,
                "message": record.getMessage(),
                # Include exception info if present
                **({"exception": self.formatException(record.exc_info)}
                   if record.exc_info else {}),
            })

    handler.setFormatter(JsonFormatter())
    # Attach the handler only once, even if setup_logger() is called again
    if not logger.handlers:
        logger.addHandler(handler)
    # Read log level from env: "INFO" in prod, "DEBUG" in dev
    logger.setLevel(os.environ.get("LOG_LEVEL", "INFO").upper())
    return logger

logger = setup_logger("aws-automation")
# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 2: botocore Config with Timeouts and Retries
# ══════════════════════════════════════════════════════════════════
BOTO_CONFIG = Config(
    # Adaptive retry adds client-side rate limiting via a token bucket:
    # - Token bucket starts full; each retry consumes tokens
    # - Tokens regenerate over time
    # - Can cope better than "standard" mode with burst throttling
    # botocore documents adaptive mode as experimental.
    retries={
        "mode": "adaptive",
        # In a Config object, max_attempts counts retries AFTER the
        # initial call, so 10 allows up to 11 attempts in total.
        "max_attempts": 10,
    },
    # connect_timeout: seconds to wait for the TCP connection to the AWS API
    # read_timeout: seconds to wait for the API response body
    # Lower values = faster failure detection for network issues
    connect_timeout=5,
    read_timeout=30,
    # max_pool_connections: max persistent HTTP connections in the pool
    # Increase for high-concurrency scripts that make many parallel calls
    max_pool_connections=10,
)
# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 3: Credential Management
# ══════════════════════════════════════════════════════════════════
def get_client(
    service: str,
    region: Optional[str] = None,
    role_arn: Optional[str] = None,
) -> boto3.client:
    """
    NEVER hardcode credentials. This function provides a safe pattern
    for getting a boto3 client with proper credential handling.

    Credential resolution order (automatic):
      1. role_arn argument → assume cross-account role
      2. AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY env vars
      3. ~/.aws/credentials file
      4. IAM role attached to the EC2/ECS/Lambda (best for production)

    If role_arn is provided, we use STS to assume that role first.
    This is the standard pattern for cross-account access.
    """
    region = region or os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
    if role_arn:
        sts = boto3.client("sts", config=BOTO_CONFIG)
        assumed = sts.assume_role(
            RoleArn=role_arn,
            RoleSessionName=f"automation-{service}",
        )["Credentials"]
        return boto3.client(
            service,
            region_name=region,
            aws_access_key_id=assumed["AccessKeyId"],
            aws_secret_access_key=assumed["SecretAccessKey"],
            aws_session_token=assumed["SessionToken"],
            config=BOTO_CONFIG,
        )
    return boto3.client(service, region_name=region, config=BOTO_CONFIG)
def get_secret(secret_name: str, region: str = "us-east-1") -> dict:
    """
    Retrieve a JSON-encoded secret from AWS Secrets Manager.
    This replaces hardcoded passwords, API keys, and connection strings.
    Secrets Manager can also rotate secrets automatically once rotation
    is configured for the secret.

    Usage in code:
        db_config = get_secret("prod/postgres/app-user")
        conn = psycopg2.connect(
            host=db_config["host"],
            password=db_config["password"],
        )
    """
    sm = get_client("secretsmanager", region=region)
    try:
        response = sm.get_secret_value(SecretId=secret_name)
        # SecretString is the JSON string of your secret
        return json.loads(response["SecretString"])
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        if error_code == "ResourceNotFoundException":
            raise ValueError(f"Secret not found: {secret_name}") from e
        if error_code == "AccessDeniedException":
            raise PermissionError(
                f"IAM role lacks secretsmanager:GetSecretValue on {secret_name}"
            ) from e
        raise
# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 4: Idempotent Operations
# ══════════════════════════════════════════════════════════════════
def create_s3_bucket_idempotent(bucket_name: str, region: str = "us-east-1") -> bool:
    """
    Idempotent: calling this function twice has the same effect as once.
    The script is safe to re-run after a failure without creating duplicates.

    S3 create_bucket raises BucketAlreadyOwnedByYou if the bucket exists
    AND is owned by your account. We treat this as success.
    BucketAlreadyExists means another account owns a bucket with that name,
    so you'll need to choose a different name (S3 bucket names are global).
    """
    s3 = get_client("s3", region=region)
    try:
        # us-east-1 does NOT accept CreateBucketConfiguration
        kwargs = {"Bucket": bucket_name}
        if region != "us-east-1":
            kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
        s3.create_bucket(**kwargs)
        logger.info(f"Created bucket: {bucket_name}")
        return True
    except ClientError as e:
        code = e.response["Error"]["Code"]
        if code in ("BucketAlreadyOwnedByYou",):
            logger.info(f"Bucket already exists (owned by you): {bucket_name}")
            return True  # Idempotent success
        if code == "BucketAlreadyExists":
            raise ValueError(
                f"Bucket name taken by another account: {bucket_name}"
            ) from e
        raise
# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 5: Environment-Driven Configuration
# ══════════════════════════════════════════════════════════════════
@dataclass
class AppConfig:
    """
    All configuration comes from environment variables.
    This allows the same code to run in dev/staging/prod by just
    changing environment variables, with no code changes needed.

    @dataclass generates __init__, __repr__, __eq__ automatically.
    field(default_factory=...) defers each environment lookup until
    AppConfig() is instantiated, rather than running it at import time.
    """
    # Required (raise if missing)
    sns_topic_arn: str = field(default_factory=lambda: os.environ["SNS_TOPIC_ARN"])
    s3_bucket: str = field(default_factory=lambda: os.environ["S3_BUCKET"])
    # Optional with defaults
    environment: str = field(default_factory=lambda: os.environ.get("ENVIRONMENT", "dev"))
    region: str = field(default_factory=lambda: os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))
    dry_run: bool = field(default_factory=lambda: os.environ.get("DRY_RUN", "false").lower() == "true")
    log_level: str = field(default_factory=lambda: os.environ.get("LOG_LEVEL", "INFO"))
    role_arn: Optional[str] = field(default_factory=lambda: os.environ.get("ASSUME_ROLE_ARN"))

    def validate(self) -> None:
        """Validate config on startup rather than failing mid-execution."""
        errors = []
        if not self.sns_topic_arn.startswith("arn:aws:sns:"):
            errors.append(f"Invalid SNS_TOPIC_ARN: {self.sns_topic_arn}")
        if not self.s3_bucket:
            errors.append("S3_BUCKET is empty")
        if self.environment not in ("dev", "staging", "prod"):
            errors.append(f"Unknown ENVIRONMENT: {self.environment}")
        if errors:
            raise EnvironmentError(
                "Configuration errors:\n" + "\n".join(f" - {e}" for e in errors)
            )
# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 6: Dry-Run Mode
# ══════════════════════════════════════════════════════════════════
def delete_old_objects(
    bucket: str,
    prefix: str,
    older_than_days: int,
    dry_run: bool = True,
) -> list[str]:
    """
    dry_run=True (default) → reports what would be deleted without deleting.
    dry_run=False → performs actual deletion.
    Always default to dry_run=True. The caller must explicitly opt in
    to destructive operations.
    """
    from datetime import datetime, timezone, timedelta

    s3 = get_client("s3")
    cutoff = datetime.now(timezone.utc) - timedelta(days=older_than_days)
    would_delete = []
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["LastModified"] < cutoff:
                would_delete.append(obj["Key"])
                if dry_run:
                    logger.info(f"[DRY-RUN] Would delete: s3://{bucket}/{obj['Key']}")
                else:
                    s3.delete_object(Bucket=bucket, Key=obj["Key"])
                    logger.info(f"Deleted: s3://{bucket}/{obj['Key']}")
    action = "Would delete" if dry_run else "Deleted"
    logger.info(f"{action} {len(would_delete)} object(s) from s3://{bucket}/{prefix}")
    return would_delete
# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 7: Graceful main() with Exit Codes
# ══════════════════════════════════════════════════════════════════
def main() -> int:
    """
    main() returns an integer exit code: 0 for success, non-zero for failure.
    sys.exit() translates this to the process exit code.

    CI/CD pipelines (GitHub Actions, Jenkins) read the exit code:
      - 0 = step succeeded → continue pipeline
      - Non-zero = step failed → stop pipeline, send alert

    We catch specific exceptions at the top level and map them to
    meaningful exit codes rather than a generic "something failed".
    """
    try:
        config = AppConfig()
        config.validate()
        logger.info(f"Starting in {config.environment} | dry_run={config.dry_run}")
        # Your script logic here
        create_s3_bucket_idempotent(config.s3_bucket, config.region)
        deleted = delete_old_objects(
            bucket=config.s3_bucket,
            prefix="temp/",
            older_than_days=30,
            dry_run=config.dry_run,
        )
        # Note: `extra` fields reach the output only if the formatter emits them
        logger.info("Script completed successfully", extra={"objects_processed": len(deleted)})
        return 0  # ← success
    except KeyError as e:
        logger.critical(f"Missing required environment variable: {e}")
        return 1  # ← config/environment error
    except PermissionError as e:
        # Must come before EnvironmentError: PermissionError subclasses
        # OSError, and EnvironmentError is an alias of OSError.
        logger.critical(f"IAM permission denied: {e}")
        return 4  # ← authorization error
    except EnvironmentError as e:
        logger.critical(f"Configuration validation failed: {e}")
        return 1
    except NoCredentialsError:
        logger.critical("AWS credentials not found. Run 'aws configure' or attach an IAM role.")
        return 2  # ← credential error
    except EndpointConnectionError as e:
        logger.critical(f"Cannot reach AWS API endpoint: {e}")
        return 3  # ← network error
    except ClientError as e:
        logger.critical(
            f"AWS API error: {e.response['Error']['Code']}: {e.response['Error']['Message']}"
        )
        return 5  # ← AWS API error
    except Exception as e:
        logger.critical(f"Unexpected error: {e}", exc_info=True)
        return 99  # ← unknown error
# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 8: Input Validation at System Boundaries
# ══════════════════════════════════════════════════════════════════
def validate_s3_key(key: str) -> str:
    """
    Validate and sanitize user-provided S3 keys before using them in API calls.
    S3 keys can be up to 1,024 bytes. Rejecting keys that start with '..'
    is a defensive policy of this script against path-traversal-style
    input, not an S3 restriction.
    """
    if not key:
        raise ValueError("S3 key cannot be empty")
    if len(key.encode("utf-8")) > 1024:
        raise ValueError(f"S3 key exceeds 1,024 bytes: {key[:50]}...")
    if key.startswith(".."):
        raise ValueError(f"S3 key cannot start with '..': {key}")
    # Normalize: remove leading slashes
    return key.lstrip("/")

if __name__ == "__main__":
    sys.exit(main())
Key Commands & Patterns Explained
| Pattern | What it does |
|---|---|
| `logging.StreamHandler(sys.stdout)` | Write logs to stdout (captured by Lambda, ECS, Kubernetes) |
| `JsonFormatter` | Converts log records to JSON strings for structured logging |
| `Config(retries={"mode": "adaptive"})` | boto3 adaptive retry with token bucket algorithm |
| `Config(connect_timeout=5, read_timeout=30)` | Prevents scripts from hanging on network issues |
| `sts.assume_role(RoleArn, RoleSessionName)` | Gets temporary creds for cross-account or cross-service access |
| `sm.get_secret_value(SecretId=name)` | Retrieves a secret at runtime, so credentials are never hardcoded |
| catch `BucketAlreadyOwnedByYou` → `return True` | Idempotent create, safe to run multiple times |
| `os.environ["KEY"]` | Raises KeyError if missing, so the script fails fast at startup |
| `os.environ.get("KEY", "default")` | Optional config with sensible default |
| `dry_run=True` as default parameter | Forces caller to opt in to destructive operations |
| `sys.exit(main())` | Translates return value to OS exit code |
Environment Variables Reference
# Required
export SNS_TOPIC_ARN="arn:aws:sns:us-east-1:123456789012:alerts"
export S3_BUCKET="my-app-prod-data"
# Optional with defaults
export ENVIRONMENT="prod"
export AWS_DEFAULT_REGION="ap-south-1"
export DRY_RUN="false"
export LOG_LEVEL="INFO"
export ASSUME_ROLE_ARN="arn:aws:iam::999999999999:role/CrossAccountRole"
🔍 Line-by-Line Code Walkthrough
Best Practice 1 — Structured JSON Logging
class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "time": self.formatTime(record, "%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            **({"exception": self.formatException(record.exc_info)}
               if record.exc_info else {}),
        })
| Line | Explanation |
|---|---|
| `class JsonFormatter(logging.Formatter)` | Subclasses the built-in Python formatter. We override only `format()`; everything else (handler attachment, level filtering) stays the same |
| `def format(self, record: logging.LogRecord)` | Called by the logging system for every log statement. `record` contains all metadata: message, level, name, timestamp, exception info |
| `self.formatTime(record, "%Y-%m-%dT%H:%M:%SZ")` | Formats the log timestamp in ISO 8601 style (e.g., "2025-01-20T14:30:00Z"). `formatTime()` uses local time by default, so the "Z" (UTC) suffix is only accurate if the formatter sets `converter = time.gmtime` or the host clock runs in UTC |
| `record.levelname` | The string level: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL" |
| `record.getMessage()` | Returns the final message with %-style arguments merged in, as in `logger.info("x=%s", x)`. `record.msg` holds only the unformatted template. f-strings need no help here because Python evaluates them before the logging call |
| `**({"exception": ...} if record.exc_info else {})` | Dictionary unpacking with a conditional. If `exc_info` is set (i.e., logged with `logger.error("msg", exc_info=True)`), adds the "exception" key. If not, adds nothing |
| `handler = logging.StreamHandler(sys.stdout)` | Writes to stdout (not stderr). Lambda, ECS, and Kubernetes capture stdout for log aggregation |
| `logger.setLevel(os.environ.get("LOG_LEVEL", "INFO").upper())` | Log level from environment variable. "DEBUG" in dev, "INFO" in prod. `.upper()` handles "info" or "Info" inputs |
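The formatter above emits a fixed set of keys, so anything passed through `extra=` is silently dropped. One possible variant, sketched below, pins timestamps to UTC and merges `extra` fields into the JSON payload. The class name `UtcJsonFormatter` and the `demo()` helper are hypothetical names used only for illustration.

```python
import io
import json
import logging
import time

# Attribute names present on every LogRecord; anything else came from `extra=`.
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {
    "message",
    "asctime",
}


class UtcJsonFormatter(logging.Formatter):
    # formatTime() calls self.converter; gmtime makes the "Z" suffix accurate.
    converter = time.gmtime

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": self.formatTime(record, "%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Copy caller-supplied `extra` fields into the payload.
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # default=str keeps non-JSON types (datetime, Path) from raising.
        return json.dumps(payload, default=str)


def demo() -> dict:
    """Log one line into an in-memory stream and return the parsed JSON."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(UtcJsonFormatter())
    log = logging.getLogger("json-demo")
    log.handlers = [handler]
    log.propagate = False
    log.setLevel(logging.INFO)
    log.info("deleted %d objects", 3, extra={"objects_processed": 3})
    return json.loads(stream.getvalue())
```

With this variant, a field such as `objects_processed` becomes a top-level JSON key that log queries can filter on.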
Best Practice 2 — botocore Config
BOTO_CONFIG = Config(
    retries={"mode": "adaptive", "max_attempts": 10},
    connect_timeout=5,
    read_timeout=30,
    max_pool_connections=10,
)
| Field | Explanation |
|---|---|
| `retries={"mode": "adaptive"}` | Adds client-side rate limiting based on a token bucket on top of the standard retry behavior: when throttled, the client slows its request rate and waits for tokens to regenerate. It adapts to the observed throttle rate instead of relying on backoff alone. botocore documents this mode as experimental |
| `"max_attempts": 10` | In a `Config` object this is the maximum number of retries after the initial call, so 10 allows up to 11 attempts in total. Use `total_max_attempts` to cap the total including the first call |
| `connect_timeout=5` | Raises `ConnectTimeoutError` if the connection to the AWS API is not established within 5 seconds. DNS failures and refused connections surface as `EndpointConnectionError` |
| `read_timeout=30` | Raises `ReadTimeoutError` if the API response doesn’t arrive within 30 seconds. Prevents infinite hangs |
| `max_pool_connections=10` | Maximum concurrent HTTP connections in the connection pool. If your script makes 20 parallel calls, increase this to avoid connection waiting |
| Where to apply it | Pass `config=BOTO_CONFIG` to every `boto3.client()` call: `boto3.client("ec2", config=BOTO_CONFIG)` |
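For readers unfamiliar with the term, the following is a minimal sketch of a generic token bucket. It only illustrates the idea the table refers to and is not botocore's implementation; the `TokenBucket` class and its parameters are hypothetical, and time is passed in explicitly so the behavior is easy to follow.

```python
class TokenBucket:
    """Generic token bucket: each call spends a token, tokens refill over time."""

    def __init__(self, capacity: float, refill_per_second: float, now: float = 0.0):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self.tokens = capacity  # the bucket starts full
        self.updated_at = now

    def _refill(self, now: float) -> None:
        # Add tokens for the elapsed time, never exceeding capacity.
        elapsed = max(0.0, now - self.updated_at)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        self.updated_at = now

    def try_acquire(self, now: float, cost: float = 1.0) -> bool:
        """Return True and spend `cost` tokens if enough are available."""
        self._refill(now)
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


bucket = TokenBucket(capacity=2, refill_per_second=1.0)
first = bucket.try_acquire(now=0.0)      # bucket full: allowed
second = bucket.try_acquire(now=0.0)     # one token left: allowed
third = bucket.try_acquire(now=0.0)      # empty: caller should wait
after_wait = bucket.try_acquire(now=1.0) # one token regenerated: allowed
```

A burst drains the bucket quickly, after which calls proceed only as fast as tokens regenerate, which is the self-throttling effect adaptive mode aims for.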
Best Practice 3 — Credential Management
region = region or os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
if role_arn:
    sts = boto3.client("sts", config=BOTO_CONFIG)
    assumed = sts.assume_role(
        RoleArn=role_arn,
        RoleSessionName=f"automation-{service}",
    )["Credentials"]
    return boto3.client(
        service,
        region_name=region,
        aws_access_key_id=assumed["AccessKeyId"],
        aws_secret_access_key=assumed["SecretAccessKey"],
        aws_session_token=assumed["SessionToken"],
        config=BOTO_CONFIG,
    )
| Line | Explanation |
|---|---|
| `region or os.environ.get(...)` | If the `region` argument is None (not provided), fall back to the env var, then to "us-east-1". The `or` short-circuits: a truthy `region` skips the env var lookup |
| `sts.assume_role(RoleArn=..., RoleSessionName=...)` | Exchanges the current identity for temporary credentials in another role/account. Returns `{"Credentials": {"AccessKeyId": ..., "SecretAccessKey": ..., "SessionToken": ..., "Expiration": ...}}` |
| `["Credentials"]` | Immediately indexes into the response to get the credential dict |
| `aws_session_token=assumed["SessionToken"]` | Assumed-role credentials ALWAYS require a session token. Omitting it makes calls fail with an authentication error such as InvalidClientTokenId |
| No `role_arn` path | `return boto3.client(service, ...)` lets boto3 use its automatic credential chain: env vars → ~/.aws/credentials → container or instance role credentials |
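The temporary credentials returned by `assume_role` expire (one hour by default), and a client built from them as above does not refresh itself. A long-running script may therefore want to check the `Expiration` timestamp and rebuild the client before it lapses. The helper below is a hedged sketch of that check; `needs_refresh` and the five-minute margin are assumptions for illustration, not part of the original script.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def needs_refresh(
    expiration: datetime,
    now: Optional[datetime] = None,
    margin: timedelta = timedelta(minutes=5),
) -> bool:
    """Return True when credentials expire within `margin` (or already have).

    `expiration` is expected to be timezone-aware, like the "Expiration"
    value in the assume_role response.
    """
    current = now if now is not None else datetime.now(timezone.utc)
    return expiration - current <= margin


issued = datetime(2025, 1, 20, 14, 0, tzinfo=timezone.utc)
expires = issued + timedelta(hours=1)

fresh = needs_refresh(expires, now=issued + timedelta(minutes=10))
stale = needs_refresh(expires, now=issued + timedelta(minutes=57))
```

A caller could keep `assumed["Expiration"]` next to the client and call `get_client(...)` again once `needs_refresh` returns True.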
get_secret(secret_name) — Secrets Manager Retrieval
response = sm.get_secret_value(SecretId=secret_name)
return json.loads(response["SecretString"])
| Line | Explanation |
|---|---|
| `get_secret_value(SecretId=name)` | `SecretId` can be the secret’s full ARN or its name. Returns a dict with "SecretString" (for text secrets) or "SecretBinary" (for binary) |
| `response["SecretString"]` | The raw string value of the secret, typically a JSON string like `'{"host":"db.example.com","password":"abc123"}'` |
| `json.loads(response["SecretString"])` | Parses the JSON string into a Python dict so callers can do `secret["host"]`, `secret["password"]` |
| `if error_code == "ResourceNotFoundException"` | Raises a descriptive ValueError instead of a raw boto3 error, which is easier to understand in logs |
| `raise ... from e` | Chains the original exception. The original ClientError is preserved in `__cause__` for debugging |
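`get_secret` assumes the response carries a JSON object in "SecretString". A secret stored as binary, or as a plain string, would raise a `KeyError` or `JSONDecodeError` with little context. The sketch below shows one way to make that parsing step explicit. `parse_secret_response` is a hypothetical helper, and treating a binary secret as UTF-8 encoded JSON is an assumption that will not hold for every secret.

```python
import json
from typing import Any, Mapping


def parse_secret_response(response: Mapping[str, Any]) -> dict:
    """Extract a JSON object from a get_secret_value-style response dict."""
    if "SecretString" in response:
        raw = response["SecretString"]
    elif "SecretBinary" in response:
        # Assumption: the binary payload is UTF-8 encoded JSON text.
        raw = response["SecretBinary"].decode("utf-8")
    else:
        raise ValueError("Response has neither SecretString nor SecretBinary")

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        # Do not echo the raw value: it is a secret.
        raise ValueError("Secret value is not valid JSON") from exc

    if not isinstance(parsed, dict):
        raise ValueError("Secret JSON must be an object of key/value pairs")
    return parsed


text_secret = parse_secret_response(
    {"SecretString": '{"host": "db.example.com", "port": 5432}'}
)
binary_secret = parse_secret_response({"SecretBinary": b'{"token": "abc"}'})
```

Keeping the parsing separate from the API call also makes it testable without AWS access.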
Best Practice 4 — create_s3_bucket_idempotent
kwargs = {"Bucket": bucket_name}
if region != "us-east-1":
    kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
s3.create_bucket(**kwargs)
| Line | Explanation |
|---|---|
| `if region != "us-east-1"` | S3 quirk: us-east-1 is the default region and does not accept `CreateBucketConfiguration`. Every other region requires it. Passing it for us-east-1 raises InvalidLocationConstraint |
| `{"LocationConstraint": region}` | Tells S3 to create the bucket in the specified region. Without it, a request sent to any regional endpoint other than us-east-1 is rejected with IllegalLocationConstraintException |
| `if code in ("BucketAlreadyOwnedByYou",):` | The tuple makes it easy to add more "already exists" codes later. BucketAlreadyOwnedByYou means your account already has this bucket, so treat it as success |
| `return True` both times | Whether we created it or it already existed, the desired state is achieved. That’s idempotency |
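The same "treat already-exists as success" pattern applies to other create calls. As a rough generalization, the sketch below wraps any create callable and a set of error codes that mean the resource is already there. `ensure_created` and `FakeClientError` are hypothetical: the fake exception only mimics the `response["Error"]["Code"]` shape of botocore's `ClientError` so the example runs without AWS, and in real code you would pass `ClientError` as `error_type`.

```python
from typing import Callable, Iterable, Type


class FakeClientError(Exception):
    """Stand-in with the same `response` shape as botocore's ClientError."""

    def __init__(self, code: str):
        super().__init__(code)
        self.response = {"Error": {"Code": code, "Message": code}}


def ensure_created(
    create: Callable[[], object],
    already_exists_codes: Iterable[str],
    error_type: Type[Exception] = FakeClientError,
) -> bool:
    """Run `create`; return True if it created the resource, False if it existed.

    Any other error is re-raised unchanged.
    """
    codes = set(already_exists_codes)
    try:
        create()
        return True
    except error_type as exc:
        if exc.response["Error"]["Code"] in codes:
            return False
        raise


# In-memory stand-in for a service that rejects duplicate names.
existing = set()


def create_bucket(name: str) -> None:
    if name in existing:
        raise FakeClientError("BucketAlreadyOwnedByYou")
    existing.add(name)


first_run = ensure_created(lambda: create_bucket("logs"), ["BucketAlreadyOwnedByYou"])
second_run = ensure_created(lambda: create_bucket("logs"), ["BucketAlreadyOwnedByYou"])
```

Returning whether the resource was newly created, rather than always `True`, is a design choice of this sketch; it lets callers log the difference while still treating both outcomes as success.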
Best Practice 5 — AppConfig Dataclass
@dataclass
class AppConfig:
    sns_topic_arn: str = field(default_factory=lambda: os.environ["SNS_TOPIC_ARN"])
    dry_run: bool = field(default_factory=lambda: os.environ.get("DRY_RUN", "false").lower() == "true")
| Line | Explanation |
|---|---|
| `@dataclass` | Auto-generates `__init__`, `__repr__`, `__eq__`. The `__init__` calls each `default_factory` when the object is created |
| `field(default_factory=lambda: ...)` | `default_factory` is a callable. The lambda is called at instantiation time, not at class definition time. This means the env var is read when `AppConfig()` is called, not at import |
| `os.environ["SNS_TOPIC_ARN"]` | Raises KeyError if the variable is missing, which propagates to `main()` and gets caught as `except KeyError` → `return 1`. This is “fail fast”: don’t let the script run with incomplete config |
| `os.environ.get("DRY_RUN", "false").lower() == "true"` | Reads the string "true", "True", or "TRUE" and converts to Python bool. `.lower()` makes it case-insensitive. Any other value, including "1" or "yes", evaluates to False |
| `config.validate()` | Called after construction. Validates all fields together, so you get one error message listing all problems instead of crashing on the first |
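One consequence of the `== "true"` comparison is that `DRY_RUN=1` or `DRY_RUN=yes` is read as False, which would switch a destructive script into live mode. A stricter parser that rejects unrecognized values is sketched below. `parse_bool_env` and the accepted spellings are assumptions for illustration, not part of the original script.

```python
import os
from typing import Mapping, Optional

_TRUE_VALUES = {"1", "true", "yes", "on"}
_FALSE_VALUES = {"0", "false", "no", "off"}


def parse_bool_env(
    name: str,
    default: bool,
    environ: Optional[Mapping[str, str]] = None,
) -> bool:
    """Read a boolean flag from the environment, failing on unknown values."""
    env = os.environ if environ is None else environ
    raw = env.get(name)
    if raw is None or raw.strip() == "":
        return default
    value = raw.strip().lower()
    if value in _TRUE_VALUES:
        return True
    if value in _FALSE_VALUES:
        return False
    # Fail fast instead of silently choosing a mode for a destructive script.
    raise ValueError(f"{name} must be a boolean flag, got {raw!r}")


dry_run_numeric = parse_bool_env("DRY_RUN", default=True, environ={"DRY_RUN": "1"})
dry_run_unset = parse_bool_env("DRY_RUN", default=True, environ={})
live_mode = parse_bool_env("DRY_RUN", default=True, environ={"DRY_RUN": "False"})
```

Defaulting to `True` when the variable is unset keeps the opt-in principle from Best Practice 6: the operator has to say "false" explicitly before anything is deleted.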
Best Practice 6 — Dry-Run Mode
def delete_old_objects(bucket, prefix, older_than_days, dry_run: bool = True) -> list[str]:
    ...
    if dry_run:
        logger.info(f"[DRY-RUN] Would delete: s3://{bucket}/{obj['Key']}")
    else:
        s3.delete_object(Bucket=bucket, Key=obj["Key"])
| Line | Explanation |
|---|---|
| `dry_run: bool = True` | Default is True, so you must explicitly pass `dry_run=False` to delete. This prevents accidental deletions from missing a flag |
| `[DRY-RUN]` prefix in log | Makes dry-run output visually distinct in logs. Operators can grep for `[DRY-RUN]` to review what WOULD happen |
| Function returns the same list in both modes | Callers can inspect what was (or would be) deleted without branching on `dry_run` themselves |
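The reference script issues one `delete_object` call per key. For large prefixes, S3's `delete_objects` API accepts up to 1,000 keys per request, so collecting keys first and deleting in batches can cut the number of calls considerably. The generator below is a sketch of the batching step only; `batched_delete_requests` is a hypothetical helper that builds the `Delete` payloads without calling AWS.

```python
from typing import Dict, Iterable, Iterator, List

MAX_KEYS_PER_REQUEST = 1000  # DeleteObjects accepts at most 1,000 keys per call


def batched_delete_requests(
    keys: Iterable[str],
    batch_size: int = MAX_KEYS_PER_REQUEST,
) -> Iterator[dict]:
    """Yield `Delete` payloads for s3.delete_objects, `batch_size` keys at a time."""
    if not 1 <= batch_size <= MAX_KEYS_PER_REQUEST:
        raise ValueError("batch_size must be between 1 and 1000")
    batch: List[Dict[str, str]] = []
    for key in keys:
        batch.append({"Key": key})
        if len(batch) == batch_size:
            # Quiet=True asks S3 to report only the keys that failed.
            yield {"Objects": batch, "Quiet": True}
            batch = []
    if batch:
        yield {"Objects": batch, "Quiet": True}


payloads = list(batched_delete_requests(f"temp/file-{i}" for i in range(2500)))
batch_sizes = [len(p["Objects"]) for p in payloads]
```

In the non-dry-run branch, each payload would be passed as `s3.delete_objects(Bucket=bucket, Delete=payload)`, and the `Errors` list in each response should be checked, since a batch call can succeed overall while individual keys fail.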
Best Practice 7 — main() with Exit Codes
def main() -> int:
    try:
        ...
        return 0
    except KeyError as e:
        logger.critical(f"Missing required environment variable: {e}")
        return 1
    except PermissionError:  # before EnvironmentError (its parent class)
        return 4
    except EnvironmentError:
        return 1
    except NoCredentialsError:
        return 2
    except EndpointConnectionError:
        return 3
    except ClientError:
        return 5
    except Exception:
        return 99

if __name__ == "__main__":
    sys.exit(main())
| Line | Explanation |
|---|---|
| `def main() -> int` | Returns an integer. This separates the script logic from the OS interface and makes `main()` testable (just check the return value) |
| `return 0` | Unix convention for success. CI/CD systems (GitHub Actions, Jenkins) only continue the pipeline if the step exits 0 |
| `return 1` (KeyError/EnvironmentError) | Config/environment problem: the operator needs to set env vars |
| `return 2` (NoCredentialsError) | Credentials missing: run `aws configure` or attach an IAM role |
| `return 3` (EndpointConnectionError) | Network issue: can’t reach the AWS API |
| `return 4` (PermissionError) | IAM policy missing a permission. This handler must appear before `except EnvironmentError`, because PermissionError is a subclass of OSError and EnvironmentError is an alias of OSError; otherwise permission failures exit with 1 |
| `return 5` (ClientError) | AWS API rejected the call |
| `return 99` | Unexpected/unknown error. Always log with `exc_info=True` for a full traceback |
| `sys.exit(main())` | `sys.exit(0)` sets process exit code 0. `sys.exit(5)` sets exit code 5. Scripts (and CI) read this |
| `if __name__ == "__main__":` | Prevents `main()` from running when this module is imported. Only runs when executed directly (`python production_aws_script.py`) |
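Because handler order matters when exception classes are related, it can help to keep the exception-to-code mapping in one small, testable function. The sketch below uses only built-in exceptions so it runs anywhere; the `ExitCode` enum and `exit_code_for` are hypothetical names, and a real script would add the botocore exception types to the mapping.

```python
from enum import IntEnum


class ExitCode(IntEnum):
    OK = 0
    CONFIG = 1
    PERMISSION = 4
    UNKNOWN = 99


def exit_code_for(exc: BaseException) -> ExitCode:
    """Map an exception to an exit code, checking subclasses before parents."""
    # PermissionError subclasses OSError, and EnvironmentError is an alias
    # of OSError, so the more specific check has to come first.
    if isinstance(exc, PermissionError):
        return ExitCode.PERMISSION
    if isinstance(exc, (KeyError, EnvironmentError)):
        return ExitCode.CONFIG
    return ExitCode.UNKNOWN


permission_code = exit_code_for(PermissionError("missing s3:DeleteObject"))
config_code = exit_code_for(EnvironmentError("Unknown ENVIRONMENT: qa"))
missing_var_code = exit_code_for(KeyError("SNS_TOPIC_ARN"))
unknown_code = exit_code_for(RuntimeError("boom"))
```

Since `IntEnum` members are integers, `sys.exit(exit_code_for(exc))` sets the process exit code directly, and unit tests can assert on named members instead of bare numbers.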
- Structured JSON logging
- botocore Config for timeouts and retries
- Secrets Manager secret retrieval
- Idempotent resource creation
- Environment-based configuration
- Meaningful exit codes