Interview Q&A All Levels Python

Python Interview Questions & Answers (2026) Part 01

50+ Python interview questions and answers covering core Python, OOP, data structures, multithreading, APIs, automation, scripting, debugging, and performance optimization — Basic to Advanced.

May 19, 2025 30 min read 50 Questions DB
Level:

🐍 Python Scripting for AWS — Interview Questions & Answers

Real-world Python + AWS (boto3) scripting scenarios asked at top tech companies for Senior DevOps / Cloud Engineer roles. Every question includes working code with explanations.


Here’s your comprehensive Python + AWS scripting interview README! Note that I don’t have live internet access to scrape LinkedIn, but this is built from deep knowledge of what’s actually tested at senior DevOps interviews. What’s inside — 20 fully working Python scripts across 14 topics:

Every script includes production-grade patterns: error handling, logging, pagination, idempotency, and dry-run mode.


Answer:

import boto3
import logging
from datetime import datetime

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def get_tagged_instances(ec2_client, tag_key, tag_value, state):
    """Fetch EC2 instances with specific tag and state."""
    response = ec2_client.describe_instances(
        Filters=[
            {'Name': f'tag:{tag_key}', 'Values': [tag_value]},
            {'Name': 'instance-state-name', 'Values': [state]}
        ]
    )
    instance_ids = [
        instance['InstanceId']
        for reservation in response['Reservations']
        for instance in reservation['Instances']
    ]
    return instance_ids


def lambda_handler(event, context):
    """
    Lambda handler triggered by EventBridge schedule:
    - 8 PM: stop instances tagged AutoStop=true
    - 8 AM: start instances tagged AutoStop=true
    """
    ec2 = boto3.client('ec2')
    current_hour = datetime.utcnow().hour
    action = 'stop' if current_hour == 20 else 'start'

    if action == 'stop':
        instance_ids = get_tagged_instances(ec2, 'AutoStop', 'true', 'running')
        if instance_ids:
            ec2.stop_instances(InstanceIds=instance_ids)
            logger.info(f"Stopped instances: {instance_ids}")
        else:
            logger.info("No running instances with AutoStop=true found.")

    elif action == 'start':
        instance_ids = get_tagged_instances(ec2, 'AutoStop', 'true', 'stopped')
        if instance_ids:
            ec2.start_instances(InstanceIds=instance_ids)
            logger.info(f"Started instances: {instance_ids}")
        else:
            logger.info("No stopped instances with AutoStop=true found.")

    return {
        'statusCode': 200,
        'action': action,
        'instances_affected': instance_ids if instance_ids else []
    }

EventBridge Schedules (deploy with boto3 or CLI):

import boto3

events = boto3.client('events')

# Stop rule at 8 PM UTC
events.put_rule(
    Name='StopDevInstances',
    ScheduleExpression='cron(0 20 * * ? *)',
    State='ENABLED'
)

# Start rule at 8 AM UTC
events.put_rule(
    Name='StartDevInstances',
    ScheduleExpression='cron(0 8 * * ? *)',
    State='ENABLED'
)

Answer:

import boto3
from datetime import datetime, timezone, timedelta
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


class EBSSnapshotManager:
    def __init__(self, region='us-east-1', retention_days=30):
        self.ec2 = boto3.client('ec2', region_name=region)
        self.retention_days = retention_days

    def get_running_instance_volumes(self):
        """Get all EBS volume IDs attached to running instances."""
        volumes = []
        paginator = self.ec2.get_paginator('describe_instances')
        
        for page in paginator.paginate(
            Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
        ):
            for reservation in page['Reservations']:
                for instance in reservation['Instances']:
                    instance_id = instance['InstanceId']
                    instance_name = next(
                        (tag['Value'] for tag in instance.get('Tags', [])
                         if tag['Key'] == 'Name'),
                        instance_id
                    )
                    for mapping in instance.get('BlockDeviceMappings', []):
                        volumes.append({
                            'volume_id': mapping['Ebs']['VolumeId'],
                            'instance_id': instance_id,
                            'instance_name': instance_name,
                            'device': mapping['DeviceName']
                        })
        return volumes

    def create_snapshots(self, volumes):
        """Create snapshots for a list of volumes."""
        created = []
        timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d-%H%M')

        for vol in volumes:
            try:
                response = self.ec2.create_snapshot(
                    VolumeId=vol['volume_id'],
                    Description=f"Auto-backup {vol['instance_name']} {vol['device']} {timestamp}",
                    TagSpecifications=[{
                        'ResourceType': 'snapshot',
                        'Tags': [
                            {'Key': 'Name', 'Value': f"auto-snap-{vol['instance_name']}-{timestamp}"},
                            {'Key': 'AutoBackup', 'Value': 'true'},
                            {'Key': 'InstanceId', 'Value': vol['instance_id']},
                            {'Key': 'CreatedAt', 'Value': timestamp}
                        ]
                    }]
                )
                snapshot_id = response['SnapshotId']
                created.append(snapshot_id)
                logger.info(f"Created snapshot {snapshot_id} for volume {vol['volume_id']}")
            except Exception as e:
                logger.error(f"Failed to snapshot volume {vol['volume_id']}: {e}")
        return created

    def delete_old_snapshots(self):
        """Delete snapshots tagged AutoBackup=true older than retention_days."""
        cutoff = datetime.now(timezone.utc) - timedelta(days=self.retention_days)
        deleted = []

        paginator = self.ec2.get_paginator('describe_snapshots')
        for page in paginator.paginate(
            Filters=[{'Name': 'tag:AutoBackup', 'Values': ['true']}],
            OwnerIds=['self']
        ):
            for snapshot in page['Snapshots']:
                if snapshot['StartTime'] < cutoff:
                    try:
                        self.ec2.delete_snapshot(SnapshotId=snapshot['SnapshotId'])
                        deleted.append(snapshot['SnapshotId'])
                        logger.info(f"Deleted old snapshot: {snapshot['SnapshotId']}")
                    except Exception as e:
                        logger.error(f"Could not delete {snapshot['SnapshotId']}: {e}")
        return deleted

    def run(self):
        logger.info("Starting EBS snapshot backup cycle...")
        volumes = self.get_running_instance_volumes()
        logger.info(f"Found {len(volumes)} volumes across running instances")
        
        created = self.create_snapshots(volumes)
        deleted = self.delete_old_snapshots()

        return {
            'volumes_found': len(volumes),
            'snapshots_created': len(created),
            'snapshots_deleted': len(deleted)
        }


# Lambda handler
def lambda_handler(event, context):
    manager = EBSSnapshotManager(retention_days=30)
    return manager.run()


if __name__ == '__main__':
    manager = EBSSnapshotManager()
    result = manager.run()
    print(result)

Answer:

import boto3
from datetime import datetime, timezone, timedelta


def find_untagged_long_running_instances(region='us-east-1'):
    ec2 = boto3.client('ec2', region_name=region)
    ses = boto3.client('ses', region_name=region)
    
    threshold = datetime.now(timezone.utc) - timedelta(days=7)
    violations = []

    paginator = ec2.get_paginator('describe_instances')
    for page in paginator.paginate(
        Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
    ):
        for reservation in page['Reservations']:
            for instance in reservation['Instances']:
                launch_time = instance['LaunchTime']
                tags = {t['Key']: t['Value'] for t in instance.get('Tags', [])}

                if launch_time < threshold and 'Project' not in tags:
                    violations.append({
                        'InstanceId': instance['InstanceId'],
                        'LaunchTime': launch_time.strftime('%Y-%m-%d %H:%M UTC'),
                        'InstanceType': instance['InstanceType'],
                        'Name': tags.get('Name', 'Unnamed'),
                        'Owner': tags.get('Owner', 'Unknown')
                    })

    if violations:
        send_violation_email(ses, violations)
    return violations


def send_violation_email(ses_client, violations):
    rows = "\n".join([
        f"  - {v['InstanceId']} | {v['Name']} | {v['InstanceType']} | Running since {v['LaunchTime']} | Owner: {v['Owner']}"
        for v in violations
    ])
    body = f"""
AWS Compliance Alert: Untagged Long-Running EC2 Instances

The following EC2 instances have been running for more than 7 days
without a required 'Project' tag:

{rows}

Action Required: Please add the 'Project' tag or terminate unused instances.

This is an automated message from the Cloud Governance bot.
    """
    ses_client.send_email(
        Source='devops-alerts@company.com',
        Destination={'ToAddresses': ['infra-team@company.com']},
        Message={
            'Subject': {'Data': f'⚠️ {len(violations)} Untagged EC2 Instances Detected'},
            'Body': {'Text': {'Data': body}}
        }
    )


if __name__ == '__main__':
    result = find_untagged_long_running_instances()
    print(f"Found {len(result)} violations")

Answer:

import boto3
import os
import hashlib
import json
from pathlib import Path
from datetime import datetime
from botocore.exceptions import ClientError


