
Production-Grade Python Scripts for AWS — Best Practices & Patterns

Code examples covering structured logging, credential management, idempotent operations, environment configuration, Secrets Manager integration, and graceful error handling.

January 20, 2025 13 min read ~25 min to complete DB
The Situation

The goal is senior-engineer code quality: scripts that pass a production code review, with structured logging, no hardcoded secrets, dry-run mode, idempotent operations, and meaningful exit codes.

8 Steps
4 Services Used
~25 min Duration
Advanced Difficulty

The 8 Production Best Practices

# | Practice | Why it matters
1 | Structured JSON logging | CloudWatch Logs Insights can query JSON fields
2 | botocore Config with timeouts | Hangs are worse than errors
3 | Credentials via IAM role or Secrets Manager | Hardcoded keys are a breach waiting to happen
4 | Idempotent operations | Script should be safe to run twice
5 | Environment-driven configuration | Same code runs in dev/staging/prod
6 | Dry-run mode | Destructive scripts must be testable safely
7 | Graceful main() with typed exit codes | CI/CD pipelines read exit codes
8 | Input validation at boundaries | Catch bad input before making AWS API calls

Complete Reference Script

"""
production_aws_script.py

Template for production-grade AWS automation scripts.
Demonstrates all 8 best practices with annotated examples.
"""

import boto3
import logging
import os
import sys
import json
from botocore.config import Config
from botocore.exceptions import ClientError, EndpointConnectionError, NoCredentialsError
from typing import Optional
from dataclasses import dataclass, field


# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 1: Structured JSON Logging
# ══════════════════════════════════════════════════════════════════
def setup_logger(name: str) -> logging.Logger:
    """
    Structured JSON logging is essential for CloudWatch Logs Insights
    and log aggregation tools (Datadog, Splunk, ELK).

    CloudWatch Logs Insights query example:
      fields @timestamp, level, message
      | filter level = "ERROR"
      | sort @timestamp desc

    We write to stdout (not stderr) so logs appear in Lambda/ECS
    container logs without mixing with error streams.
    """
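    import time  # local import so the formatter below is self-contained

    logger = logging.getLogger(name)
    handler = logging.StreamHandler(sys.stdout)

    class JsonFormatter(logging.Formatter):
        # logging formats timestamps in local time by default; switch to
        # UTC so the trailing "Z" in the format string is accurate.
        converter = time.gmtime

        def format(self, record: logging.LogRecord) -> str:
            return json.dumps({
                "time":    self.formatTime(record, "%Y-%m-%dT%H:%M:%SZ"),
                "level":   record.levelname,
                "logger":  record.name,
                "message": record.getMessage(),
                # Include exception info if present
                **({"exception": self.formatException(record.exc_info)}
                   if record.exc_info else {}),
            })

    handler.setFormatter(JsonFormatter())
    # Avoid duplicate log lines if setup_logger() is called more than once
    if not logger.handlers:
        logger.addHandler(handler)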
    # Read log level from env — "INFO" in prod, "DEBUG" in dev
    logger.setLevel(os.environ.get("LOG_LEVEL", "INFO").upper())
    return logger

logger = setup_logger("aws-automation")


# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 2: botocore Config with Timeouts and Retries
# ══════════════════════════════════════════════════════════════════
BOTO_CONFIG = Config(
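    # Adaptive mode includes everything in "standard" mode (exponential
    # backoff plus a retry quota) and adds client-side rate limiting:
    # a token bucket slows outgoing requests after throttling errors.
    # boto3 documents adaptive mode as experimental.
    retries={
        "mode":         "adaptive",
        # In a Config object, max_attempts counts retries AFTER the first
        # call; use total_max_attempts to include the initial request.
        "max_attempts": 10,
    },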
    # connect_timeout: seconds to wait for TCP connection to AWS API
    # read_timeout: seconds to wait for the API response body
    # Lower values = faster failure detection for network issues
    connect_timeout=5,
    read_timeout=30,
    # max_pool_connections: max persistent HTTP connections in the pool
    # Increase for high-concurrency scripts that make many parallel calls
    max_pool_connections=10,
)


# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 3: Credential Management
# ══════════════════════════════════════════════════════════════════
def get_client(
    service: str,
    region: Optional[str] = None,
    role_arn: Optional[str] = None,
) -> boto3.client:
    """
    NEVER hardcode credentials. This function provides a safe pattern
    for getting a boto3 client with proper credential handling.

    If role_arn is provided, we use STS to assume that role first and
    build the client from the temporary credentials. This is the
    standard pattern for cross-account access. The temporary credentials
    expire (after one hour by default) and this client does not refresh
    them, so long-running scripts must assume the role again.

    Without role_arn, boto3 resolves credentials automatically,
    roughly in this order:
    1. AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY env vars
    2. ~/.aws/credentials and ~/.aws/config files
    3. IAM role attached to the EC2/ECS/Lambda (best for production)
    """
    region = region or os.environ.get("AWS_DEFAULT_REGION", "us-east-1")

    if role_arn:
        sts = boto3.client("sts", config=BOTO_CONFIG)
        assumed = sts.assume_role(
            RoleArn=role_arn,
            RoleSessionName=f"automation-{service}",
        )["Credentials"]
        return boto3.client(
            service,
            region_name=region,
            aws_access_key_id=assumed["AccessKeyId"],
            aws_secret_access_key=assumed["SecretAccessKey"],
            aws_session_token=assumed["SessionToken"],
            config=BOTO_CONFIG,
        )

    return boto3.client(service, region_name=region, config=BOTO_CONFIG)


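def get_secret(secret_name: str, region: Optional[str] = None) -> dict:
    """
    Retrieve a JSON-encoded secret from AWS Secrets Manager.

    region=None lets get_client() fall back to AWS_DEFAULT_REGION, so
    secrets are read from the same region as the rest of the script.

    This replaces hardcoded passwords, API keys, and connection strings.
    Secrets Manager can also rotate secrets automatically once rotation
    is configured for the secret.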

    Usage in code:
        db_config = get_secret("prod/postgres/app-user")
        conn = psycopg2.connect(
            host=db_config["host"],
            password=db_config["password"],
        )
    """
    sm = get_client("secretsmanager", region=region)
    try:
        response = sm.get_secret_value(SecretId=secret_name)
        # SecretString is the JSON string of your secret
        return json.loads(response["SecretString"])
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        if error_code == "ResourceNotFoundException":
            raise ValueError(f"Secret not found: {secret_name}") from e
        if error_code == "AccessDeniedException":
            raise PermissionError(
                f"IAM role lacks secretsmanager:GetSecretValue on {secret_name}"
            ) from e
        raise


# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 4: Idempotent Operations
# ══════════════════════════════════════════════════════════════════
def create_s3_bucket_idempotent(bucket_name: str, region: str = "us-east-1") -> bool:
    """
    Idempotent: calling this function twice has the same effect as once.
    The script is safe to re-run after a failure without creating duplicates.

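    S3 create_bucket raises BucketAlreadyOwnedByYou if the bucket exists
    AND is owned by your account; we treat this as success. In us-east-1
    S3 instead returns 200 OK for a bucket you already own (legacy
    behavior that also resets the bucket ACLs), so the call just succeeds.

    BucketAlreadyExists means another account owns a bucket with that name,
    so you'll need to choose a different name (S3 bucket names are global).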
    """
    s3 = get_client("s3", region=region)
    try:
        # us-east-1 does NOT accept CreateBucketConfiguration
        kwargs = {"Bucket": bucket_name}
        if region != "us-east-1":
            kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}

        s3.create_bucket(**kwargs)
        logger.info(f"Created bucket: {bucket_name}")
        return True

    except ClientError as e:
        code = e.response["Error"]["Code"]
        if code in ("BucketAlreadyOwnedByYou",):
            logger.info(f"Bucket already exists (owned by you): {bucket_name}")
            return True   # Idempotent success
        if code == "BucketAlreadyExists":
            raise ValueError(
                f"Bucket name taken by another account: {bucket_name}"
            ) from e
        raise


# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 5: Environment-Driven Configuration
# ══════════════════════════════════════════════════════════════════
@dataclass
class AppConfig:
    """
    All configuration comes from environment variables.
    This allows the same code to run in dev/staging/prod by just
    changing environment variables — no code changes needed.

    @dataclass generates __init__, __repr__, __eq__ automatically.
    field(default_factory=...) defers each os.environ lookup until
    AppConfig() is instantiated instead of reading it at import time.
    """
    # Required (raise if missing)
    sns_topic_arn:  str = field(default_factory=lambda: os.environ["SNS_TOPIC_ARN"])
    s3_bucket:      str = field(default_factory=lambda: os.environ["S3_BUCKET"])

    # Optional with defaults
    environment:   str   = field(default_factory=lambda: os.environ.get("ENVIRONMENT", "dev"))
    region:        str   = field(default_factory=lambda: os.environ.get("AWS_DEFAULT_REGION", "us-east-1"))
    dry_run:       bool  = field(default_factory=lambda: os.environ.get("DRY_RUN", "false").lower() == "true")
    log_level:     str   = field(default_factory=lambda: os.environ.get("LOG_LEVEL", "INFO"))
    role_arn:      Optional[str] = field(default_factory=lambda: os.environ.get("ASSUME_ROLE_ARN"))

    def validate(self) -> None:
        """Validate config on startup rather than failing mid-execution."""
        errors = []
        if not self.sns_topic_arn.startswith("arn:aws:sns:"):
            errors.append(f"Invalid SNS_TOPIC_ARN: {self.sns_topic_arn}")
        if not self.s3_bucket:
            errors.append("S3_BUCKET is empty")
        if self.environment not in ("dev", "staging", "prod"):
            errors.append(f"Unknown ENVIRONMENT: {self.environment}")
        if errors:
            raise EnvironmentError(
                f"Configuration errors:\n" + "\n".join(f"  - {e}" for e in errors)
            )


# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 6: Dry-Run Mode
# ══════════════════════════════════════════════════════════════════
def delete_old_objects(
    bucket: str,
    prefix: str,
    older_than_days: int,
    dry_run: bool = True,
) -> list[str]:
    """
    dry_run=True (default) → reports what would be deleted without deleting.
    dry_run=False → performs actual deletion.

    Always default to dry_run=True. The caller must explicitly opt in
    to destructive operations.
    """
    from datetime import datetime, timezone, timedelta
    s3 = get_client("s3")
    cutoff = datetime.now(timezone.utc) - timedelta(days=older_than_days)
    would_delete = []

    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if obj["LastModified"] < cutoff:
                would_delete.append(obj["Key"])
                if dry_run:
                    logger.info(f"[DRY-RUN] Would delete: s3://{bucket}/{obj['Key']}")
                else:
                    s3.delete_object(Bucket=bucket, Key=obj["Key"])
                    logger.info(f"Deleted: s3://{bucket}/{obj['Key']}")

    action = "Would delete" if dry_run else "Deleted"
    logger.info(f"{action} {len(would_delete)} object(s) from s3://{bucket}/{prefix}")
    return would_delete


# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 7: Graceful main() with Exit Codes
# ══════════════════════════════════════════════════════════════════
def main() -> int:
    """
    main() returns an integer exit code — 0 for success, non-zero for failure.
    sys.exit() translates this to the process exit code.
    CI/CD pipelines (GitHub Actions, Jenkins) read the exit code:
      - 0 = step succeeded → continue pipeline
      - Non-zero = step failed → stop pipeline, send alert

    We catch specific exceptions at the top level and map them to
    meaningful exit codes rather than a generic "something failed".
    """
    # Load and validate configuration in its own try block so that only
    # configuration problems map to exit code 1. EnvironmentError is an
    # alias of OSError, the parent of PermissionError, so catching it
    # around the whole script would swallow unrelated errors.
    try:
        config = AppConfig()
        config.validate()

    except KeyError as e:
        logger.critical(f"Missing required environment variable: {e}")
        return 1   # ← config/environment error

    except EnvironmentError as e:
        logger.critical(f"Configuration validation failed: {e}")
        return 1

    try:
        logger.info(f"Starting in {config.environment} | dry_run={config.dry_run}")

        # Your script logic here
        create_s3_bucket_idempotent(config.s3_bucket, config.region)

        deleted = delete_old_objects(
            bucket=config.s3_bucket,
            prefix="temp/",
            older_than_days=30,
            dry_run=config.dry_run,
        )

        # NOTE: JsonFormatter emits a fixed set of fields, so values passed
        # via `extra` are not written unless the formatter is extended.
        logger.info("Script completed successfully", extra={"objects_processed": len(deleted)})
        return 0   # ← success

    except NoCredentialsError:
        logger.critical("AWS credentials not found. Run 'aws configure' or attach an IAM role.")
        return 2   # ← credential error

    # Connect/read timeouts raise botocore's ConnectTimeoutError and
    # ReadTimeoutError, which EndpointConnectionError does not cover;
    # import and add them here if they should also map to exit code 3.
    except EndpointConnectionError as e:
        logger.critical(f"Cannot reach AWS API endpoint: {e}")
        return 3   # ← network error

    except PermissionError as e:
        logger.critical(f"IAM permission denied: {e}")
        return 4   # ← authorization error

    except ClientError as e:
        logger.critical(
            f"AWS API error: {e.response['Error']['Code']}: {e.response['Error']['Message']}"
        )
        return 5   # ← AWS API error

    except Exception as e:
        logger.critical(f"Unexpected error: {e}", exc_info=True)
        return 99  # ← unknown error


# ══════════════════════════════════════════════════════════════════
# BEST PRACTICE 8: Input Validation at System Boundaries
# ══════════════════════════════════════════════════════════════════
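def validate_s3_key(key: str) -> str:
    """
    Validate and normalize user-provided S3 keys before using them in API calls.
    S3 keys can be up to 1,024 bytes. S3 itself accepts almost any UTF-8
    string; rejecting keys that start with '..' is this script's own policy
    for keeping path-traversal-style input out of downstream tooling.
    """
    # Normalize first (remove leading slashes) so an input such as "/"
    # cannot pass validation and then turn into an empty key
    key = (key or "").lstrip("/")
    if not key:
        raise ValueError("S3 key cannot be empty")
    if len(key.encode("utf-8")) > 1024:
        raise ValueError(f"S3 key exceeds 1,024 bytes: {key[:50]}...")
    if key.startswith(".."):
        raise ValueError(f"S3 key cannot start with '..': {key}")
    return key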


if __name__ == "__main__":
    sys.exit(main())

Key Commands & Patterns Explained

Pattern | What it does
logging.StreamHandler(sys.stdout) | Write logs to stdout (captured by Lambda, ECS, Kubernetes)
JsonFormatter | Converts log records to JSON strings for structured logging
Config(retries={"mode": "adaptive"}) | boto3 adaptive retry with token bucket algorithm
Config(connect_timeout=5, read_timeout=30) | Prevents scripts from hanging on network issues
sts.assume_role(RoleArn, RoleSessionName) | Gets temporary creds for cross-account or cross-service access
sm.get_secret_value(SecretId=name) | Retrieves secret; never hardcode credentials
catch BucketAlreadyOwnedByYou → return True | Idempotent create, safe to run multiple times
os.environ["KEY"] | Raises KeyError if missing; fails fast at startup
os.environ.get("KEY", "default") | Optional config with sensible default
dry_run=True as default parameter | Forces caller to opt in to destructive operations
sys.exit(main()) | Translates return value to OS exit code

Environment Variables Reference

# Required
export SNS_TOPIC_ARN="arn:aws:sns:us-east-1:123456789012:alerts"
export S3_BUCKET="my-app-prod-data"

# Optional with defaults
export ENVIRONMENT="prod"
export AWS_DEFAULT_REGION="ap-south-1"
export DRY_RUN="false"
export LOG_LEVEL="INFO"
export ASSUME_ROLE_ARN="arn:aws:iam::999999999999:role/CrossAccountRole"
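
The script's validate() method only checks that S3_BUCKET is non-empty. As a hedged sketch of stricter boundary validation (practice 8), the hypothetical helper `validate_bucket_name` below applies the basic general-purpose bucket naming rules: 3 to 63 characters; lowercase letters, digits, dots, and hyphens; a letter or digit at both ends; no adjacent dots; not formatted like an IP address. It is not part of the original script and does not cover every documented rule, such as reserved prefixes and suffixes.

```python
import re

# Basic shape: 3-63 chars, lowercase letters/digits/dots/hyphens,
# starting and ending with a letter or digit.
_BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")
# Names that look like IPv4 addresses (e.g. 192.168.0.1) are not allowed.
_IP_RE = re.compile(r"^\d{1,3}(\.\d{1,3}){3}$")


def validate_bucket_name(name: str) -> str:
    """Return the name unchanged if it passes the basic rules, else raise ValueError."""
    if not _BUCKET_RE.match(name):
        raise ValueError(f"Invalid S3 bucket name: {name!r}")
    if ".." in name:
        raise ValueError(f"Bucket name must not contain adjacent dots: {name!r}")
    if _IP_RE.match(name):
        raise ValueError(f"Bucket name must not look like an IP address: {name!r}")
    return name
```

Calling a check like this from validate() would report a malformed S3_BUCKET at startup, before any AWS API call is made.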

🔍 Line-by-Line Code Walkthrough

Best Practice 1 — Structured JSON Logging

class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "time":    self.formatTime(record, "%Y-%m-%dT%H:%M:%SZ"),
            "level":   record.levelname,
            "logger":  record.name,
            "message": record.getMessage(),
            **({"exception": self.formatException(record.exc_info)}
               if record.exc_info else {}),
        })
Line | Explanation
class JsonFormatter(logging.Formatter) | Subclasses the built-in Python formatter. We override only format(); everything else (handler attachment, level filtering) stays the same
def format(self, record: logging.LogRecord) | Called by the logging system for every log statement. record contains all metadata: message, level, name, timestamp, exception info
self.formatTime(record, "%Y-%m-%dT%H:%M:%SZ") | Formats the record's creation time as an ISO 8601 string (e.g., "2025-01-20T14:30:00Z"). logging.Formatter uses local time by default, so the trailing Z (UTC) is only accurate if the formatter's converter is time.gmtime or the host clock runs in UTC
record.levelname | The string level: "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"
record.getMessage() | Returns the final log message. Calling .getMessage() (not .message) applies any %-style arguments passed to the logging call; f-strings are already interpolated before the call is made
**({"exception": ...} if record.exc_info else {}) | Dictionary unpacking with a conditional. If exc_info is set (i.e., logged with logger.error("msg", exc_info=True)), adds the "exception" key. If not, adds nothing
handler = logging.StreamHandler(sys.stdout) | Writes to stdout (not stderr). Lambda, ECS, and Kubernetes capture stdout for log aggregation
logger.setLevel(os.environ.get("LOG_LEVEL", "INFO").upper()) | Log level from environment variable. "DEBUG" in dev, "INFO" in prod. .upper() handles "info" or "Info" inputs
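
The formatter above writes a fixed set of keys, so anything passed through the logging call's `extra=` argument never reaches the output. A minimal sketch of one way to include those fields follows; the class name `ExtraJsonFormatter` and the attribute-diffing approach are illustrative assumptions, not part of the original script.

```python
import json
import logging
import time

# Attribute names that every LogRecord carries; anything else on a record
# was supplied by the caller through `extra=`.
_STANDARD_ATTRS = set(
    logging.LogRecord("", 0, "", 0, "", (), None).__dict__
) | {"message", "asctime"}


class ExtraJsonFormatter(logging.Formatter):
    converter = time.gmtime  # format timestamps in UTC so "Z" is accurate

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": self.formatTime(record, "%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Copy caller-supplied fields, e.g. extra={"objects_processed": 3}
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        # default=str keeps the logger from failing on non-JSON values
        return json.dumps(payload, default=str)
```

With a formatter like this, `logger.info("done", extra={"objects_processed": 3})` produces a line with a queryable `objects_processed` field.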

Best Practice 2 — botocore Config

BOTO_CONFIG = Config(
    retries={"mode": "adaptive", "max_attempts": 10},
    connect_timeout=5,
    read_timeout=30,
    max_pool_connections=10,
)
Field | Explanation
retries={"mode": "adaptive"} | Includes everything in standard mode (exponential backoff, retry quota) and adds client-side rate limiting: a token bucket slows outgoing requests after throttling responses, so the client adapts to the actual throttle rate. boto3 documents this mode as experimental
"max_attempts": 10 | In a Config object this is the maximum number of retries after the initial request, so 10 allows up to 11 calls in total. Use total_max_attempts to set a limit that includes the first call
connect_timeout=5 | Raises ConnectTimeoutError if the connection to the AWS API is not established within 5 seconds, for example during a network partition. DNS failures and refused connections surface as EndpointConnectionError instead
read_timeout=30 | Raises ReadTimeoutError if no response data arrives within 30 seconds. Prevents indefinite hangs
max_pool_connections=10 | Maximum concurrent HTTP connections in the connection pool. If your script makes 20 parallel calls, increase this to avoid connection waiting
Where to apply it | Pass config=BOTO_CONFIG to every boto3.client() call: boto3.client("ec2", config=BOTO_CONFIG)
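
To make the token bucket idea behind client-side rate limiting concrete, here is a small standalone illustration. It is a teaching sketch only: the class `TokenBucket` and its parameters are hypothetical and it does not reproduce botocore's internal implementation.

```python
import time
from typing import Callable


class TokenBucket:
    """Illustrative token bucket: requests spend tokens, tokens refill over time."""

    def __init__(
        self,
        capacity: float,
        refill_per_second: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self._clock = clock          # injectable so the behavior is testable
        self._tokens = capacity      # the bucket starts full
        self._last = clock()

    def _refill(self) -> None:
        now = self._clock()
        elapsed = now - self._last
        # Add tokens for the elapsed time, never exceeding capacity
        self._tokens = min(self.capacity, self._tokens + elapsed * self.refill_per_second)
        self._last = now

    def try_acquire(self, cost: float = 1.0) -> bool:
        """Spend `cost` tokens if available; return False when the caller should wait."""
        self._refill()
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False
```

A caller would check `try_acquire()` before each request and sleep briefly when it returns False, which is the general shape of rate limiting that adaptive mode applies on the client side.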

Best Practice 3 — Credential Management

region = region or os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
if role_arn:
    sts = boto3.client("sts", config=BOTO_CONFIG)
    assumed = sts.assume_role(
        RoleArn=role_arn,
        RoleSessionName=f"automation-{service}",
    )["Credentials"]
    return boto3.client(
        service,
        region_name=region,
        aws_access_key_id=assumed["AccessKeyId"],
        aws_secret_access_key=assumed["SecretAccessKey"],
        aws_session_token=assumed["SessionToken"],
        config=BOTO_CONFIG,
    )
Line | Explanation
region or os.environ.get(...) | If region argument is None (not provided), fall back to the env var, then to "us-east-1". The or short-circuits: truthy region skips the env var lookup
sts.assume_role(RoleArn=..., RoleSessionName=...) | Exchanges the current identity for temporary credentials in another role/account. Returns {"Credentials": {"AccessKeyId": ..., "SecretAccessKey": ..., "SessionToken": ...}}
["Credentials"] | Immediately indexes into the response to get the credential dict
aws_session_token=assumed["SessionToken"] | Assumed-role credentials ALWAYS require a session token; omitting it causes authentication failures such as InvalidClientTokenId
No role_arn path | return boto3.client(service, ...): boto3 uses its automatic credential chain, roughly env vars → ~/.aws/credentials and ~/.aws/config → container credentials or EC2 instance metadata
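
Temporary credentials returned by AssumeRole carry an `Expiration` timestamp, which matters for long-running scripts. The sketch below separates the response handling into two small helpers that can be tested without AWS; the names `credentials_to_client_kwargs` and `needs_refresh` and the five-minute margin are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional


def credentials_to_client_kwargs(assume_role_response: Dict[str, Any]) -> Dict[str, str]:
    """Map an sts.assume_role() response to boto3.client() keyword arguments."""
    creds = assume_role_response["Credentials"]
    return {
        "aws_access_key_id": creds["AccessKeyId"],
        "aws_secret_access_key": creds["SecretAccessKey"],
        "aws_session_token": creds["SessionToken"],
    }


def needs_refresh(
    assume_role_response: Dict[str, Any],
    now: Optional[datetime] = None,
    margin: timedelta = timedelta(minutes=5),
) -> bool:
    """True when the credentials expire within `margin` (or already have)."""
    expiration = assume_role_response["Credentials"]["Expiration"]
    now = now or datetime.now(timezone.utc)
    return expiration - now <= margin
```

A script could then call `boto3.client("s3", region_name=region, **credentials_to_client_kwargs(response))` and assume the role again whenever `needs_refresh(response)` returns True.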

get_secret(secret_name) — Secrets Manager Retrieval

response = sm.get_secret_value(SecretId=secret_name)
return json.loads(response["SecretString"])
Line | Explanation
get_secret_value(SecretId=name) | SecretId can be the secret's full ARN or its name. Returns a dict with "SecretString" (for text secrets) or "SecretBinary" (for binary)
response["SecretString"] | The raw string value of the secret, typically a JSON string like '{"host":"db.example.com","password":"abc123"}'
json.loads(response["SecretString"]) | Parses the JSON string into a Python dict so callers can do secret["host"], secret["password"]
if error_code == "ResourceNotFoundException" | Raises a descriptive ValueError instead of a raw boto3 error, which is easier to understand in logs
raise ... from e | Chains the original exception. The original ClientError is preserved in __cause__ for debugging
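
Since a response holds either "SecretString" or "SecretBinary", indexing "SecretString" directly raises KeyError for binary secrets. A hedged sketch of a more defensive parser is shown below; `parse_secret_response` is a hypothetical helper, and it assumes that a binary secret contains UTF-8 encoded JSON, which may not hold for your secrets.

```python
import json
from typing import Any, Dict


def parse_secret_response(response: Dict[str, Any]) -> Dict[str, Any]:
    """Extract a JSON object from a get_secret_value() response dict."""
    if "SecretString" in response:
        raw = response["SecretString"]
    elif "SecretBinary" in response:
        # Assumption: the binary payload is UTF-8 encoded JSON text
        raw = response["SecretBinary"].decode("utf-8")
    else:
        raise ValueError("Response has neither SecretString nor SecretBinary")

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        # Never include the raw secret value in the error message
        raise ValueError("Secret value is not valid JSON") from e

    if not isinstance(parsed, dict):
        raise ValueError("Secret JSON must be an object with named fields")
    return parsed
```

Keeping the parsing in a pure function also makes it easy to unit test with plain dicts instead of a live Secrets Manager call.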

Best Practice 4 — create_s3_bucket_idempotent

kwargs = {"Bucket": bucket_name}
if region != "us-east-1":
    kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
s3.create_bucket(**kwargs)
Line | Explanation
if region != "us-east-1" | S3 quirk: us-east-1 is the default region and is selected by omitting CreateBucketConfiguration. Every other region requires it. Passing LocationConstraint="us-east-1" raises InvalidLocationConstraint
{"LocationConstraint": region} | Tells S3 to create the bucket in the specified region. Without it S3 assumes us-east-1, and a request sent to another region's endpoint is rejected with IllegalLocationConstraintException
if code in ("BucketAlreadyOwnedByYou",): | Using a tuple for the in test leaves room to add more "already done" codes later. BucketAlreadyOwnedByYou means your account already has this bucket, so treat it as success
return True both times | Whether we created it or it already existed, the desired state is achieved. That's idempotency
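
The region branch is the part of this function most likely to break silently, and it can be checked without touching AWS by pulling it into a pure helper. The function name `build_create_bucket_kwargs` is an assumption for illustration; the logic mirrors the snippet above.

```python
from typing import Any, Dict


def build_create_bucket_kwargs(bucket_name: str, region: str) -> Dict[str, Any]:
    """Build the keyword arguments for s3.create_bucket() for a given region."""
    kwargs: Dict[str, Any] = {"Bucket": bucket_name}
    # us-east-1 is selected by omitting CreateBucketConfiguration entirely
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs
```

The calling code would then become `s3.create_bucket(**build_create_bucket_kwargs(bucket_name, region))`, with the error handling unchanged.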

Best Practice 5 — AppConfig Dataclass

@dataclass
class AppConfig:
    sns_topic_arn: str = field(default_factory=lambda: os.environ["SNS_TOPIC_ARN"])
    dry_run: bool      = field(default_factory=lambda: os.environ.get("DRY_RUN", "false").lower() == "true")
Line | Explanation
@dataclass | Auto-generates __init__, __repr__, __eq__. The __init__ calls each default_factory lazily when the object is created
field(default_factory=lambda: ...) | default_factory is a callable. The lambda is called at instantiation time, not at class definition time. This means the env var is read when AppConfig() is called, not at import
os.environ["SNS_TOPIC_ARN"] | Raises KeyError if the variable is missing, which propagates to main() and gets caught as except KeyError → return 1. This is "fail fast": don't let the script run with incomplete config
os.environ.get("DRY_RUN", "false").lower() == "true" | Reads the string "true", "True", or "TRUE" and converts to Python bool. .lower() makes it case-insensitive
config.validate() | Called after construction. Validates all fields together, so you get one error message listing all problems instead of crashing on the first
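
One caveat with the `== "true"` comparison: any other value, including "yes", "1", or a typo such as "ture", evaluates to False, which for DRY_RUN means the destructive path is taken. A stricter parser is sketched below as one possible alternative; the helper name `env_flag` and the accepted spellings are assumptions, not part of the original script.

```python
import os
from typing import Mapping, Optional

_TRUE_VALUES = {"1", "true", "yes", "on"}
_FALSE_VALUES = {"0", "false", "no", "off"}


def env_flag(
    name: str,
    default: bool = False,
    environ: Optional[Mapping[str, str]] = None,
) -> bool:
    """Parse a boolean environment variable, rejecting unrecognized values."""
    environ = os.environ if environ is None else environ
    raw = environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    value = raw.strip().lower()
    if value in _TRUE_VALUES:
        return True
    if value in _FALSE_VALUES:
        return False
    # Fail fast instead of silently treating a typo as False
    raise ValueError(f"{name} must be a boolean-like value, got {raw!r}")
```

Used as `env_flag("DRY_RUN", default=True)`, an unset variable keeps the script in dry-run mode and a misspelled one stops it at startup.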

Best Practice 6 — Dry-Run Mode

def delete_old_objects(bucket, prefix, older_than_days, dry_run: bool = True) -> list[str]:
    if dry_run:
        logger.info(f"[DRY-RUN] Would delete: s3://{bucket}/{obj['Key']}")
    else:
        s3.delete_object(Bucket=bucket, Key=obj["Key"])
Line | Explanation
dry_run: bool = True | Default is True, so you must explicitly pass dry_run=False to delete. This prevents accidental deletions from missing a flag
[DRY-RUN] prefix in log | Makes dry-run output visually distinct in logs. Operators can grep for [DRY-RUN] to review what WOULD happen
Function returns the same list in both modes | Callers can inspect what was (or would be) deleted without branching on dry_run themselves
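
Deleting one object per API call becomes slow for large prefixes. The S3 DeleteObjects operation accepts up to 1,000 keys per request, so a batched variant with the same dry-run contract might look like the sketch below. The function name `delete_keys_batched` is hypothetical, and `s3` is assumed to be a boto3 S3 client (or any object exposing a compatible `delete_objects` method).

```python
from typing import Any, Iterable, List


def delete_keys_batched(
    s3: Any,
    bucket: str,
    keys: Iterable[str],
    dry_run: bool = True,
    batch_size: int = 1000,
) -> List[str]:
    """Delete keys in batches; return the keys that were (or would be) deleted."""
    if not 1 <= batch_size <= 1000:
        raise ValueError("batch_size must be between 1 and 1000")
    keys = list(keys)
    if dry_run:
        return keys  # nothing is sent to S3 in dry-run mode

    deleted: List[str] = []
    for start in range(0, len(keys), batch_size):
        batch = keys[start:start + batch_size]
        response = s3.delete_objects(
            Bucket=bucket,
            # Quiet mode: the response lists only the keys that failed
            Delete={"Objects": [{"Key": k} for k in batch], "Quiet": True},
        )
        failed = {err["Key"] for err in response.get("Errors", [])}
        deleted.extend(k for k in batch if k not in failed)
    return deleted
```

As in the original function, the default is dry_run=True, so the caller still has to opt in before anything is removed.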

Best Practice 7 — main() with Exit Codes

def main() -> int:
    try:
        ...
        return 0
    except KeyError as e:
        logger.critical(f"Missing required environment variable: {e}")
        return 1
    except NoCredentialsError:
        return 2
    except EndpointConnectionError:
        return 3
    except PermissionError:
        return 4
    except ClientError:
        return 5
    except Exception:
        return 99

if __name__ == "__main__":
    sys.exit(main())
Line | Explanation
def main() -> int | Returns an integer. This separates the script logic from the OS interface and makes main() testable (just check the return value)
return 0 | Unix convention for success. CI/CD systems (GitHub Actions, Jenkins) only continue the pipeline if the step exits 0
return 1 (KeyError/EnvironmentError) | Config/environment problem: the operator needs to set or correct env vars
return 2 (NoCredentialsError) | Credentials missing: run aws configure or attach an IAM role
return 3 (EndpointConnectionError) | Network issue: can't reach the AWS API endpoint
return 4 (PermissionError) | IAM policy missing a permission. PermissionError is a subclass of OSError, and EnvironmentError is an alias of OSError, so an EnvironmentError handler placed earlier in the same try block would intercept it
return 5 (ClientError) | AWS API rejected the call
return 99 | Unexpected/unknown error; always log with exc_info=True for full traceback
sys.exit(main()) | sys.exit(0) sets process exit code 0. sys.exit(5) sets exit code 5. Scripts (and CI) read this
if __name__ == "__main__": | Prevents main() from running when this module is imported. Only runs when executed directly (python production_aws_script.py)
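
The numeric codes are easier to keep consistent across scripts and pipeline definitions when they are named in one place. A minimal sketch using `enum.IntEnum` follows; the member names and the `describe` helper are assumptions, while the values mirror the table above.

```python
from enum import IntEnum


class ExitCode(IntEnum):
    """Named exit codes; the values match the ones returned by main()."""
    OK = 0
    CONFIG = 1
    CREDENTIALS = 2
    NETWORK = 3
    PERMISSION = 4
    AWS_API = 5
    UNEXPECTED = 99


def describe(code: int) -> str:
    """Return the symbolic name for an exit code, or 'UNKNOWN' if unmapped."""
    try:
        return ExitCode(code).name
    except ValueError:
        return "UNKNOWN"
```

Because IntEnum members are integers, `return ExitCode.NETWORK` inside main() still works with `sys.exit(main())`, and a test can assert `main() == ExitCode.OK`.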
Services Used
  • boto3
  • Secrets Manager
  • CloudWatch Logs
  • IAM
Prerequisites
  • Python 3.9+ (the script uses built-in generic annotations such as list[str])
  • boto3
  • botocore
What You Learned
  • Structured JSON logging
  • botocore Config for timeouts and retries
  • Secrets Manager secret retrieval
  • Idempotent resource creation
  • Environment-based configuration
  • Meaningful exit codes