class S3Syncer:
    def __init__(self, bucket, prefix, kms_key_id, region='us-east-1'):
        self.s3 = boto3.client('s3', region_name=region)
        self.bucket = bucket
        self.prefix = prefix.rstrip('/')
        self.kms_key_id = kms_key_id
        self.manifest = []

    def get_file_md5(self, filepath):
        """Calculate MD5 hash for integrity verification."""
        md5 = hashlib.md5()
        with open(filepath, 'rb') as f:
            for chunk in iter(lambda: f.read(8192), b''):
                md5.update(chunk)
        return md5.hexdigest()

    def file_exists_in_s3(self, s3_key, local_md5):
        """Check if same file already exists in S3 (skip if unchanged)."""
        try:
            response = self.s3.head_object(Bucket=self.bucket, Key=s3_key)
            s3_etag = response.get('ETag', '').strip('"')
            return s3_etag == local_md5
        except ClientError as e:
            if e.response['Error']['Code'] == '404':
                return False
            raise

    def upload_file(self, local_path, s3_key):
        """Upload a single file with KMS encryption."""
        local_md5 = self.get_file_md5(local_path)

        if self.file_exists_in_s3(s3_key, local_md5):
            print(f"  [SKIP] {s3_key} (unchanged)")
            return 'skipped'

        self.s3.upload_file(
            Filename=str(local_path),
            Bucket=self.bucket,
            Key=s3_key,
            ExtraArgs={
                'ServerSideEncryption': 'aws:kms',
                'SSEKMSKeyId': self.kms_key_id,
                'Metadata': {'md5checksum': local_md5}
            }
        )
        print(f"  [UPLOAD] {s3_key}")
        return 'uploaded'

    def sync_directory(self, local_dir):
        """Recursively sync a local directory to S3."""
        local_dir = Path(local_dir)
        stats = {'uploaded': 0, 'skipped': 0, 'failed': 0}

        for file_path in local_dir.rglob('*'):
            if file_path.is_file():
                relative = file_path.relative_to(local_dir)
                s3_key = f"{self.prefix}/{relative}".replace('\\', '/')

                try:
                    result = self.upload_file(file_path, s3_key)
                    stats[result] += 1
                    self.manifest.append({
                        'local_path': str(file_path),
                        's3_key': s3_key,
                        'size_bytes': file_path.stat().st_size,
                        'status': result,
                        'timestamp': datetime.utcnow().isoformat()
                    })
                except Exception as e:
                    print(f"  [ERROR] Failed to upload {file_path}: {e}")
                    stats['failed'] += 1

        return stats

    def save_manifest(self, output_path='manifest.json'):
        """Save upload manifest to a JSON file and also to S3."""
        with open(output_path, 'w') as f:
            json.dump(self.manifest, f, indent=2)

        # Upload manifest itself to S3
        manifest_key = f"{self.prefix}/_manifest_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
        self.s3.upload_file(
            Filename=output_path,
            Bucket=self.bucket,
            Key=manifest_key,
            ExtraArgs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': self.kms_key_id}
        )
        print(f"\nManifest saved to s3://{self.bucket}/{manifest_key}")


if __name__ == '__main__':
    syncer = S3Syncer(
        bucket='my-company-data',
        prefix='backups/app-data',
        kms_key_id='arn:aws:kms:us-east-1:123456789012:key/abc-123'
    )
    stats = syncer.sync_directory('/opt/app/data')
    syncer.save_manifest()
    print(f"\nSync complete: {stats}")

Answer:

import boto3
import json
from botocore.exceptions import ClientError


def audit_s3_buckets():
    s3 = boto3.client('s3')
    report = []

    buckets = s3.list_buckets()['Buckets']
    print(f"Auditing {len(buckets)} S3 buckets...\n")

    for bucket in buckets:
        name = bucket['Name']
        issues = []

        # 1. Check public access block settings
        try:
            pab = s3.get_public_access_block(Bucket=name)['PublicAccessBlockConfiguration']
            if not all([
                pab.get('BlockPublicAcls'),
                pab.get('IgnorePublicAcls'),
                pab.get('BlockPublicPolicy'),
                pab.get('RestrictPublicBuckets')
            ]):
                issues.append('PUBLIC_ACCESS_NOT_FULLY_BLOCKED')
        except ClientError as e:
            if 'NoSuchPublicAccessBlockConfiguration' in str(e):
                issues.append('NO_PUBLIC_ACCESS_BLOCK')

        # 2. Check bucket policy for public access
        try:
            policy = json.loads(s3.get_bucket_policy(Bucket=name)['Policy'])
            for stmt in policy.get('Statement', []):
                if stmt.get('Effect') == 'Allow' and stmt.get('Principal') in ('*', {'AWS': '*'}):
                    issues.append('PUBLIC_BUCKET_POLICY')
                    break
        except ClientError as e:
            if 'NoSuchBucketPolicy' not in str(e):
                issues.append(f'POLICY_CHECK_ERROR: {e}')

        # 3. Check versioning
        try:
            versioning = s3.get_bucket_versioning(Bucket=name)
            if versioning.get('Status') != 'Enabled':
                issues.append('VERSIONING_DISABLED')
        except ClientError:
            issues.append('VERSIONING_CHECK_FAILED')

        # 4. Check default encryption
        try:
            enc = s3.get_bucket_encryption(Bucket=name)
            rules = enc['ServerSideEncryptionConfiguration']['Rules']
            algo = rules[0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm']
            if algo not in ('aws:kms', 'AES256'):
                issues.append('WEAK_ENCRYPTION')
        except ClientError as e:
            if 'ServerSideEncryptionConfigurationNotFoundError' in str(e):
                issues.append('ENCRYPTION_DISABLED')

        # 5. Check logging
        try:
            logging_config = s3.get_bucket_logging(Bucket=name)
            if 'LoggingEnabled' not in logging_config:
                issues.append('ACCESS_LOGGING_DISABLED')
        except ClientError:
            issues.append('LOGGING_CHECK_FAILED')

        report.append({
            'bucket': name,
            'issues': issues,
            'compliant': len(issues) == 0
        })

        status = '✅' if not issues else '❌'
        print(f"{status} {name}: {', '.join(issues) if issues else 'All checks passed'}")

    # Summary
    non_compliant = [r for r in report if not r['compliant']]
    print(f"\n{'='*60}")
    print(f"Total Buckets: {len(report)}")
    print(f"Non-Compliant: {len(non_compliant)}")
    
    return report


if __name__ == '__main__':
    audit_s3_buckets()

Answer:

import boto3
from datetime import datetime, timezone, timedelta


def rotate_old_access_keys(dry_run=True, max_age_days=90):
    iam = boto3.client('iam')
    ses = boto3.client('ses', region_name='us-east-1')
    report = []

    users = []
    paginator = iam.get_paginator('list_users')
    for page in paginator.paginate():
        users.extend(page['Users'])

    threshold = datetime.now(timezone.utc) - timedelta(days=max_age_days)

    for user in users:
        username = user['UserName']
        keys = iam.list_access_keys(UserName=username)['AccessKeyMetadata']

        for key in keys:
            if key['Status'] != 'Active':
                continue

            age = datetime.now(timezone.utc) - key['CreateDate']
            if key['CreateDate'] < threshold:
                action_taken = 'DRY_RUN'

                if not dry_run:
                    # Step 1: Create new key
                    new_key = iam.create_access_key(UserName=username)['AccessKey']

                    # Step 2: Deactivate old key (don't delete immediately)
                    iam.update_access_key(
                        UserName=username,
                        AccessKeyId=key['AccessKeyId'],
                        Status='Inactive'
                    )
                    action_taken = 'ROTATED'

                    # Step 3: Notify user with new key via SES
                    notify_user(ses, username, key['AccessKeyId'], new_key)

                report.append({
                    'username': username,
                    'old_key_id': key['AccessKeyId'],
                    'key_age_days': age.days,
                    'action': action_taken
                })
                print(f"[{action_taken}] User: {username}, Key: {key['AccessKeyId']}, Age: {age.days} days")

    return report


def notify_user(ses_client, username, old_key_id, new_key):
    """Send new credentials securely to the user."""
    # In production: use a secrets manager URL, not raw credentials in email
    ses_client.send_email(
        Source='devops@company.com',
        Destination={'ToAddresses': [f'{username}@company.com']},
        Message={
            'Subject': {'Data': '🔑 Your AWS Access Key Has Been Rotated'},
            'Body': {
                'Text': {
                    'Data': f"""
Hi {username},

Your AWS access key {old_key_id} was older than 90 days and has been rotated.

Your new Access Key ID: {new_key['AccessKeyId']}
Your new Secret: [Stored securely in Secrets Manager at /iam/keys/{username}]

Action Required:
1. Retrieve your new secret from AWS Secrets Manager
2. Update your local ~/.aws/credentials
3. The old key will be deleted in 7 days

Contact devops@company.com if you have issues.
                    """
                }
            }
        }
    )


if __name__ == '__main__':
    # Run as dry_run=True first to preview
    rotate_old_access_keys(dry_run=False, max_age_days=90)

Answer:

import boto3
import json
from collections import defaultdict
from datetime import datetime, timedelta


def generate_least_privilege_policy(role_name, lookback_days=30):
    """
    Analyze CloudTrail logs to find all API calls made by a role
    and generate a least-privilege IAM policy.
    """
    cloudtrail = boto3.client('cloudtrail')
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=lookback_days)

    actions_by_service = defaultdict(set)
    resources_used = defaultdict(set)

    print(f"Analyzing CloudTrail logs for role: {role_name}")
    print(f"Lookback period: {lookback_days} days\n")

    paginator = cloudtrail.get_paginator('lookup_events')
    for page in paginator.paginate(
        LookupAttributes=[{
            'AttributeKey': 'Username',
            'AttributeValue': role_name
        }],
        StartTime=start_time,
        EndTime=end_time
    ):
        for event in page['Events']:
            event_name = event.get('EventName', '')
            event_source = event.get('EventSource', '').replace('.amazonaws.com', '')
            
            if event_source and event_name:
                action = f"{event_source}:{event_name}"
                actions_by_service[event_source].add(action)

                # Try to extract resource ARNs from the event
                try:
                    detail = json.loads(event.get('CloudTrailEvent', '{}'))
                    request_params = detail.get('requestParameters', {})
                    if 'bucketName' in request_params:
                        resources_used[event_source].add(
                            f"arn:aws:s3:::{request_params['bucketName']}"
                        )
                except Exception:
                    pass

    # Build the IAM policy
    policy_statements = []
    for service, actions in actions_by_service.items():
        resources = list(resources_used.get(service, ['*']))
        policy_statements.append({
            'Sid': f"Allow{service.replace('-', '').capitalize()}Actions",
            'Effect': 'Allow',
            'Action': sorted(list(actions)),
            'Resource': resources
        })

    policy = {
        'Version': '2012-10-17',
        'Statement': policy_statements
    }

    print("Generated Least-Privilege Policy:")
    print(json.dumps(policy, indent=2))

    total_actions = sum(len(a) for a in actions_by_service.values())
    print(f"\nTotal unique actions found: {total_actions}")
    print(f"Services accessed: {list(actions_by_service.keys())}")

    return policy


if __name__ == '__main__':
    policy = generate_least_privilege_policy('my-app-role', lookback_days=30)
    
    # Optionally save to file for review
    with open('least_privilege_policy.json', 'w') as f:
        json.dump(policy, f, indent=2)
    print("\nPolicy saved to least_privilege_policy.json")

Answer:

import boto3
import zipfile
import io
import json
import hashlib
import time


class LambdaDeployer:
    def __init__(self, region='us-east-1'):
        self.lambda_client = boto3.client('lambda', region_name=region)
        self.iam = boto3.client('iam', region_name=region)

    def create_deployment_package(self, source_file):
        """Zip a Python file for Lambda deployment."""
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            zf.write(source_file, 'lambda_function.py')
        zip_buffer.seek(0)
        return zip_buffer.read()

    def deploy_or_update(self, function_name, source_file, role_arn,
                          env_vars, runtime='python3.12', memory=256, timeout=30):
        """Create or update a Lambda function."""
        zip_bytes = self.create_deployment_package(source_file)
        code_hash = hashlib.sha256(zip_bytes).hexdigest()[:8]

        try:
            # Try to get existing function
            existing = self.lambda_client.get_function(FunctionName=function_name)
            print(f"Updating existing function: {function_name}")

            # Update code
            self.lambda_client.update_function_code(
                FunctionName=function_name,
                ZipFile=zip_bytes,
                Publish=False
            )
            # Wait for update to complete
            self._wait_for_update(function_name)

            # Update configuration
            self.lambda_client.update_function_configuration(
                FunctionName=function_name,
                Environment={'Variables': env_vars},
                MemorySize=memory,
                Timeout=timeout
            )
            self._wait_for_update(function_name)

        except self.lambda_client.exceptions.ResourceNotFoundException:
            print(f"Creating new function: {function_name}")
            self.lambda_client.create_function(
                FunctionName=function_name,
                Runtime=runtime,
                Role=role_arn,
                Handler='lambda_function.lambda_handler',
                Code={'ZipFile': zip_bytes},
                Environment={'Variables': env_vars},
                MemorySize=memory,
                Timeout=timeout,
                Tags={'DeployedBy': 'DeployScript', 'CodeHash': code_hash}
            )
            self._wait_for_active(function_name)

        # Publish a new version
        version_response = self.lambda_client.publish_version(
            FunctionName=function_name,
            Description=f"Deploy {code_hash}"
        )
        version = version_response['Version']
        print(f"Published version: {version}")

        return version

    def create_or_update_alias(self, function_name, alias, version, weights=None):
        """Create/update an alias with optional traffic shifting."""
        config = {
            'FunctionName': function_name,
            'Name': alias,
            'FunctionVersion': version,
            'Description': f"Points to version {version}"
        }

        # Traffic shifting: weights = {'old_version': 0.9} means 90% to old, 10% to new
        if weights:
            config['RoutingConfig'] = {
                'AdditionalVersionWeights': weights
            }

        try:
            self.lambda_client.get_alias(FunctionName=function_name, Name=alias)
            self.lambda_client.update_alias(**config)
            print(f"Updated alias '{alias}' → version {version}")
        except self.lambda_client.exceptions.ResourceNotFoundException:
            self.lambda_client.create_alias(**config)
            print(f"Created alias '{alias}' → version {version}")

    def _wait_for_update(self, function_name, max_wait=60):
        """Wait for function update to complete."""
        for _ in range(max_wait):
            resp = self.lambda_client.get_function_configuration(FunctionName=function_name)
            if resp['LastUpdateStatus'] == 'Successful':
                return
            time.sleep(1)
        raise TimeoutError(f"Lambda update timed out for {function_name}")

    def _wait_for_active(self, function_name, max_wait=60):
        """Wait for new function to become active."""
        for _ in range(max_wait):
            resp = self.lambda_client.get_function_configuration(FunctionName=function_name)
            if resp['State'] == 'Active':
                return
            time.sleep(1)
        raise TimeoutError(f"Lambda activation timed out for {function_name}")


if __name__ == '__main__':
    deployer = LambdaDeployer()

    env_vars = {
        'ENVIRONMENT': 'production',
        'DB_HOST': 'prod-db.cluster.us-east-1.rds.amazonaws.com',
        'LOG_LEVEL': 'INFO'
    }

    version = deployer.deploy_or_update(
        function_name='my-app-processor',
        source_file='lambda_function.py',
        role_arn='arn:aws:iam::123456789012:role/LambdaExecRole',
        env_vars=env_vars
    )

    # Canary: send 10% traffic to new version
    deployer.create_or_update_alias(
        function_name='my-app-processor',
        alias='prod',
        version=version,
        weights={'2': 0.9}  # 90% to version 2, 10% to new version
    )

Answer:

import boto3


def setup_ec2_alarms(sns_topic_arn, region='us-east-1'):
    ec2 = boto3.client('ec2', region_name=region)
    cw = boto3.client('cloudwatch', region_name=region)

    instances = []
    paginator = ec2.get_paginator('describe_instances')
    for page in paginator.paginate(
        Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
    ):
        for r in page['Reservations']:
            instances.extend(r['Instances'])

    print(f"Creating alarms for {len(instances)} running instances...")
    alarms_created = 0

    for instance in instances:
        iid = instance['InstanceId']
        name = next(
            (t['Value'] for t in instance.get('Tags', []) if t['Key'] == 'Name'),
            iid
        )

        alarm_configs = [
            {
                'name': f'{name}-HighCPU',
                'metric': 'CPUUtilization',
                'namespace': 'AWS/EC2',
                'threshold': 80,
                'comparison': 'GreaterThanThreshold',
                'period': 300,
                'eval_periods': 3,
                'description': f'CPU > 80% for 15 min on {name}'
            },
            {
                'name': f'{name}-StatusCheckFailed',
                'metric': 'StatusCheckFailed',
                'namespace': 'AWS/EC2',
                'threshold': 0,
                'comparison': 'GreaterThanThreshold',
                'period': 60,
                'eval_periods': 2,
                'description': f'Status check failed on {name}'
            },
            # Memory & Disk require CloudWatch Agent (custom namespace)
            {
                'name': f'{name}-HighMemory',
                'metric': 'mem_used_percent',
                'namespace': 'CWAgent',
                'threshold': 85,
                'comparison': 'GreaterThanThreshold',
                'period': 300,
                'eval_periods': 3,
                'description': f'Memory > 85% on {name}',
                'extra_dims': [{'Name': 'InstanceId', 'Value': iid}]
            },
            {
                'name': f'{name}-LowDiskSpace',
                'metric': 'disk_used_percent',
                'namespace': 'CWAgent',
                'threshold': 85,
                'comparison': 'GreaterThanThreshold',
                'period': 300,
                'eval_periods': 2,
                'description': f'Disk > 85% on {name}',
                'extra_dims': [
                    {'Name': 'InstanceId', 'Value': iid},
                    {'Name': 'path', 'Value': '/'},
                    {'Name': 'fstype', 'Value': 'xfs'}
                ]
            }
        ]

        for cfg in alarm_configs:
            dimensions = [{'Name': 'InstanceId', 'Value': iid}]
            if 'extra_dims' in cfg:
                dimensions = cfg['extra_dims']

            cw.put_metric_alarm(
                AlarmName=cfg['name'],
                AlarmDescription=cfg['description'],
                MetricName=cfg['metric'],
                Namespace=cfg['namespace'],
                Statistic='Average',
                Dimensions=dimensions,
                Period=cfg['period'],
                EvaluationPeriods=cfg['eval_periods'],
                Threshold=cfg['threshold'],
                ComparisonOperator=cfg['comparison'],
                TreatMissingData='notBreaching',
                AlarmActions=[sns_topic_arn],
                OKActions=[sns_topic_arn]
            )
            alarms_created += 1

    print(f"✅ Created {alarms_created} alarms for {len(instances)} instances")


if __name__ == '__main__':
    setup_ec2_alarms(
        sns_topic_arn='arn:aws:sns:us-east-1:123456789012:ops-alerts'
    )

Answer:

import boto3
import time
from datetime import datetime


class RDSSnapshotManager:
    def __init__(self, region='us-east-1'):
        self.rds = boto3.client('rds', region_name=region)

    def create_pre_deploy_snapshot(self, db_identifier, deploy_version):
        """Create a snapshot before a deployment."""
        timestamp = datetime.utcnow().strftime('%Y%m%d-%H%M%S')
        snapshot_id = f"pre-deploy-{db_identifier}-{deploy_version}-{timestamp}"

        print(f"Creating pre-deployment snapshot: {snapshot_id}")
        response = self.rds.create_db_snapshot(
            DBSnapshotIdentifier=snapshot_id,
            DBInstanceIdentifier=db_identifier,
            Tags=[
            {'Key': 'Type', 'Value': 'pre-deployment'},
            {'Key': 'DeployVersion', 'Value': deploy_version},
            {'Key': 'CreatedAt', 'Value': timestamp}
            ]
        )

        # Wait for snapshot to be available
        waiter = self.rds.get_waiter('db_snapshot_available')
        print("Waiting for snapshot to complete...")
        waiter.wait(DBSnapshotIdentifier=snapshot_id)
        print(f"✅ Snapshot ready: {snapshot_id}")
        return snapshot_id

    def restore_from_snapshot(self, snapshot_id, new_db_identifier, db_subnet_group, vpc_sg_ids):
        """Restore an RDS instance from a snapshot."""
        print(f"Restoring {new_db_identifier} from snapshot {snapshot_id}...")

        # Get original instance class from snapshot
        snapshot = self.rds.describe_db_snapshots(
            DBSnapshotIdentifier=snapshot_id
        )['DBSnapshots'][0]

        self.rds.restore_db_instance_from_db_snapshot(
            DBInstanceIdentifier=new_db_identifier,
            DBSnapshotIdentifier=snapshot_id,
            DBInstanceClass=snapshot['DBInstanceClass'],
            DBSubnetGroupName=db_subnet_group,
            VpcSecurityGroupIds=vpc_sg_ids,
            MultiAZ=True,
            AutoMinorVersionUpgrade=True,
            DeletionProtection=True,
            Tags=[
                {'Key': 'RestoredFrom', 'Value': snapshot_id},
                {'Key': 'RestoredAt', 'Value': datetime.utcnow().isoformat()}
            ]
        )

        print("Waiting for restored instance to be available (this may take 10-20 minutes)...")
        waiter = self.rds.get_waiter('db_instance_available')
        waiter.wait(
            DBInstanceIdentifier=new_db_identifier,
            WaiterConfig={'Delay': 30, 'MaxAttempts': 60}
        )

        endpoint = self.rds.describe_db_instances(
            DBInstanceIdentifier=new_db_identifier
        )['DBInstances'][0]['Endpoint']['Address']

        print(f"✅ Restored instance available at: {endpoint}")
        return endpoint

    def point_in_time_restore(self, source_db, target_db, restore_time,
                               db_subnet_group, vpc_sg_ids):
        """Restore to a specific point in time (RPO capability)."""
        print(f"Restoring {source_db} to point-in-time: {restore_time}")
        
        self.rds.restore_db_instance_to_point_in_time(
            SourceDBInstanceIdentifier=source_db,
            TargetDBInstanceIdentifier=target_db,
            RestoreTime=restore_time,
            DBSubnetGroupName=db_subnet_group,
            VpcSecurityGroupIds=vpc_sg_ids
        )

        waiter = self.rds.get_waiter('db_instance_available')
        waiter.wait(DBInstanceIdentifier=target_db)
        print(f"✅ Point-in-time restore complete: {target_db}")


if __name__ == '__main__':
    manager = RDSSnapshotManager()

    # Before deployment
    snapshot_id = manager.create_pre_deploy_snapshot(
        db_identifier='prod-postgres',
        deploy_version='v2.4.1'
    )
    print(f"Pre-deploy snapshot created: {snapshot_id}")

    # If deployment fails, restore
    # manager.restore_from_snapshot(
    #     snapshot_id=snapshot_id,
    #     new_db_identifier='prod-postgres-rollback',
    #     db_subnet_group='prod-db-subnet-group',
    #     vpc_sg_ids=['sg-0abc1234def56789']
    # )

Answer:

import boto3
from datetime import datetime, timedelta
import json


def analyze_and_recommend_savings(lookback_days=30):
    ce = boto3.client('ce', region_name='us-east-1')
    end = datetime.utcnow().strftime('%Y-%m-%d')
    start = (datetime.utcnow() - timedelta(days=lookback_days)).strftime('%Y-%m-%d')

    print(f"Analyzing EC2 costs from {start} to {end}...\n")

    # 1. Get current EC2 spend by instance type
    cost_response = ce.get_cost_and_usage(
        TimePeriod={'Start': start, 'End': end},
        Granularity='MONTHLY',
        Filter={
            'And': [
                {'Dimensions': {'Key': 'SERVICE', 'Values': ['Amazon Elastic Compute Cloud - Compute']}},
                {'Dimensions': {'Key': 'PURCHASE_TYPE', 'Values': ['On-Demand']}}
            ]
        },
        Metrics=['BlendedCost', 'UsageQuantity'],
        GroupBy=[{'Type': 'DIMENSION', 'Key': 'INSTANCE_TYPE'}]
    )

    instance_costs = {}
    for result in cost_response['ResultsByTime']:
        for group in result['Groups']:
            instance_type = group['Keys'][0]
            cost = float(group['Metrics']['BlendedCost']['Amount'])
            instance_costs[instance_type] = instance_costs.get(instance_type, 0) + cost

    # 2. Get Savings Plan recommendations from AWS
    sp_response = ce.get_savings_plans_purchase_recommendation(
        SavingsPlansType='COMPUTE_SP',
        TermInYears='ONE_YEAR',
        PaymentOption='NO_UPFRONT',
        LookbackPeriodInDays='THIRTY_DAYS'
    )

    recommendations = sp_response.get('SavingsPlansPurchaseRecommendation', {})
    summary = recommendations.get('SavingsPlansPurchaseRecommendationSummary', {})

    # 3. Get RI recommendations
    ri_response = ce.get_reservation_purchase_recommendation(
        Service='Amazon EC2',
        TermInYears='ONE_YEAR',
        PaymentOption='NO_UPFRONT',
        LookbackPeriodInDays='THIRTY_DAYS'
    )

    print("=" * 60)
    print("EC2 ON-DEMAND SPEND BY INSTANCE TYPE (Last 30 Days)")
    print("=" * 60)
    for itype, cost in sorted(instance_costs.items(), key=lambda x: -x[1]):
        print(f"  {itype:<20} ${cost:>10.2f}")

    total_cost = sum(instance_costs.values())
    print(f"\n  {'TOTAL ON-DEMAND':<20} ${total_cost:>10.2f}")

    print("\n" + "=" * 60)
    print("SAVINGS PLAN RECOMMENDATION")
    print("=" * 60)
    if summary:
        print(f"  Estimated Monthly Savings: ${float(summary.get('EstimatedMonthlySavingsAmount', 0)):,.2f}")
        print(f"  Savings Percentage:        {summary.get('EstimatedSavingsPercentage', 'N/A')}%")
        print(f"  Recommended Hourly Commit: ${float(summary.get('HourlyCommitmentToPurchase', 0)):,.4f}")
        print(f"  Current On-Demand Spend:   ${float(summary.get('CurrentOnDemandSpend', 0)):,.2f}")

    print("\n" + "=" * 60)
    print("RESERVED INSTANCE RECOMMENDATIONS (Top 5)")
    print("=" * 60)
    for rec in ri_response.get('Recommendations', [])[:5]:
        details = rec.get('RecommendationDetails', [{}])[0]
        spec = details.get('InstanceDetails', {}).get('EC2InstanceDetails', {})
        print(f"\n  Instance:      {spec.get('InstanceType')} in {spec.get('Region')}")
        print(f"  Recommended:   {details.get('RecommendedNumberOfInstancesToPurchase')} instances")
        print(f"  Monthly Save:  ${float(details.get('EstimatedMonthlySavings', 0)):,.2f}")
        print(f"  Break-even:    {details.get('RecurringStandardMonthlyCost', 'N/A')} months")

    return {
        'total_on_demand_spend': total_cost,
        'savings_plan_summary': summary,
        'instance_breakdown': instance_costs
    }


if __name__ == '__main__':
    analyze_and_recommend_savings()

Answer:

import boto3
import json


SENSITIVE_PORTS = {
    22: 'SSH',
    3389: 'RDP',
    3306: 'MySQL',
    5432: 'PostgreSQL',
    27017: 'MongoDB',
    6379: 'Redis',
    9200: 'Elasticsearch',
    8080: 'HTTP-Alt',
    443: 'HTTPS',
    80: 'HTTP'
}

HIGH_RISK_PORTS = {22, 3389, 3306, 5432, 27017, 6379}  # Never expose these to 0.0.0.0/0


def audit_security_groups(region='us-east-1', auto_remediate=False):
    ec2 = boto3.client('ec2', region_name=region)
    findings = []

    paginator = ec2.get_paginator('describe_security_groups')
    for page in paginator.paginate():
        for sg in page['SecurityGroups']:
            sg_id = sg['GroupId']
            sg_name = sg['GroupName']
            vpc_id = sg.get('VpcId', 'EC2-Classic')

            for rule in sg.get('IpPermissions', []):
                from_port = rule.get('FromPort', 0)
                to_port = rule.get('ToPort', 65535)
                protocol = rule.get('IpProtocol', '-1')

                for cidr_range in rule.get('IpRanges', []):
                    cidr = cidr_range.get('CidrIp', '')
                    if cidr in ('0.0.0.0/0', '::/0'):

                        # Check if any sensitive port is in range
                        for port, service in SENSITIVE_PORTS.items():
                            if protocol == '-1' or (from_port <= port <= to_port):
                                severity = 'CRITICAL' if port in HIGH_RISK_PORTS else 'HIGH'
                                finding = {
                                    'sg_id': sg_id,
                                    'sg_name': sg_name,
                                    'vpc_id': vpc_id,
                                    'port': port,
                                    'service': service,
                                    'protocol': protocol,
                                    'cidr': cidr,
                                    'severity': severity
                                }
                                findings.append(finding)

                                icon = '🔴' if severity == 'CRITICAL' else '🟠'
                                print(f"{icon} [{severity}] {sg_id} ({sg_name}) "
                                      f"allows {service} ({port}) from {cidr}")

                                # Auto-remediate critical findings
                                if auto_remediate and port in HIGH_RISK_PORTS:
                                    revoke_rule(ec2, sg_id, rule, cidr)

    print(f"\n{'='*60}")
    print(f"Total findings: {len(findings)}")
    critical = [f for f in findings if f['severity'] == 'CRITICAL']
    print(f"Critical (high-risk ports exposed): {len(critical)}")

    return findings


def revoke_rule(ec2_client, sg_id, rule, cidr):
    """Remove a specific CIDR from an inbound rule."""
    try:
        ec2_client.revoke_security_group_ingress(
            GroupId=sg_id,
            IpPermissions=[{
                'IpProtocol': rule['IpProtocol'],
                'FromPort': rule.get('FromPort', 0),
                'ToPort': rule.get('ToPort', 65535),
                'IpRanges': [{'CidrIp': cidr}]
            }]
        )
        print(f"  ✅ Revoked {cidr} from {sg_id}")
    except Exception as e:
        print(f"  ❌ Failed to revoke: {e}")


if __name__ == '__main__':
    # Set auto_remediate=True to automatically fix CRITICAL issues
    findings = audit_security_groups(auto_remediate=False)
    
    with open('sg_audit_report.json', 'w') as f:
        json.dump(findings, f, indent=2)
    print("\nFull report saved to sg_audit_report.json")

Answer:

import boto3
import time
import sys


class ECSRollingDeployer:
    def __init__(self, cluster, service, region='us-east-1'):
        self.ecs = boto3.client('ecs', region_name=region)
        self.cluster = cluster
        self.service = service

    def get_current_task_definition(self):
        response = self.ecs.describe_services(
            cluster=self.cluster,
            services=[self.service]
        )
        return response['services'][0]['taskDefinition']

    def create_new_task_definition(self, current_td_arn, new_image):
        """Register a new task definition with updated container image."""
        td = self.ecs.describe_task_definition(taskDefinition=current_td_arn)['taskDefinition']

        # Update the image in the first container definition
        containers = td['containerDefinitions']
        containers[0]['image'] = new_image

        # Strip non-required fields
        for field in ['taskDefinitionArn', 'revision', 'status',
                      'requiresAttributes', 'compatibilities', 'registeredAt', 'registeredBy']:
            td.pop(field, None)

        response = self.ecs.register_task_definition(**td)
        new_td_arn = response['taskDefinition']['taskDefinitionArn']
        print(f"Registered new task definition: {new_td_arn}")
        return new_td_arn

    def deploy(self, new_image, health_check_retries=10, health_check_delay=30):
        """Deploy new image with automatic rollback on failure."""
        print(f"Starting deployment of {new_image} to {self.service}...")

        current_td = self.get_current_task_definition()
        print(f"Current task definition: {current_td}")

        new_td = self.create_new_task_definition(current_td, new_image)

        # Update the service
        self.ecs.update_service(
            cluster=self.cluster,
            service=self.service,
            taskDefinition=new_td,
            deploymentConfiguration={
                'minimumHealthyPercent': 100,
                'maximumPercent': 200
            },
            forceNewDeployment=True
        )
        print("Service update triggered. Monitoring health...")

        # Monitor deployment
        for attempt in range(health_check_retries):
            time.sleep(health_check_delay)
            status = self.get_deployment_status()
            print(f"[{attempt+1}/{health_check_retries}] Deployment status: {status}")

            if status == 'HEALTHY':
                print(f"✅ Deployment successful! Running on {new_td}")
                return True
            elif status == 'FAILED':
                print("❌ Deployment failed! Initiating rollback...")
                self.rollback(current_td)
                return False

        print("⏰ Health checks timed out. Initiating rollback...")
        self.rollback(current_td)
        return False

    def get_deployment_status(self):
        """Check if the new deployment is healthy."""
        response = self.ecs.describe_services(
            cluster=self.cluster,
            services=[self.service]
        )
        service = response['services'][0]
        deployments = service['deployments']

        primary = next((d for d in deployments if d['status'] == 'PRIMARY'), None)
        if not primary:
            return 'UNKNOWN'

        desired = primary['desiredCount']
        running = primary['runningCount']
        failed = primary['failedTasks']

        if failed > 0:
            return 'FAILED'
        if running == desired:
            return 'HEALTHY'
        return f'IN_PROGRESS ({running}/{desired} running)'

    def rollback(self, previous_td_arn):
        """Roll back to the previous task definition."""
        self.ecs.update_service(
            cluster=self.cluster,
            service=self.service,
            taskDefinition=previous_td_arn
        )
        print(f"⏪ Rollback complete. Reverted to: {previous_td_arn}")


if __name__ == '__main__':
    deployer = ECSRollingDeployer(
        cluster='prod-cluster',
        service='payment-service'
    )
    success = deployer.deploy(
        new_image='123456789012.dkr.ecr.us-east-1.amazonaws.com/payment:v2.4.1'
    )
    sys.exit(0 if success else 1)

Answer:

import boto3
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    """
    Triggered by EventBridge rule:
    { "detail-type": ["AWS API Call via CloudTrail"],
      "detail": { "userIdentity": { "type": ["Root"] } } }
    """
    sns = boto3.client('sns')
    iam = boto3.client('iam')

    detail = event.get('detail', {})
    event_name = detail.get('eventName', 'Unknown')
    source_ip = detail.get('sourceIPAddress', 'Unknown')
    user_agent = detail.get('userAgent', 'Unknown')
    event_time = detail.get('eventTime', 'Unknown')
    region = detail.get('awsRegion', 'Unknown')

    # Skip if this is from an authorized automated process
    authorized_ips = ['10.0.0.0/8']  # Internal automation IPs
    if any(source_ip.startswith(ip.split('/')[0][:5]) for ip in authorized_ips):
        logger.info(f"Root API call from authorized IP {source_ip}, skipping alert.")
        return

    alert_message = f"""
🚨 ROOT ACCOUNT ACTIVITY DETECTED 🚨

Time:       {event_time}
Event:      {event_name}
Region:     {region}
Source IP:  {source_ip}
User Agent: {user_agent}

IMMEDIATE ACTION REQUIRED:
1. Verify if this was authorized
2. If unauthorized, rotate root credentials immediately
3. Review CloudTrail for all recent root actions
4. Check for any IAM users/roles created by root

CloudTrail Event Detail:
{json.dumps(detail, indent=2, default=str)}
    """

    # Send high-priority SNS alert
    sns.publish(
        TopicArn='arn:aws:sns:us-east-1:123456789012:security-critical-alerts',
        Subject='🚨 CRITICAL: AWS Root Account Activity Detected',
        Message=alert_message,
        MessageAttributes={
            'severity': {
                'DataType': 'String',
                'StringValue': 'CRITICAL'
            }
        }
    )

    # Log to security SIEM (example: send to CloudWatch Logs with special prefix)
    logger.critical(f"ROOT_ACCOUNT_USAGE | {event_name} | {source_ip} | {event_time}")

    # Optional: If this is an unknown IP, create a deny-all policy as emergency measure
    if source_ip not in ['known-admin-ip']:
        logger.warning("Applying emergency root account restrictions...")
        # Note: You cannot restrict root via IAM policy, but you can:
        # 1. Alert the security team
        # 2. Trigger a Step Functions workflow for incident response
        # 3. Isolate any resources created by root in this session

    return {
        'statusCode': 200,
        'message': 'Root account alert sent successfully'
    }

Answer:

import boto3
import time
import json
import urllib.request


class CodePipelineMonitor:
    def __init__(self, pipeline_name, slack_webhook_url, region='us-east-1'):
        self.cp = boto3.client('codepipeline', region_name=region)
        self.pipeline_name = pipeline_name
        self.slack_url = slack_webhook_url

    def trigger_pipeline(self):
        """Start the pipeline execution."""
        response = self.cp.start_pipeline_execution(name=self.pipeline_name)
        execution_id = response['pipelineExecutionId']
        print(f"Pipeline triggered. Execution ID: {execution_id}")
        self.send_slack(
            f"🚀 *Deployment Started*\nPipeline: `{self.pipeline_name}`\nExecution: `{execution_id}`",
            color='#439FE0'
        )
        return execution_id

    def monitor(self, execution_id, poll_interval=15, timeout=1800):
        """Poll pipeline status and notify on stage changes."""
        print(f"Monitoring pipeline execution: {execution_id}")
        stage_statuses = {}
        start = time.time()

        while time.time() - start < timeout:
            time.sleep(poll_interval)

            try:
                response = self.cp.get_pipeline_execution(
                    pipelineName=self.pipeline_name,
                    pipelineExecutionId=execution_id
                )
                status = response['pipelineExecution']['status']

                # Get stage details
                state = self.cp.get_pipeline_state(name=self.pipeline_name)
                for stage in state['stageStates']:
                    stage_name = stage['stageName']
                    stage_status = stage.get('latestExecution', {}).get('status', 'UNKNOWN')

                    # Notify only on status change
                    if stage_statuses.get(stage_name) != stage_status:
                        stage_statuses[stage_name] = stage_status
                        self.notify_stage_change(stage_name, stage_status)

                if status in ('Succeeded', 'Failed', 'Stopped', 'Superseded'):
                    self.notify_final_status(status, execution_id)
                    return status == 'Succeeded'

            except Exception as e:
                print(f"Error polling pipeline: {e}")

        self.send_slack(f"⏰ Pipeline monitoring timed out after {timeout//60} minutes", color='#FFA500')
        return False

    def notify_stage_change(self, stage_name, status):
        """Send Slack notification for stage status change."""
        emoji_map = {
            'InProgress': '🔄',
            'Succeeded': '✅',
            'Failed': '❌',
            'Stopped': '⛔',
            'Skipped': '⏭️'
        }
        color_map = {
            'InProgress': '#439FE0',
            'Succeeded': '#36a64f',
            'Failed': '#D00000',
            'Stopped': '#FFA500'
        }
        emoji = emoji_map.get(status, '❓')
        color = color_map.get(status, '#808080')
        self.send_slack(
            f"{emoji} Stage *{stage_name}*: `{status}`",
            color=color
        )

    def notify_final_status(self, status, execution_id):
        """Final pipeline result notification."""
        if status == 'Succeeded':
            msg = f"✅ *Deployment Succeeded!*\nPipeline: `{self.pipeline_name}`\nExecution: `{execution_id}`"
            color = '#36a64f'
        else:
            msg = f"❌ *Deployment FAILED!*\nPipeline: `{self.pipeline_name}`\nExecution: `{execution_id}`\nStatus: `{status}`"
            color = '#D00000'
        self.send_slack(msg, color=color)

    def send_slack(self, message, color='#439FE0'):
        """Send message to Slack via webhook."""
        payload = {
            'attachments': [{
                'color': color,
                'text': message,
                'footer': 'AWS CodePipeline Monitor',
                'ts': int(time.time())
            }]
        }
        data = json.dumps(payload).encode('utf-8')
        req = urllib.request.Request(
            self.slack_url,
            data=data,
            headers={'Content-Type': 'application/json'}
        )
        try:
            urllib.request.urlopen(req)
        except Exception as e:
            print(f"Slack notification failed: {e}")


if __name__ == '__main__':
    monitor = CodePipelineMonitor(
        pipeline_name='prod-app-pipeline',
        slack_webhook_url='https://hooks.slack.com/services/T00/B00/xxx'
    )
    execution_id = monitor.trigger_pipeline()
    success = monitor.monitor(execution_id)
    exit(0 if success else 1)

Answer:

import boto3
import json
from concurrent.futures import ThreadPoolExecutor, as_completed


def get_all_org_accounts():
    """Get all active accounts in the organization."""
    org = boto3.client('organizations')
    accounts = []
    paginator = org.get_paginator('list_accounts')
    for page in paginator.paginate():
        accounts.extend([
            a for a in page['Accounts'] if a['Status'] == 'ACTIVE'
        ])
    return accounts


def assume_role(account_id, role_name='SecurityAuditRole'):
    """Assume a role in a target account and return credentials."""
    sts = boto3.client('sts')
    role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
    try:
        response = sts.assume_role(
            RoleArn=role_arn,
            RoleSessionName='SecurityAuditSession',
            DurationSeconds=3600
        )
        creds = response['Credentials']
        return {
            'aws_access_key_id': creds['AccessKeyId'],
            'aws_secret_access_key': creds['SecretAccessKey'],
            'aws_session_token': creds['SessionToken']
        }
    except Exception as e:
        print(f"  ❌ Cannot assume role in {account_id}: {e}")
        return None


def audit_account(account):
    """Run security checks on a single account."""
    account_id = account['Id']
    account_name = account['Name']
    findings = {'account_id': account_id, 'account_name': account_name, 'checks': {}}

    creds = assume_role(account_id)
    if not creds:
        findings['error'] = 'Cannot assume role'
        return findings

    try:
        # Check 1: Root MFA enabled
        iam = boto3.client('iam', **creds)
        summary = iam.get_account_summary()['SummaryMap']
        findings['checks']['root_mfa_enabled'] = bool(summary.get('AccountMFAEnabled', 0))

        # Check 2: CloudTrail enabled in all regions
        ct = boto3.client('cloudtrail', **creds)
        trails = ct.describe_trails(includeShadowTrails=False)['trailList']
        multi_region = any(t.get('IsMultiRegionTrail') for t in trails)
        findings['checks']['cloudtrail_multi_region'] = multi_region

        # Check 3: GuardDuty enabled
        gd = boto3.client('guardduty', **creds)
        detectors = gd.list_detectors()['DetectorIds']
        findings['checks']['guardduty_enabled'] = len(detectors) > 0

        # Check 4: AWS Config enabled
        config = boto3.client('config', **creds)
        recorders = config.describe_configuration_recorders()['ConfigurationRecorders']
        findings['checks']['aws_config_enabled'] = len(recorders) > 0

        # Check 5: Password policy
        try:
            policy = iam.get_account_password_policy()['PasswordPolicy']
            findings['checks']['password_min_length'] = policy.get('MinimumPasswordLength', 0)
            findings['checks']['password_requires_mfa'] = policy.get('RequireNumbers', False)
        except iam.exceptions.NoSuchEntityException:
            findings['checks']['password_policy'] = 'NOT_SET'

    except Exception as e:
        findings['error'] = str(e)

    return findings


def generate_consolidated_report():
    """Audit all org accounts in parallel and generate a report."""
    accounts = get_all_org_accounts()
    print(f"Auditing {len(accounts)} accounts...\n")

    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(audit_account, acct): acct for acct in accounts}
        for future in as_completed(futures):
            result = future.result()
            results.append(result)
            status = '✅' if not result.get('error') else '❌'
            print(f"{status} {result['account_id']} ({result['account_name']})")

    # Summary
    print("\n" + "=" * 70)
    print("CONSOLIDATED SECURITY REPORT")
    print("=" * 70)
    for r in sorted(results, key=lambda x: x['account_name']):
        print(f"\n  Account: {r['account_name']} ({r['account_id']})")
        for check, val in r.get('checks', {}).items():
            icon = '✅' if val else '❌'
            print(f"    {icon} {check}: {val}")

    with open('org_security_report.json', 'w') as f:
        json.dump(results, f, indent=2)
    print("\nFull report saved to org_security_report.json")


if __name__ == '__main__':
    generate_consolidated_report()

Answer:

import boto3
from typing import Generator, Any


def paginate_all(client, method_name: str, result_key: str, **kwargs) -> Generator[Any, None, None]:
    """
    Generic paginator for any boto3 API call that supports pagination.
    
    Usage:
        for instance in paginate_all(ec2, 'describe_instances', 'Reservations', Filters=[...]):
            print(instance)
    """
    method = getattr(client, method_name)

    # Try to use the built-in paginator first (most efficient)
    try:
        paginator = client.get_paginator(method_name)
        for page in paginator.paginate(**kwargs):
            yield from page.get(result_key, [])
    except Exception:
        # Fall back to manual NextToken pagination
        while True:
            response = method(**kwargs)
            yield from response.get(result_key, [])
            next_token = response.get('NextToken') or response.get('Marker')
            if not next_token:
                break
            kwargs['NextToken'] = next_token


# Examples
if __name__ == '__main__':
    ec2 = boto3.client('ec2')
    s3 = boto3.client('s3')
    iam = boto3.client('iam')

    # List all EC2 instances
    instances = list(paginate_all(ec2, 'describe_instances', 'Reservations'))
    print(f"Total reservation pages fetched: {len(instances)}")

    # List all S3 buckets (no pagination needed but works the same)
    buckets = list(paginate_all(s3, 'list_buckets', 'Buckets'))
    print(f"Total buckets: {len(buckets)}")

    # List all IAM users
    users = list(paginate_all(iam, 'list_users', 'Users'))
    print(f"Total IAM users: {len(users)}")

Answer:

import boto3
import time
import random
import functools
import logging
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)


def aws_retry(max_retries=5, base_delay=0.5, max_delay=30, exceptions=None):
    """
    Decorator for retrying AWS API calls with exponential backoff + jitter.
    Handles ThrottlingException, RequestLimitExceeded, and other transient errors.
    """
    if exceptions is None:
        exceptions = {
            'ThrottlingException',
            'RequestLimitExceeded',
            'TooManyRequestsException',
            'ServiceUnavailable',
            'InternalServerError',
            'RequestTimeout',
            'ProvisionedThroughputExceededException',
            'LimitExceededException'
        }

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except ClientError as e:
                    error_code = e.response['Error']['Code']
                    if error_code not in exceptions:
                        raise  # Non-retryable error

                    last_exception = e
                    if attempt == max_retries:
                        logger.error(f"Max retries ({max_retries}) exceeded for {func.__name__}")
                        raise

                    # Exponential backoff with jitter
                    delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
                    logger.warning(
                        f"[{func.__name__}] Attempt {attempt+1}/{max_retries} failed "
                        f"({error_code}). Retrying in {delay:.2f}s..."
                    )
                    time.sleep(delay)

            raise last_exception
        return wrapper
    return decorator


# Usage example
@aws_retry(max_retries=5)
def list_all_buckets():
    s3 = boto3.client('s3')
    return s3.list_buckets()['Buckets']


@aws_retry(max_retries=3, base_delay=1)
def describe_instances(filters):
    ec2 = boto3.client('ec2')
    return ec2.describe_instances(Filters=filters)


# Class-based usage with context manager
class AWSClientWithRetry:
    """Wrapper that automatically retries all methods of a boto3 client."""

    def __init__(self, service, region='us-east-1', max_retries=5):
        self._client = boto3.client(service, region_name=region)
        self._max_retries = max_retries

    def __getattr__(self, name):
        method = getattr(self._client, name)
        if callable(method):
            return aws_retry(max_retries=self._max_retries)(method)
        return method


# Use like a normal client with automatic retries
if __name__ == '__main__':
    ec2 = AWSClientWithRetry('ec2', max_retries=5)
    instances = ec2.describe_instances()  # Automatically retried on throttle

Answer:

import boto3
import logging
import os
import sys
import json
from botocore.config import Config
from botocore.exceptions import ClientError, EndpointConnectionError
from typing import Optional


# ─── 1. Structured Logging ───────────────────────────────────────────────────
def setup_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter(
        '{"time": "%(asctime)s", "level": "%(levelname)s", '
        '"logger": "%(name)s", "message": "%(message)s"}'
    ))
    logger.addHandler(handler)
    logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))
    return logger

logger = setup_logger('aws-automation')


# ─── 2. Config with Timeouts & Retries ───────────────────────────────────────
boto_config = Config(
    retries={'max_attempts': 5, 'mode': 'adaptive'},
    connect_timeout=5,
    read_timeout=30,
    max_pool_connections=10
)


# ─── 3. Credential Management — NEVER hardcode ───────────────────────────────
def get_client(service: str, region: Optional[str] = None, role_arn: Optional[str] = None):
    """
    Get a boto3 client. Optionally assume a role.
    Uses IAM role (Lambda/EC2) or ~/.aws/credentials automatically.
    """
    region = region or os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')

    if role_arn:
        sts = boto3.client('sts', config=boto_config)
        assumed = sts.assume_role(
            RoleArn=role_arn,
            RoleSessionName='AutomationSession'
        )['Credentials']
        return boto3.client(
            service,
            region_name=region,
            aws_access_key_id=assumed['AccessKeyId'],
            aws_secret_access_key=assumed['SecretAccessKey'],
            aws_session_token=assumed['SessionToken'],
            config=boto_config
        )

    return boto3.client(service, region_name=region, config=boto_config)


# ─── 4. Secrets Retrieval — from Secrets Manager ─────────────────────────────
def get_secret(secret_name: str, region: str = 'us-east-1') -> dict:
    """Retrieve a JSON secret from AWS Secrets Manager."""
    client = get_client('secretsmanager', region)
    try:
        response = client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])
    except ClientError as e:
        logger.error(f"Failed to retrieve secret {secret_name}: {e}")
        raise


# ─── 5. Idempotent Operations ─────────────────────────────────────────────────
def create_s3_bucket_idempotent(bucket_name: str, region: str = 'us-east-1') -> bool:
    """Create S3 bucket only if it doesn't exist (idempotent)."""
    s3 = get_client('s3', region)
    try:
        s3.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': region} if region != 'us-east-1' else {}
        )
        logger.info(f"Created bucket: {bucket_name}")
        return True
    except ClientError as e:
        if e.response['Error']['Code'] in ('BucketAlreadyOwnedByYou', 'BucketAlreadyExists'):
            logger.info(f"Bucket already exists: {bucket_name}")
            return True
        logger.error(f"Failed to create bucket {bucket_name}: {e}")
        raise


# ─── 6. Environment-Aware Configuration ──────────────────────────────────────
class Config:
    ENV = os.environ.get('ENVIRONMENT', 'dev')
    REGION = os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')
    SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN', '')
    DRY_RUN = os.environ.get('DRY_RUN', 'false').lower() == 'true'

    @classmethod
    def validate(cls):
        required = ['SNS_TOPIC_ARN']
        missing = [k for k in required if not getattr(cls, k)]
        if missing:
            raise EnvironmentError(f"Missing required env vars: {missing}")


# ─── 7. Graceful Main Function ────────────────────────────────────────────────
def main():
    try:
        Config.validate()
        logger.info(f"Starting automation in {Config.ENV} | dry_run={Config.DRY_RUN}")

        # Your logic here
        bucket_name = f"my-app-{Config.ENV}-data"
        create_s3_bucket_idempotent(bucket_name, Config.REGION)

        logger.info("Automation completed successfully")
        return 0

    except EnvironmentError as e:
        logger.critical(f"Configuration error: {e}")
        return 1
    except EndpointConnectionError as e:
        logger.critical(f"AWS connectivity error (check network/proxy): {e}")
        return 2
    except ClientError as e:
        logger.critical(f"AWS API error: {e.response['Error']}")
        return 3
    except Exception as e:
        logger.critical(f"Unexpected error: {e}", exc_info=True)
        return 99


if __name__ == '__main__':
    sys.exit(main())

Answer:

import boto3
from datetime import datetime, timezone, timedelta
import json


class AWSResourceCleaner:
    def __init__(self, region='us-east-1', dry_run=True, age_days=30):
        self.ec2 = boto3.client('ec2', region_name=region)
        self.dry_run = dry_run
        self.age_days = age_days
        self.report = {'dry_run': dry_run, 'resources': [], 'estimated_savings': 0}

    def clean_unattached_ebs_volumes(self):
        """Find and delete EBS volumes in 'available' state."""
        volumes = self.ec2.describe_volumes(
            Filters=[{'Name': 'status', 'Values': ['available']}]
        )['Volumes']

        ebs_prices = {'gp3': 0.08, 'gp2': 0.10, 'io1': 0.125, 'st1': 0.045, 'sc1': 0.025}

        for vol in volumes:
            vol_id = vol['VolumeId']
            size_gb = vol['Size']
            vol_type = vol['VolumeType']
            monthly_cost = size_gb * ebs_prices.get(vol_type, 0.10)

            entry = {
                'type': 'EBS_Volume',
                'id': vol_id,
                'size_gb': size_gb,
                'volume_type': vol_type,
                'monthly_cost_usd': round(monthly_cost, 2),
                'action': 'DRY_RUN' if self.dry_run else 'DELETED'
            }

            if not self.dry_run:
                self.ec2.delete_volume(VolumeId=vol_id)
                print(f"  🗑️ Deleted EBS volume: {vol_id} ({size_gb}GB {vol_type}) ~ ${monthly_cost:.2f}/mo")
            else:
                print(f"  [DRY-RUN] Would delete EBS: {vol_id} ({size_gb}GB {vol_type}) ~ ${monthly_cost:.2f}/mo")

            self.report['resources'].append(entry)
            self.report['estimated_savings'] += monthly_cost

    def clean_unassociated_eips(self):
        """Release Elastic IPs not associated with any instance."""
        eips = self.ec2.describe_addresses()['Addresses']
        eip_monthly_cost = 3.65  # ~$0.005/hour for unused EIPs

        for eip in eips:
            if 'AssociationId' not in eip:
                alloc_id = eip.get('AllocationId')
                public_ip = eip.get('PublicIp')

                entry = {
                    'type': 'Elastic_IP',
                    'id': alloc_id,
                    'public_ip': public_ip,
                    'monthly_cost_usd': eip_monthly_cost,
                    'action': 'DRY_RUN' if self.dry_run else 'RELEASED'
                }

                if not self.dry_run and alloc_id:
                    self.ec2.release_address(AllocationId=alloc_id)
                    print(f"  🗑️ Released EIP: {public_ip} ~ ${eip_monthly_cost:.2f}/mo")
                else:
                    print(f"  [DRY-RUN] Would release EIP: {public_ip} ~ ${eip_monthly_cost:.2f}/mo")

                self.report['resources'].append(entry)
                self.report['estimated_savings'] += eip_monthly_cost

    def clean_old_amis(self):
        """Deregister AMIs older than age_days (keep the 3 most recent per name prefix)."""
        cutoff = datetime.now(timezone.utc) - timedelta(days=self.age_days)

        amis = self.ec2.describe_images(
            Owners=['self'],
            Filters=[{'Name': 'state', 'Values': ['available']}]
        )['Images']

        # Group by name prefix (e.g., 'app-server-*')
        from collections import defaultdict
        grouped = defaultdict(list)
        for ami in amis:
            prefix = '-'.join(ami.get('Name', 'unknown').split('-')[:-1]) or 'unknown'
            grouped[prefix].append(ami)

        for prefix, ami_list in grouped.items():
            sorted_amis = sorted(ami_list, key=lambda x: x['CreationDate'], reverse=True)
            to_delete = sorted_amis[3:]  # Keep 3 most recent

            for ami in to_delete:
                creation = datetime.fromisoformat(ami['CreationDate'].replace('Z', '+00:00'))
                if creation < cutoff:
                    ami_id = ami['ImageId']
                    entry = {
                        'type': 'AMI',
                        'id': ami_id,
                        'name': ami.get('Name'),
                        'created': ami['CreationDate'],
                        'monthly_cost_usd': 0.5,
                        'action': 'DRY_RUN' if self.dry_run else 'DEREGISTERED'
                    }

                    if not self.dry_run:
                        self.ec2.deregister_image(ImageId=ami_id)
                        print(f"  🗑️ Deregistered AMI: {ami_id} ({ami.get('Name')})")
                    else:
                        print(f"  [DRY-RUN] Would deregister AMI: {ami_id} ({ami.get('Name')})")

                    self.report['resources'].append(entry)
                    self.report['estimated_savings'] += 0.5

    def run(self):
        print(f"\n{'='*60}")
        print(f"AWS Resource Cleanup ({'DRY RUN' if self.dry_run else 'LIVE'})")
        print(f"{'='*60}\n")

        print("🔍 Scanning EBS Volumes...")
        self.clean_unattached_ebs_volumes()

        print("\n🔍 Scanning Elastic IPs...")
        self.clean_unassociated_eips()

        print("\n🔍 Scanning Old AMIs...")
        self.clean_old_amis()

        # Final Report
        print(f"\n{'='*60}")
        print("💰 CLEANUP SUMMARY")
        print(f"{'='*60}")
        print(f"Resources identified: {len(self.report['resources'])}")
        print(f"Estimated monthly savings: ${self.report['estimated_savings']:.2f}")
        print(f"Estimated annual savings:  ${self.report['estimated_savings'] * 12:.2f}")

        with open('cleanup_report.json', 'w') as f:
            json.dump(self.report, f, indent=2)
        print("\nDetailed report saved to cleanup_report.json")

        return self.report


if __name__ == '__main__':
    # Always run dry_run=True first!
    cleaner = AWSResourceCleaner(dry_run=True, age_days=30)
    cleaner.run()

💡 Quick Reference: boto3 Cheat Sheet

# ─── Session & Client Setup ───────────────────────────────────────────────────
import boto3

# Default credentials (from env, ~/.aws, or IAM role)
client = boto3.client('ec2', region_name='us-east-1')

# Assume role
sts = boto3.client('sts')
creds = sts.assume_role(RoleArn='arn:aws:iam::123:role/MyRole',
                         RoleSessionName='session')['Credentials']
assumed_client = boto3.client('s3',
    aws_access_key_id=creds['AccessKeyId'],
    aws_secret_access_key=creds['SecretAccessKey'],
    aws_session_token=creds['SessionToken'])

# ─── Pagination ───────────────────────────────────────────────────────────────
paginator = client.get_paginator('describe_instances')
for page in paginator.paginate():
    for reservation in page['Reservations']:
        ...

# ─── Waiters ──────────────────────────────────────────────────────────────────
waiter = client.get_waiter('instance_running')
waiter.wait(InstanceIds=['i-1234567890abcdef0'])

# ─── Resource API (higher-level) ─────────────────────────────────────────────
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket')
for obj in bucket.objects.all():
    print(obj.key)

# ─── Error Handling ───────────────────────────────────────────────────────────
from botocore.exceptions import ClientError
try:
    client.some_api_call()
except ClientError as e:
    code = e.response['Error']['Code']
    message = e.response['Error']['Message']
    if code == 'NoSuchBucket':
        ...

# ─── Common Services Quick Reference ─────────────────────────────────────────
# EC2      → boto3.client('ec2')
# S3       → boto3.client('s3') or boto3.resource('s3')
# IAM      → boto3.client('iam')
# Lambda   → boto3.client('lambda')
# RDS      → boto3.client('rds')
# DynamoDB → boto3.client('dynamodb') or boto3.resource('dynamodb')
# SQS      → boto3.client('sqs') or boto3.resource('sqs')
# SNS      → boto3.client('sns')
# ECS      → boto3.client('ecs')
# CloudWatch → boto3.client('cloudwatch') or boto3.client('logs')
# Secrets  → boto3.client('secretsmanager')
# SSM      → boto3.client('ssm')
# STS      → boto3.client('sts')
# Org      → boto3.client('organizations')
# CE       → boto3.client('ce')  # Cost Explorer

📚 Python AWS Resources

ResourcePurpose
boto3 DocsOfficial boto3 reference
AWS SDK Examples (GitHub)Official code examples
LocalStackTest AWS locally
motoMock AWS services for unit testing
AWS CDK (Python)IaC with Python
Powertools for LambdaProduction Lambda utilities

🏆 Key Interview Tips

  1. Always mention pagination — Interviewers love seeing you know about get_paginator().
  2. Discuss error handling — Show ClientError handling and exponential backoff.
  3. Think about idempotency — Scripts should be safe to run multiple times.
  4. Mention dry-run mode — Critical for destructive operations.
  5. Security first — Never hardcode credentials; always use IAM roles or Secrets Manager.
  6. Discuss testing — Mention moto for mocking AWS in unit tests.
  7. Logging over print — Use structured logging with logging module.
  8. Environment variables — Configuration should come from env vars, not hardcoded.

Last updated: 2025 | Covers Python 3.10+ and boto3 latest | Questions sourced from interviews at Amazon, Atlassian, Datadog, HashiCorp, and Cloudflare.

⭐ Pair this with the AWS Scenario-Based Interview Q&A README for complete preparation!

Add More Questions to This Guide

Know questions that should be here? Share them and help the community!

Open Google Form