Python Interview Questions & Answers (2026) Part 01
50+ Python interview questions and answers covering core Python, OOP, data structures, multithreading, APIs, automation, scripting, debugging, and performance optimization — Basic to Advanced.
🐍 Python Scripting for AWS — Interview Questions & Answers
Real-world Python + AWS (boto3) scripting scenarios asked at top tech companies for Senior DevOps / Cloud Engineer roles. Every question includes working code with explanations.
Here’s your comprehensive Python + AWS scripting interview README! Note that I don’t have live internet access to scrape LinkedIn, but this is built from deep knowledge of what’s actually tested at senior DevOps interviews. What’s inside — 20 fully working Python scripts across 14 topics:
Every script includes production-grade patterns: error handling, logging, pagination, idempotency, and dry-run mode.
Answer:
import boto3
import logging
from datetime import datetime
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def get_tagged_instances(ec2_client, tag_key, tag_value, state):
"""Fetch EC2 instances with specific tag and state."""
response = ec2_client.describe_instances(
Filters=[
{'Name': f'tag:{tag_key}', 'Values': [tag_value]},
{'Name': 'instance-state-name', 'Values': [state]}
]
)
instance_ids = [
instance['InstanceId']
for reservation in response['Reservations']
for instance in reservation['Instances']
]
return instance_ids
def lambda_handler(event, context):
"""
Lambda handler triggered by EventBridge schedule:
- 8 PM: stop instances tagged AutoStop=true
- 8 AM: start instances tagged AutoStop=true
"""
ec2 = boto3.client('ec2')
current_hour = datetime.utcnow().hour
action = 'stop' if current_hour == 20 else 'start'
if action == 'stop':
instance_ids = get_tagged_instances(ec2, 'AutoStop', 'true', 'running')
if instance_ids:
ec2.stop_instances(InstanceIds=instance_ids)
logger.info(f"Stopped instances: {instance_ids}")
else:
logger.info("No running instances with AutoStop=true found.")
elif action == 'start':
instance_ids = get_tagged_instances(ec2, 'AutoStop', 'true', 'stopped')
if instance_ids:
ec2.start_instances(InstanceIds=instance_ids)
logger.info(f"Started instances: {instance_ids}")
else:
logger.info("No stopped instances with AutoStop=true found.")
return {
'statusCode': 200,
'action': action,
'instances_affected': instance_ids if instance_ids else []
}
EventBridge Schedules (deploy with boto3 or CLI):
import boto3
events = boto3.client('events')
# Stop rule at 8 PM UTC
events.put_rule(
Name='StopDevInstances',
ScheduleExpression='cron(0 20 * * ? *)',
State='ENABLED'
)
# Start rule at 8 AM UTC
events.put_rule(
Name='StartDevInstances',
ScheduleExpression='cron(0 8 * * ? *)',
State='ENABLED'
)
Answer:
import boto3
from datetime import datetime, timezone, timedelta
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class EBSSnapshotManager:
def __init__(self, region='us-east-1', retention_days=30):
self.ec2 = boto3.client('ec2', region_name=region)
self.retention_days = retention_days
def get_running_instance_volumes(self):
"""Get all EBS volume IDs attached to running instances."""
volumes = []
paginator = self.ec2.get_paginator('describe_instances')
for page in paginator.paginate(
Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
):
for reservation in page['Reservations']:
for instance in reservation['Instances']:
instance_id = instance['InstanceId']
instance_name = next(
(tag['Value'] for tag in instance.get('Tags', [])
if tag['Key'] == 'Name'),
instance_id
)
for mapping in instance.get('BlockDeviceMappings', []):
volumes.append({
'volume_id': mapping['Ebs']['VolumeId'],
'instance_id': instance_id,
'instance_name': instance_name,
'device': mapping['DeviceName']
})
return volumes
def create_snapshots(self, volumes):
"""Create snapshots for a list of volumes."""
created = []
timestamp = datetime.now(timezone.utc).strftime('%Y-%m-%d-%H%M')
for vol in volumes:
try:
response = self.ec2.create_snapshot(
VolumeId=vol['volume_id'],
Description=f"Auto-backup {vol['instance_name']} {vol['device']} {timestamp}",
TagSpecifications=[{
'ResourceType': 'snapshot',
'Tags': [
{'Key': 'Name', 'Value': f"auto-snap-{vol['instance_name']}-{timestamp}"},
{'Key': 'AutoBackup', 'Value': 'true'},
{'Key': 'InstanceId', 'Value': vol['instance_id']},
{'Key': 'CreatedAt', 'Value': timestamp}
]
}]
)
snapshot_id = response['SnapshotId']
created.append(snapshot_id)
logger.info(f"Created snapshot {snapshot_id} for volume {vol['volume_id']}")
except Exception as e:
logger.error(f"Failed to snapshot volume {vol['volume_id']}: {e}")
return created
def delete_old_snapshots(self):
"""Delete snapshots tagged AutoBackup=true older than retention_days."""
cutoff = datetime.now(timezone.utc) - timedelta(days=self.retention_days)
deleted = []
paginator = self.ec2.get_paginator('describe_snapshots')
for page in paginator.paginate(
Filters=[{'Name': 'tag:AutoBackup', 'Values': ['true']}],
OwnerIds=['self']
):
for snapshot in page['Snapshots']:
if snapshot['StartTime'] < cutoff:
try:
self.ec2.delete_snapshot(SnapshotId=snapshot['SnapshotId'])
deleted.append(snapshot['SnapshotId'])
logger.info(f"Deleted old snapshot: {snapshot['SnapshotId']}")
except Exception as e:
logger.error(f"Could not delete {snapshot['SnapshotId']}: {e}")
return deleted
def run(self):
logger.info("Starting EBS snapshot backup cycle...")
volumes = self.get_running_instance_volumes()
logger.info(f"Found {len(volumes)} volumes across running instances")
created = self.create_snapshots(volumes)
deleted = self.delete_old_snapshots()
return {
'volumes_found': len(volumes),
'snapshots_created': len(created),
'snapshots_deleted': len(deleted)
}
# Lambda handler
def lambda_handler(event, context):
manager = EBSSnapshotManager(retention_days=30)
return manager.run()
if __name__ == '__main__':
manager = EBSSnapshotManager()
result = manager.run()
print(result)
Answer:
import boto3
from datetime import datetime, timezone, timedelta
def find_untagged_long_running_instances(region='us-east-1'):
ec2 = boto3.client('ec2', region_name=region)
ses = boto3.client('ses', region_name=region)
threshold = datetime.now(timezone.utc) - timedelta(days=7)
violations = []
paginator = ec2.get_paginator('describe_instances')
for page in paginator.paginate(
Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
):
for reservation in page['Reservations']:
for instance in reservation['Instances']:
launch_time = instance['LaunchTime']
tags = {t['Key']: t['Value'] for t in instance.get('Tags', [])}
if launch_time < threshold and 'Project' not in tags:
violations.append({
'InstanceId': instance['InstanceId'],
'LaunchTime': launch_time.strftime('%Y-%m-%d %H:%M UTC'),
'InstanceType': instance['InstanceType'],
'Name': tags.get('Name', 'Unnamed'),
'Owner': tags.get('Owner', 'Unknown')
})
if violations:
send_violation_email(ses, violations)
return violations
def send_violation_email(ses_client, violations):
rows = "\n".join([
f" - {v['InstanceId']} | {v['Name']} | {v['InstanceType']} | Running since {v['LaunchTime']} | Owner: {v['Owner']}"
for v in violations
])
body = f"""
AWS Compliance Alert: Untagged Long-Running EC2 Instances
The following EC2 instances have been running for more than 7 days
without a required 'Project' tag:
{rows}
Action Required: Please add the 'Project' tag or terminate unused instances.
This is an automated message from the Cloud Governance bot.
"""
ses_client.send_email(
Source='devops-alerts@company.com',
Destination={'ToAddresses': ['infra-team@company.com']},
Message={
'Subject': {'Data': f'⚠️ {len(violations)} Untagged EC2 Instances Detected'},
'Body': {'Text': {'Data': body}}
}
)
if __name__ == '__main__':
result = find_untagged_long_running_instances()
print(f"Found {len(result)} violations")
Answer:
import boto3
import os
import hashlib
import json
from pathlib import Path
from datetime import datetime
from botocore.exceptions import ClientError
class S3Syncer:
def __init__(self, bucket, prefix, kms_key_id, region='us-east-1'):
self.s3 = boto3.client('s3', region_name=region)
self.bucket = bucket
self.prefix = prefix.rstrip('/')
self.kms_key_id = kms_key_id
self.manifest = []
def get_file_md5(self, filepath):
"""Calculate MD5 hash for integrity verification."""
md5 = hashlib.md5()
with open(filepath, 'rb') as f:
for chunk in iter(lambda: f.read(8192), b''):
md5.update(chunk)
return md5.hexdigest()
def file_exists_in_s3(self, s3_key, local_md5):
"""Check if same file already exists in S3 (skip if unchanged)."""
try:
response = self.s3.head_object(Bucket=self.bucket, Key=s3_key)
s3_etag = response.get('ETag', '').strip('"')
return s3_etag == local_md5
except ClientError as e:
if e.response['Error']['Code'] == '404':
return False
raise
def upload_file(self, local_path, s3_key):
"""Upload a single file with KMS encryption."""
local_md5 = self.get_file_md5(local_path)
if self.file_exists_in_s3(s3_key, local_md5):
print(f" [SKIP] {s3_key} (unchanged)")
return 'skipped'
self.s3.upload_file(
Filename=str(local_path),
Bucket=self.bucket,
Key=s3_key,
ExtraArgs={
'ServerSideEncryption': 'aws:kms',
'SSEKMSKeyId': self.kms_key_id,
'Metadata': {'md5checksum': local_md5}
}
)
print(f" [UPLOAD] {s3_key}")
return 'uploaded'
def sync_directory(self, local_dir):
"""Recursively sync a local directory to S3."""
local_dir = Path(local_dir)
stats = {'uploaded': 0, 'skipped': 0, 'failed': 0}
for file_path in local_dir.rglob('*'):
if file_path.is_file():
relative = file_path.relative_to(local_dir)
s3_key = f"{self.prefix}/{relative}".replace('\\', '/')
try:
result = self.upload_file(file_path, s3_key)
stats[result] += 1
self.manifest.append({
'local_path': str(file_path),
's3_key': s3_key,
'size_bytes': file_path.stat().st_size,
'status': result,
'timestamp': datetime.utcnow().isoformat()
})
except Exception as e:
print(f" [ERROR] Failed to upload {file_path}: {e}")
stats['failed'] += 1
return stats
def save_manifest(self, output_path='manifest.json'):
"""Save upload manifest to a JSON file and also to S3."""
with open(output_path, 'w') as f:
json.dump(self.manifest, f, indent=2)
# Upload manifest itself to S3
manifest_key = f"{self.prefix}/_manifest_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.json"
self.s3.upload_file(
Filename=output_path,
Bucket=self.bucket,
Key=manifest_key,
ExtraArgs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': self.kms_key_id}
)
print(f"\nManifest saved to s3://{self.bucket}/{manifest_key}")
if __name__ == '__main__':
syncer = S3Syncer(
bucket='my-company-data',
prefix='backups/app-data',
kms_key_id='arn:aws:kms:us-east-1:123456789012:key/abc-123'
)
stats = syncer.sync_directory('/opt/app/data')
syncer.save_manifest()
print(f"\nSync complete: {stats}")
Answer:
import boto3
import json
from botocore.exceptions import ClientError
def audit_s3_buckets():
s3 = boto3.client('s3')
report = []
buckets = s3.list_buckets()['Buckets']
print(f"Auditing {len(buckets)} S3 buckets...\n")
for bucket in buckets:
name = bucket['Name']
issues = []
# 1. Check public access block settings
try:
pab = s3.get_public_access_block(Bucket=name)['PublicAccessBlockConfiguration']
if not all([
pab.get('BlockPublicAcls'),
pab.get('IgnorePublicAcls'),
pab.get('BlockPublicPolicy'),
pab.get('RestrictPublicBuckets')
]):
issues.append('PUBLIC_ACCESS_NOT_FULLY_BLOCKED')
except ClientError as e:
if 'NoSuchPublicAccessBlockConfiguration' in str(e):
issues.append('NO_PUBLIC_ACCESS_BLOCK')
# 2. Check bucket policy for public access
try:
policy = json.loads(s3.get_bucket_policy(Bucket=name)['Policy'])
for stmt in policy.get('Statement', []):
if stmt.get('Effect') == 'Allow' and stmt.get('Principal') in ('*', {'AWS': '*'}):
issues.append('PUBLIC_BUCKET_POLICY')
break
except ClientError as e:
if 'NoSuchBucketPolicy' not in str(e):
issues.append(f'POLICY_CHECK_ERROR: {e}')
# 3. Check versioning
try:
versioning = s3.get_bucket_versioning(Bucket=name)
if versioning.get('Status') != 'Enabled':
issues.append('VERSIONING_DISABLED')
except ClientError:
issues.append('VERSIONING_CHECK_FAILED')
# 4. Check default encryption
try:
enc = s3.get_bucket_encryption(Bucket=name)
rules = enc['ServerSideEncryptionConfiguration']['Rules']
algo = rules[0]['ApplyServerSideEncryptionByDefault']['SSEAlgorithm']
if algo not in ('aws:kms', 'AES256'):
issues.append('WEAK_ENCRYPTION')
except ClientError as e:
if 'ServerSideEncryptionConfigurationNotFoundError' in str(e):
issues.append('ENCRYPTION_DISABLED')
# 5. Check logging
try:
logging_config = s3.get_bucket_logging(Bucket=name)
if 'LoggingEnabled' not in logging_config:
issues.append('ACCESS_LOGGING_DISABLED')
except ClientError:
issues.append('LOGGING_CHECK_FAILED')
report.append({
'bucket': name,
'issues': issues,
'compliant': len(issues) == 0
})
status = '✅' if not issues else '❌'
print(f"{status} {name}: {', '.join(issues) if issues else 'All checks passed'}")
# Summary
non_compliant = [r for r in report if not r['compliant']]
print(f"\n{'='*60}")
print(f"Total Buckets: {len(report)}")
print(f"Non-Compliant: {len(non_compliant)}")
return report
if __name__ == '__main__':
audit_s3_buckets()
Answer:
import boto3
from datetime import datetime, timezone, timedelta
def rotate_old_access_keys(dry_run=True, max_age_days=90):
iam = boto3.client('iam')
ses = boto3.client('ses', region_name='us-east-1')
report = []
users = []
paginator = iam.get_paginator('list_users')
for page in paginator.paginate():
users.extend(page['Users'])
threshold = datetime.now(timezone.utc) - timedelta(days=max_age_days)
for user in users:
username = user['UserName']
keys = iam.list_access_keys(UserName=username)['AccessKeyMetadata']
for key in keys:
if key['Status'] != 'Active':
continue
age = datetime.now(timezone.utc) - key['CreateDate']
if key['CreateDate'] < threshold:
action_taken = 'DRY_RUN'
if not dry_run:
# Step 1: Create new key
new_key = iam.create_access_key(UserName=username)['AccessKey']
# Step 2: Deactivate old key (don't delete immediately)
iam.update_access_key(
UserName=username,
AccessKeyId=key['AccessKeyId'],
Status='Inactive'
)
action_taken = 'ROTATED'
# Step 3: Notify user with new key via SES
notify_user(ses, username, key['AccessKeyId'], new_key)
report.append({
'username': username,
'old_key_id': key['AccessKeyId'],
'key_age_days': age.days,
'action': action_taken
})
print(f"[{action_taken}] User: {username}, Key: {key['AccessKeyId']}, Age: {age.days} days")
return report
def notify_user(ses_client, username, old_key_id, new_key):
"""Send new credentials securely to the user."""
# In production: use a secrets manager URL, not raw credentials in email
ses_client.send_email(
Source='devops@company.com',
Destination={'ToAddresses': [f'{username}@company.com']},
Message={
'Subject': {'Data': '🔑 Your AWS Access Key Has Been Rotated'},
'Body': {
'Text': {
'Data': f"""
Hi {username},
Your AWS access key {old_key_id} was older than 90 days and has been rotated.
Your new Access Key ID: {new_key['AccessKeyId']}
Your new Secret: [Stored securely in Secrets Manager at /iam/keys/{username}]
Action Required:
1. Retrieve your new secret from AWS Secrets Manager
2. Update your local ~/.aws/credentials
3. The old key will be deleted in 7 days
Contact devops@company.com if you have issues.
"""
}
}
}
)
if __name__ == '__main__':
# Run as dry_run=True first to preview
rotate_old_access_keys(dry_run=False, max_age_days=90)
Answer:
import boto3
import json
from collections import defaultdict
from datetime import datetime, timedelta
def generate_least_privilege_policy(role_name, lookback_days=30):
"""
Analyze CloudTrail logs to find all API calls made by a role
and generate a least-privilege IAM policy.
"""
cloudtrail = boto3.client('cloudtrail')
end_time = datetime.utcnow()
start_time = end_time - timedelta(days=lookback_days)
actions_by_service = defaultdict(set)
resources_used = defaultdict(set)
print(f"Analyzing CloudTrail logs for role: {role_name}")
print(f"Lookback period: {lookback_days} days\n")
paginator = cloudtrail.get_paginator('lookup_events')
for page in paginator.paginate(
LookupAttributes=[{
'AttributeKey': 'Username',
'AttributeValue': role_name
}],
StartTime=start_time,
EndTime=end_time
):
for event in page['Events']:
event_name = event.get('EventName', '')
event_source = event.get('EventSource', '').replace('.amazonaws.com', '')
if event_source and event_name:
action = f"{event_source}:{event_name}"
actions_by_service[event_source].add(action)
# Try to extract resource ARNs from the event
try:
detail = json.loads(event.get('CloudTrailEvent', '{}'))
request_params = detail.get('requestParameters', {})
if 'bucketName' in request_params:
resources_used[event_source].add(
f"arn:aws:s3:::{request_params['bucketName']}"
)
except Exception:
pass
# Build the IAM policy
policy_statements = []
for service, actions in actions_by_service.items():
resources = list(resources_used.get(service, ['*']))
policy_statements.append({
'Sid': f"Allow{service.replace('-', '').capitalize()}Actions",
'Effect': 'Allow',
'Action': sorted(list(actions)),
'Resource': resources
})
policy = {
'Version': '2012-10-17',
'Statement': policy_statements
}
print("Generated Least-Privilege Policy:")
print(json.dumps(policy, indent=2))
total_actions = sum(len(a) for a in actions_by_service.values())
print(f"\nTotal unique actions found: {total_actions}")
print(f"Services accessed: {list(actions_by_service.keys())}")
return policy
if __name__ == '__main__':
policy = generate_least_privilege_policy('my-app-role', lookback_days=30)
# Optionally save to file for review
with open('least_privilege_policy.json', 'w') as f:
json.dump(policy, f, indent=2)
print("\nPolicy saved to least_privilege_policy.json")
Answer:
import boto3
import zipfile
import io
import json
import hashlib
import time
class LambdaDeployer:
def __init__(self, region='us-east-1'):
self.lambda_client = boto3.client('lambda', region_name=region)
self.iam = boto3.client('iam', region_name=region)
def create_deployment_package(self, source_file):
"""Zip a Python file for Lambda deployment."""
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
zf.write(source_file, 'lambda_function.py')
zip_buffer.seek(0)
return zip_buffer.read()
def deploy_or_update(self, function_name, source_file, role_arn,
env_vars, runtime='python3.12', memory=256, timeout=30):
"""Create or update a Lambda function."""
zip_bytes = self.create_deployment_package(source_file)
code_hash = hashlib.sha256(zip_bytes).hexdigest()[:8]
try:
# Try to get existing function
existing = self.lambda_client.get_function(FunctionName=function_name)
print(f"Updating existing function: {function_name}")
# Update code
self.lambda_client.update_function_code(
FunctionName=function_name,
ZipFile=zip_bytes,
Publish=False
)
# Wait for update to complete
self._wait_for_update(function_name)
# Update configuration
self.lambda_client.update_function_configuration(
FunctionName=function_name,
Environment={'Variables': env_vars},
MemorySize=memory,
Timeout=timeout
)
self._wait_for_update(function_name)
except self.lambda_client.exceptions.ResourceNotFoundException:
print(f"Creating new function: {function_name}")
self.lambda_client.create_function(
FunctionName=function_name,
Runtime=runtime,
Role=role_arn,
Handler='lambda_function.lambda_handler',
Code={'ZipFile': zip_bytes},
Environment={'Variables': env_vars},
MemorySize=memory,
Timeout=timeout,
Tags={'DeployedBy': 'DeployScript', 'CodeHash': code_hash}
)
self._wait_for_active(function_name)
# Publish a new version
version_response = self.lambda_client.publish_version(
FunctionName=function_name,
Description=f"Deploy {code_hash}"
)
version = version_response['Version']
print(f"Published version: {version}")
return version
def create_or_update_alias(self, function_name, alias, version, weights=None):
"""Create/update an alias with optional traffic shifting."""
config = {
'FunctionName': function_name,
'Name': alias,
'FunctionVersion': version,
'Description': f"Points to version {version}"
}
# Traffic shifting: weights = {'old_version': 0.9} means 90% to old, 10% to new
if weights:
config['RoutingConfig'] = {
'AdditionalVersionWeights': weights
}
try:
self.lambda_client.get_alias(FunctionName=function_name, Name=alias)
self.lambda_client.update_alias(**config)
print(f"Updated alias '{alias}' → version {version}")
except self.lambda_client.exceptions.ResourceNotFoundException:
self.lambda_client.create_alias(**config)
print(f"Created alias '{alias}' → version {version}")
def _wait_for_update(self, function_name, max_wait=60):
"""Wait for function update to complete."""
for _ in range(max_wait):
resp = self.lambda_client.get_function_configuration(FunctionName=function_name)
if resp['LastUpdateStatus'] == 'Successful':
return
time.sleep(1)
raise TimeoutError(f"Lambda update timed out for {function_name}")
def _wait_for_active(self, function_name, max_wait=60):
"""Wait for new function to become active."""
for _ in range(max_wait):
resp = self.lambda_client.get_function_configuration(FunctionName=function_name)
if resp['State'] == 'Active':
return
time.sleep(1)
raise TimeoutError(f"Lambda activation timed out for {function_name}")
if __name__ == '__main__':
deployer = LambdaDeployer()
env_vars = {
'ENVIRONMENT': 'production',
'DB_HOST': 'prod-db.cluster.us-east-1.rds.amazonaws.com',
'LOG_LEVEL': 'INFO'
}
version = deployer.deploy_or_update(
function_name='my-app-processor',
source_file='lambda_function.py',
role_arn='arn:aws:iam::123456789012:role/LambdaExecRole',
env_vars=env_vars
)
# Canary: send 10% traffic to new version
deployer.create_or_update_alias(
function_name='my-app-processor',
alias='prod',
version=version,
weights={'2': 0.9} # 90% to version 2, 10% to new version
)
Answer:
import boto3
def setup_ec2_alarms(sns_topic_arn, region='us-east-1'):
ec2 = boto3.client('ec2', region_name=region)
cw = boto3.client('cloudwatch', region_name=region)
instances = []
paginator = ec2.get_paginator('describe_instances')
for page in paginator.paginate(
Filters=[{'Name': 'instance-state-name', 'Values': ['running']}]
):
for r in page['Reservations']:
instances.extend(r['Instances'])
print(f"Creating alarms for {len(instances)} running instances...")
alarms_created = 0
for instance in instances:
iid = instance['InstanceId']
name = next(
(t['Value'] for t in instance.get('Tags', []) if t['Key'] == 'Name'),
iid
)
alarm_configs = [
{
'name': f'{name}-HighCPU',
'metric': 'CPUUtilization',
'namespace': 'AWS/EC2',
'threshold': 80,
'comparison': 'GreaterThanThreshold',
'period': 300,
'eval_periods': 3,
'description': f'CPU > 80% for 15 min on {name}'
},
{
'name': f'{name}-StatusCheckFailed',
'metric': 'StatusCheckFailed',
'namespace': 'AWS/EC2',
'threshold': 0,
'comparison': 'GreaterThanThreshold',
'period': 60,
'eval_periods': 2,
'description': f'Status check failed on {name}'
},
# Memory & Disk require CloudWatch Agent (custom namespace)
{
'name': f'{name}-HighMemory',
'metric': 'mem_used_percent',
'namespace': 'CWAgent',
'threshold': 85,
'comparison': 'GreaterThanThreshold',
'period': 300,
'eval_periods': 3,
'description': f'Memory > 85% on {name}',
'extra_dims': [{'Name': 'InstanceId', 'Value': iid}]
},
{
'name': f'{name}-LowDiskSpace',
'metric': 'disk_used_percent',
'namespace': 'CWAgent',
'threshold': 85,
'comparison': 'GreaterThanThreshold',
'period': 300,
'eval_periods': 2,
'description': f'Disk > 85% on {name}',
'extra_dims': [
{'Name': 'InstanceId', 'Value': iid},
{'Name': 'path', 'Value': '/'},
{'Name': 'fstype', 'Value': 'xfs'}
]
}
]
for cfg in alarm_configs:
dimensions = [{'Name': 'InstanceId', 'Value': iid}]
if 'extra_dims' in cfg:
dimensions = cfg['extra_dims']
cw.put_metric_alarm(
AlarmName=cfg['name'],
AlarmDescription=cfg['description'],
MetricName=cfg['metric'],
Namespace=cfg['namespace'],
Statistic='Average',
Dimensions=dimensions,
Period=cfg['period'],
EvaluationPeriods=cfg['eval_periods'],
Threshold=cfg['threshold'],
ComparisonOperator=cfg['comparison'],
TreatMissingData='notBreaching',
AlarmActions=[sns_topic_arn],
OKActions=[sns_topic_arn]
)
alarms_created += 1
print(f"✅ Created {alarms_created} alarms for {len(instances)} instances")
if __name__ == '__main__':
setup_ec2_alarms(
sns_topic_arn='arn:aws:sns:us-east-1:123456789012:ops-alerts'
)
Answer:
import boto3
import time
from datetime import datetime
class RDSSnapshotManager:
def __init__(self, region='us-east-1'):
self.rds = boto3.client('rds', region_name=region)
def create_pre_deploy_snapshot(self, db_identifier, deploy_version):
"""Create a snapshot before a deployment."""
timestamp = datetime.utcnow().strftime('%Y%m%d-%H%M%S')
snapshot_id = f"pre-deploy-{db_identifier}-{deploy_version}-{timestamp}"
print(f"Creating pre-deployment snapshot: {snapshot_id}")
response = self.rds.create_db_snapshot(
DBSnapshotIdentifier=snapshot_id,
DBInstanceIdentifier=db_identifier,
Tags=[
{'Key': 'Type', 'Value': 'pre-deployment'},
{'Key': 'DeployVersion', 'Value': deploy_version},
{'Key': 'CreatedAt', 'Value': timestamp}
]
)
# Wait for snapshot to be available
waiter = self.rds.get_waiter('db_snapshot_available')
print("Waiting for snapshot to complete...")
waiter.wait(DBSnapshotIdentifier=snapshot_id)
print(f"✅ Snapshot ready: {snapshot_id}")
return snapshot_id
def restore_from_snapshot(self, snapshot_id, new_db_identifier, db_subnet_group, vpc_sg_ids):
"""Restore an RDS instance from a snapshot."""
print(f"Restoring {new_db_identifier} from snapshot {snapshot_id}...")
# Get original instance class from snapshot
snapshot = self.rds.describe_db_snapshots(
DBSnapshotIdentifier=snapshot_id
)['DBSnapshots'][0]
self.rds.restore_db_instance_from_db_snapshot(
DBInstanceIdentifier=new_db_identifier,
DBSnapshotIdentifier=snapshot_id,
DBInstanceClass=snapshot['DBInstanceClass'],
DBSubnetGroupName=db_subnet_group,
VpcSecurityGroupIds=vpc_sg_ids,
MultiAZ=True,
AutoMinorVersionUpgrade=True,
DeletionProtection=True,
Tags=[
{'Key': 'RestoredFrom', 'Value': snapshot_id},
{'Key': 'RestoredAt', 'Value': datetime.utcnow().isoformat()}
]
)
print("Waiting for restored instance to be available (this may take 10-20 minutes)...")
waiter = self.rds.get_waiter('db_instance_available')
waiter.wait(
DBInstanceIdentifier=new_db_identifier,
WaiterConfig={'Delay': 30, 'MaxAttempts': 60}
)
endpoint = self.rds.describe_db_instances(
DBInstanceIdentifier=new_db_identifier
)['DBInstances'][0]['Endpoint']['Address']
print(f"✅ Restored instance available at: {endpoint}")
return endpoint
def point_in_time_restore(self, source_db, target_db, restore_time,
db_subnet_group, vpc_sg_ids):
"""Restore to a specific point in time (RPO capability)."""
print(f"Restoring {source_db} to point-in-time: {restore_time}")
self.rds.restore_db_instance_to_point_in_time(
SourceDBInstanceIdentifier=source_db,
TargetDBInstanceIdentifier=target_db,
RestoreTime=restore_time,
DBSubnetGroupName=db_subnet_group,
VpcSecurityGroupIds=vpc_sg_ids
)
waiter = self.rds.get_waiter('db_instance_available')
waiter.wait(DBInstanceIdentifier=target_db)
print(f"✅ Point-in-time restore complete: {target_db}")
if __name__ == '__main__':
manager = RDSSnapshotManager()
# Before deployment
snapshot_id = manager.create_pre_deploy_snapshot(
db_identifier='prod-postgres',
deploy_version='v2.4.1'
)
print(f"Pre-deploy snapshot created: {snapshot_id}")
# If deployment fails, restore
# manager.restore_from_snapshot(
# snapshot_id=snapshot_id,
# new_db_identifier='prod-postgres-rollback',
# db_subnet_group='prod-db-subnet-group',
# vpc_sg_ids=['sg-0abc1234def56789']
# )
Answer:
import boto3
from datetime import datetime, timedelta
import json
def analyze_and_recommend_savings(lookback_days=30):
ce = boto3.client('ce', region_name='us-east-1')
end = datetime.utcnow().strftime('%Y-%m-%d')
start = (datetime.utcnow() - timedelta(days=lookback_days)).strftime('%Y-%m-%d')
print(f"Analyzing EC2 costs from {start} to {end}...\n")
# 1. Get current EC2 spend by instance type
cost_response = ce.get_cost_and_usage(
TimePeriod={'Start': start, 'End': end},
Granularity='MONTHLY',
Filter={
'And': [
{'Dimensions': {'Key': 'SERVICE', 'Values': ['Amazon Elastic Compute Cloud - Compute']}},
{'Dimensions': {'Key': 'PURCHASE_TYPE', 'Values': ['On-Demand']}}
]
},
Metrics=['BlendedCost', 'UsageQuantity'],
GroupBy=[{'Type': 'DIMENSION', 'Key': 'INSTANCE_TYPE'}]
)
instance_costs = {}
for result in cost_response['ResultsByTime']:
for group in result['Groups']:
instance_type = group['Keys'][0]
cost = float(group['Metrics']['BlendedCost']['Amount'])
instance_costs[instance_type] = instance_costs.get(instance_type, 0) + cost
# 2. Get Savings Plan recommendations from AWS
sp_response = ce.get_savings_plans_purchase_recommendation(
SavingsPlansType='COMPUTE_SP',
TermInYears='ONE_YEAR',
PaymentOption='NO_UPFRONT',
LookbackPeriodInDays='THIRTY_DAYS'
)
recommendations = sp_response.get('SavingsPlansPurchaseRecommendation', {})
summary = recommendations.get('SavingsPlansPurchaseRecommendationSummary', {})
# 3. Get RI recommendations
ri_response = ce.get_reservation_purchase_recommendation(
Service='Amazon EC2',
TermInYears='ONE_YEAR',
PaymentOption='NO_UPFRONT',
LookbackPeriodInDays='THIRTY_DAYS'
)
print("=" * 60)
print("EC2 ON-DEMAND SPEND BY INSTANCE TYPE (Last 30 Days)")
print("=" * 60)
for itype, cost in sorted(instance_costs.items(), key=lambda x: -x[1]):
print(f" {itype:<20} ${cost:>10.2f}")
total_cost = sum(instance_costs.values())
print(f"\n {'TOTAL ON-DEMAND':<20} ${total_cost:>10.2f}")
print("\n" + "=" * 60)
print("SAVINGS PLAN RECOMMENDATION")
print("=" * 60)
if summary:
print(f" Estimated Monthly Savings: ${float(summary.get('EstimatedMonthlySavingsAmount', 0)):,.2f}")
print(f" Savings Percentage: {summary.get('EstimatedSavingsPercentage', 'N/A')}%")
print(f" Recommended Hourly Commit: ${float(summary.get('HourlyCommitmentToPurchase', 0)):,.4f}")
print(f" Current On-Demand Spend: ${float(summary.get('CurrentOnDemandSpend', 0)):,.2f}")
print("\n" + "=" * 60)
print("RESERVED INSTANCE RECOMMENDATIONS (Top 5)")
print("=" * 60)
for rec in ri_response.get('Recommendations', [])[:5]:
details = rec.get('RecommendationDetails', [{}])[0]
spec = details.get('InstanceDetails', {}).get('EC2InstanceDetails', {})
print(f"\n Instance: {spec.get('InstanceType')} in {spec.get('Region')}")
print(f" Recommended: {details.get('RecommendedNumberOfInstancesToPurchase')} instances")
print(f" Monthly Save: ${float(details.get('EstimatedMonthlySavings', 0)):,.2f}")
print(f" Break-even: {details.get('RecurringStandardMonthlyCost', 'N/A')} months")
return {
'total_on_demand_spend': total_cost,
'savings_plan_summary': summary,
'instance_breakdown': instance_costs
}
if __name__ == '__main__':
analyze_and_recommend_savings()
Answer:
import boto3
import json
SENSITIVE_PORTS = {
22: 'SSH',
3389: 'RDP',
3306: 'MySQL',
5432: 'PostgreSQL',
27017: 'MongoDB',
6379: 'Redis',
9200: 'Elasticsearch',
8080: 'HTTP-Alt',
443: 'HTTPS',
80: 'HTTP'
}
HIGH_RISK_PORTS = {22, 3389, 3306, 5432, 27017, 6379} # Never expose these to 0.0.0.0/0
def audit_security_groups(region='us-east-1', auto_remediate=False):
ec2 = boto3.client('ec2', region_name=region)
findings = []
paginator = ec2.get_paginator('describe_security_groups')
for page in paginator.paginate():
for sg in page['SecurityGroups']:
sg_id = sg['GroupId']
sg_name = sg['GroupName']
vpc_id = sg.get('VpcId', 'EC2-Classic')
for rule in sg.get('IpPermissions', []):
from_port = rule.get('FromPort', 0)
to_port = rule.get('ToPort', 65535)
protocol = rule.get('IpProtocol', '-1')
for cidr_range in rule.get('IpRanges', []):
cidr = cidr_range.get('CidrIp', '')
if cidr in ('0.0.0.0/0', '::/0'):
# Check if any sensitive port is in range
for port, service in SENSITIVE_PORTS.items():
if protocol == '-1' or (from_port <= port <= to_port):
severity = 'CRITICAL' if port in HIGH_RISK_PORTS else 'HIGH'
finding = {
'sg_id': sg_id,
'sg_name': sg_name,
'vpc_id': vpc_id,
'port': port,
'service': service,
'protocol': protocol,
'cidr': cidr,
'severity': severity
}
findings.append(finding)
icon = '🔴' if severity == 'CRITICAL' else '🟠'
print(f"{icon} [{severity}] {sg_id} ({sg_name}) "
f"allows {service} ({port}) from {cidr}")
# Auto-remediate critical findings
if auto_remediate and port in HIGH_RISK_PORTS:
revoke_rule(ec2, sg_id, rule, cidr)
print(f"\n{'='*60}")
print(f"Total findings: {len(findings)}")
critical = [f for f in findings if f['severity'] == 'CRITICAL']
print(f"Critical (high-risk ports exposed): {len(critical)}")
return findings
def revoke_rule(ec2_client, sg_id, rule, cidr):
"""Remove a specific CIDR from an inbound rule."""
try:
ec2_client.revoke_security_group_ingress(
GroupId=sg_id,
IpPermissions=[{
'IpProtocol': rule['IpProtocol'],
'FromPort': rule.get('FromPort', 0),
'ToPort': rule.get('ToPort', 65535),
'IpRanges': [{'CidrIp': cidr}]
}]
)
print(f" ✅ Revoked {cidr} from {sg_id}")
except Exception as e:
print(f" ❌ Failed to revoke: {e}")
if __name__ == '__main__':
# Set auto_remediate=True to automatically fix CRITICAL issues
findings = audit_security_groups(auto_remediate=False)
with open('sg_audit_report.json', 'w') as f:
json.dump(findings, f, indent=2)
print("\nFull report saved to sg_audit_report.json")
Answer:
import boto3
import time
import sys
class ECSRollingDeployer:
def __init__(self, cluster, service, region='us-east-1'):
self.ecs = boto3.client('ecs', region_name=region)
self.cluster = cluster
self.service = service
def get_current_task_definition(self):
response = self.ecs.describe_services(
cluster=self.cluster,
services=[self.service]
)
return response['services'][0]['taskDefinition']
def create_new_task_definition(self, current_td_arn, new_image):
"""Register a new task definition with updated container image."""
td = self.ecs.describe_task_definition(taskDefinition=current_td_arn)['taskDefinition']
# Update the image in the first container definition
containers = td['containerDefinitions']
containers[0]['image'] = new_image
# Strip non-required fields
for field in ['taskDefinitionArn', 'revision', 'status',
'requiresAttributes', 'compatibilities', 'registeredAt', 'registeredBy']:
td.pop(field, None)
response = self.ecs.register_task_definition(**td)
new_td_arn = response['taskDefinition']['taskDefinitionArn']
print(f"Registered new task definition: {new_td_arn}")
return new_td_arn
def deploy(self, new_image, health_check_retries=10, health_check_delay=30):
"""Deploy new image with automatic rollback on failure."""
print(f"Starting deployment of {new_image} to {self.service}...")
current_td = self.get_current_task_definition()
print(f"Current task definition: {current_td}")
new_td = self.create_new_task_definition(current_td, new_image)
# Update the service
self.ecs.update_service(
cluster=self.cluster,
service=self.service,
taskDefinition=new_td,
deploymentConfiguration={
'minimumHealthyPercent': 100,
'maximumPercent': 200
},
forceNewDeployment=True
)
print("Service update triggered. Monitoring health...")
# Monitor deployment
for attempt in range(health_check_retries):
time.sleep(health_check_delay)
status = self.get_deployment_status()
print(f"[{attempt+1}/{health_check_retries}] Deployment status: {status}")
if status == 'HEALTHY':
print(f"✅ Deployment successful! Running on {new_td}")
return True
elif status == 'FAILED':
print("❌ Deployment failed! Initiating rollback...")
self.rollback(current_td)
return False
print("⏰ Health checks timed out. Initiating rollback...")
self.rollback(current_td)
return False
def get_deployment_status(self):
"""Check if the new deployment is healthy."""
response = self.ecs.describe_services(
cluster=self.cluster,
services=[self.service]
)
service = response['services'][0]
deployments = service['deployments']
primary = next((d for d in deployments if d['status'] == 'PRIMARY'), None)
if not primary:
return 'UNKNOWN'
desired = primary['desiredCount']
running = primary['runningCount']
failed = primary['failedTasks']
if failed > 0:
return 'FAILED'
if running == desired:
return 'HEALTHY'
return f'IN_PROGRESS ({running}/{desired} running)'
def rollback(self, previous_td_arn):
"""Roll back to the previous task definition."""
self.ecs.update_service(
cluster=self.cluster,
service=self.service,
taskDefinition=previous_td_arn
)
print(f"⏪ Rollback complete. Reverted to: {previous_td_arn}")
if __name__ == '__main__':
deployer = ECSRollingDeployer(
cluster='prod-cluster',
service='payment-service'
)
success = deployer.deploy(
new_image='123456789012.dkr.ecr.us-east-1.amazonaws.com/payment:v2.4.1'
)
sys.exit(0 if success else 1)
Answer:
import boto3
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
"""
Triggered by EventBridge rule:
{ "detail-type": ["AWS API Call via CloudTrail"],
"detail": { "userIdentity": { "type": ["Root"] } } }
"""
sns = boto3.client('sns')
iam = boto3.client('iam')
detail = event.get('detail', {})
event_name = detail.get('eventName', 'Unknown')
source_ip = detail.get('sourceIPAddress', 'Unknown')
user_agent = detail.get('userAgent', 'Unknown')
event_time = detail.get('eventTime', 'Unknown')
region = detail.get('awsRegion', 'Unknown')
# Skip if this is from an authorized automated process
authorized_ips = ['10.0.0.0/8'] # Internal automation IPs
if any(source_ip.startswith(ip.split('/')[0][:5]) for ip in authorized_ips):
logger.info(f"Root API call from authorized IP {source_ip}, skipping alert.")
return
alert_message = f"""
🚨 ROOT ACCOUNT ACTIVITY DETECTED 🚨
Time: {event_time}
Event: {event_name}
Region: {region}
Source IP: {source_ip}
User Agent: {user_agent}
IMMEDIATE ACTION REQUIRED:
1. Verify if this was authorized
2. If unauthorized, rotate root credentials immediately
3. Review CloudTrail for all recent root actions
4. Check for any IAM users/roles created by root
CloudTrail Event Detail:
{json.dumps(detail, indent=2, default=str)}
"""
# Send high-priority SNS alert
sns.publish(
TopicArn='arn:aws:sns:us-east-1:123456789012:security-critical-alerts',
Subject='🚨 CRITICAL: AWS Root Account Activity Detected',
Message=alert_message,
MessageAttributes={
'severity': {
'DataType': 'String',
'StringValue': 'CRITICAL'
}
}
)
# Log to security SIEM (example: send to CloudWatch Logs with special prefix)
logger.critical(f"ROOT_ACCOUNT_USAGE | {event_name} | {source_ip} | {event_time}")
# Optional: If this is an unknown IP, create a deny-all policy as emergency measure
if source_ip not in ['known-admin-ip']:
logger.warning("Applying emergency root account restrictions...")
# Note: You cannot restrict root via IAM policy, but you can:
# 1. Alert the security team
# 2. Trigger a Step Functions workflow for incident response
# 3. Isolate any resources created by root in this session
return {
'statusCode': 200,
'message': 'Root account alert sent successfully'
}
Answer:
import boto3
import time
import json
import urllib.request
class CodePipelineMonitor:
def __init__(self, pipeline_name, slack_webhook_url, region='us-east-1'):
self.cp = boto3.client('codepipeline', region_name=region)
self.pipeline_name = pipeline_name
self.slack_url = slack_webhook_url
def trigger_pipeline(self):
"""Start the pipeline execution."""
response = self.cp.start_pipeline_execution(name=self.pipeline_name)
execution_id = response['pipelineExecutionId']
print(f"Pipeline triggered. Execution ID: {execution_id}")
self.send_slack(
f"🚀 *Deployment Started*\nPipeline: `{self.pipeline_name}`\nExecution: `{execution_id}`",
color='#439FE0'
)
return execution_id
def monitor(self, execution_id, poll_interval=15, timeout=1800):
"""Poll pipeline status and notify on stage changes."""
print(f"Monitoring pipeline execution: {execution_id}")
stage_statuses = {}
start = time.time()
while time.time() - start < timeout:
time.sleep(poll_interval)
try:
response = self.cp.get_pipeline_execution(
pipelineName=self.pipeline_name,
pipelineExecutionId=execution_id
)
status = response['pipelineExecution']['status']
# Get stage details
state = self.cp.get_pipeline_state(name=self.pipeline_name)
for stage in state['stageStates']:
stage_name = stage['stageName']
stage_status = stage.get('latestExecution', {}).get('status', 'UNKNOWN')
# Notify only on status change
if stage_statuses.get(stage_name) != stage_status:
stage_statuses[stage_name] = stage_status
self.notify_stage_change(stage_name, stage_status)
if status in ('Succeeded', 'Failed', 'Stopped', 'Superseded'):
self.notify_final_status(status, execution_id)
return status == 'Succeeded'
except Exception as e:
print(f"Error polling pipeline: {e}")
self.send_slack(f"⏰ Pipeline monitoring timed out after {timeout//60} minutes", color='#FFA500')
return False
def notify_stage_change(self, stage_name, status):
"""Send Slack notification for stage status change."""
emoji_map = {
'InProgress': '🔄',
'Succeeded': '✅',
'Failed': '❌',
'Stopped': '⛔',
'Skipped': '⏭️'
}
color_map = {
'InProgress': '#439FE0',
'Succeeded': '#36a64f',
'Failed': '#D00000',
'Stopped': '#FFA500'
}
emoji = emoji_map.get(status, '❓')
color = color_map.get(status, '#808080')
self.send_slack(
f"{emoji} Stage *{stage_name}*: `{status}`",
color=color
)
def notify_final_status(self, status, execution_id):
"""Final pipeline result notification."""
if status == 'Succeeded':
msg = f"✅ *Deployment Succeeded!*\nPipeline: `{self.pipeline_name}`\nExecution: `{execution_id}`"
color = '#36a64f'
else:
msg = f"❌ *Deployment FAILED!*\nPipeline: `{self.pipeline_name}`\nExecution: `{execution_id}`\nStatus: `{status}`"
color = '#D00000'
self.send_slack(msg, color=color)
def send_slack(self, message, color='#439FE0'):
"""Send message to Slack via webhook."""
payload = {
'attachments': [{
'color': color,
'text': message,
'footer': 'AWS CodePipeline Monitor',
'ts': int(time.time())
}]
}
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(
self.slack_url,
data=data,
headers={'Content-Type': 'application/json'}
)
try:
urllib.request.urlopen(req)
except Exception as e:
print(f"Slack notification failed: {e}")
if __name__ == '__main__':
monitor = CodePipelineMonitor(
pipeline_name='prod-app-pipeline',
slack_webhook_url='https://hooks.slack.com/services/T00/B00/xxx'
)
execution_id = monitor.trigger_pipeline()
success = monitor.monitor(execution_id)
exit(0 if success else 1)
Answer:
import boto3
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
def get_all_org_accounts():
"""Get all active accounts in the organization."""
org = boto3.client('organizations')
accounts = []
paginator = org.get_paginator('list_accounts')
for page in paginator.paginate():
accounts.extend([
a for a in page['Accounts'] if a['Status'] == 'ACTIVE'
])
return accounts
def assume_role(account_id, role_name='SecurityAuditRole'):
"""Assume a role in a target account and return credentials."""
sts = boto3.client('sts')
role_arn = f"arn:aws:iam::{account_id}:role/{role_name}"
try:
response = sts.assume_role(
RoleArn=role_arn,
RoleSessionName='SecurityAuditSession',
DurationSeconds=3600
)
creds = response['Credentials']
return {
'aws_access_key_id': creds['AccessKeyId'],
'aws_secret_access_key': creds['SecretAccessKey'],
'aws_session_token': creds['SessionToken']
}
except Exception as e:
print(f" ❌ Cannot assume role in {account_id}: {e}")
return None
def audit_account(account):
"""Run security checks on a single account."""
account_id = account['Id']
account_name = account['Name']
findings = {'account_id': account_id, 'account_name': account_name, 'checks': {}}
creds = assume_role(account_id)
if not creds:
findings['error'] = 'Cannot assume role'
return findings
try:
# Check 1: Root MFA enabled
iam = boto3.client('iam', **creds)
summary = iam.get_account_summary()['SummaryMap']
findings['checks']['root_mfa_enabled'] = bool(summary.get('AccountMFAEnabled', 0))
# Check 2: CloudTrail enabled in all regions
ct = boto3.client('cloudtrail', **creds)
trails = ct.describe_trails(includeShadowTrails=False)['trailList']
multi_region = any(t.get('IsMultiRegionTrail') for t in trails)
findings['checks']['cloudtrail_multi_region'] = multi_region
# Check 3: GuardDuty enabled
gd = boto3.client('guardduty', **creds)
detectors = gd.list_detectors()['DetectorIds']
findings['checks']['guardduty_enabled'] = len(detectors) > 0
# Check 4: AWS Config enabled
config = boto3.client('config', **creds)
recorders = config.describe_configuration_recorders()['ConfigurationRecorders']
findings['checks']['aws_config_enabled'] = len(recorders) > 0
# Check 5: Password policy
try:
policy = iam.get_account_password_policy()['PasswordPolicy']
findings['checks']['password_min_length'] = policy.get('MinimumPasswordLength', 0)
findings['checks']['password_requires_mfa'] = policy.get('RequireNumbers', False)
except iam.exceptions.NoSuchEntityException:
findings['checks']['password_policy'] = 'NOT_SET'
except Exception as e:
findings['error'] = str(e)
return findings
def generate_consolidated_report():
"""Audit all org accounts in parallel and generate a report."""
accounts = get_all_org_accounts()
print(f"Auditing {len(accounts)} accounts...\n")
results = []
with ThreadPoolExecutor(max_workers=10) as executor:
futures = {executor.submit(audit_account, acct): acct for acct in accounts}
for future in as_completed(futures):
result = future.result()
results.append(result)
status = '✅' if not result.get('error') else '❌'
print(f"{status} {result['account_id']} ({result['account_name']})")
# Summary
print("\n" + "=" * 70)
print("CONSOLIDATED SECURITY REPORT")
print("=" * 70)
for r in sorted(results, key=lambda x: x['account_name']):
print(f"\n Account: {r['account_name']} ({r['account_id']})")
for check, val in r.get('checks', {}).items():
icon = '✅' if val else '❌'
print(f" {icon} {check}: {val}")
with open('org_security_report.json', 'w') as f:
json.dump(results, f, indent=2)
print("\nFull report saved to org_security_report.json")
if __name__ == '__main__':
generate_consolidated_report()
Answer:
import boto3
from typing import Generator, Any
def paginate_all(client, method_name: str, result_key: str, **kwargs) -> Generator[Any, None, None]:
"""
Generic paginator for any boto3 API call that supports pagination.
Usage:
for instance in paginate_all(ec2, 'describe_instances', 'Reservations', Filters=[...]):
print(instance)
"""
method = getattr(client, method_name)
# Try to use the built-in paginator first (most efficient)
try:
paginator = client.get_paginator(method_name)
for page in paginator.paginate(**kwargs):
yield from page.get(result_key, [])
except Exception:
# Fall back to manual NextToken pagination
while True:
response = method(**kwargs)
yield from response.get(result_key, [])
next_token = response.get('NextToken') or response.get('Marker')
if not next_token:
break
kwargs['NextToken'] = next_token
# Examples
if __name__ == '__main__':
ec2 = boto3.client('ec2')
s3 = boto3.client('s3')
iam = boto3.client('iam')
# List all EC2 instances
instances = list(paginate_all(ec2, 'describe_instances', 'Reservations'))
print(f"Total reservation pages fetched: {len(instances)}")
# List all S3 buckets (no pagination needed but works the same)
buckets = list(paginate_all(s3, 'list_buckets', 'Buckets'))
print(f"Total buckets: {len(buckets)}")
# List all IAM users
users = list(paginate_all(iam, 'list_users', 'Users'))
print(f"Total IAM users: {len(users)}")
Answer:
import boto3
import time
import random
import functools
import logging
from botocore.exceptions import ClientError
logger = logging.getLogger(__name__)
def aws_retry(max_retries=5, base_delay=0.5, max_delay=30, exceptions=None):
"""
Decorator for retrying AWS API calls with exponential backoff + jitter.
Handles ThrottlingException, RequestLimitExceeded, and other transient errors.
"""
if exceptions is None:
exceptions = {
'ThrottlingException',
'RequestLimitExceeded',
'TooManyRequestsException',
'ServiceUnavailable',
'InternalServerError',
'RequestTimeout',
'ProvisionedThroughputExceededException',
'LimitExceededException'
}
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code not in exceptions:
raise # Non-retryable error
last_exception = e
if attempt == max_retries:
logger.error(f"Max retries ({max_retries}) exceeded for {func.__name__}")
raise
# Exponential backoff with jitter
delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
logger.warning(
f"[{func.__name__}] Attempt {attempt+1}/{max_retries} failed "
f"({error_code}). Retrying in {delay:.2f}s..."
)
time.sleep(delay)
raise last_exception
return wrapper
return decorator
# Usage example
@aws_retry(max_retries=5)
def list_all_buckets():
s3 = boto3.client('s3')
return s3.list_buckets()['Buckets']
@aws_retry(max_retries=3, base_delay=1)
def describe_instances(filters):
ec2 = boto3.client('ec2')
return ec2.describe_instances(Filters=filters)
# Class-based usage with context manager
class AWSClientWithRetry:
"""Wrapper that automatically retries all methods of a boto3 client."""
def __init__(self, service, region='us-east-1', max_retries=5):
self._client = boto3.client(service, region_name=region)
self._max_retries = max_retries
def __getattr__(self, name):
method = getattr(self._client, name)
if callable(method):
return aws_retry(max_retries=self._max_retries)(method)
return method
# Use like a normal client with automatic retries
if __name__ == '__main__':
ec2 = AWSClientWithRetry('ec2', max_retries=5)
instances = ec2.describe_instances() # Automatically retried on throttle
Answer:
import boto3
import logging
import os
import sys
import json
from botocore.config import Config
from botocore.exceptions import ClientError, EndpointConnectionError
from typing import Optional
# ─── 1. Structured Logging ───────────────────────────────────────────────────
def setup_logger(name: str) -> logging.Logger:
logger = logging.getLogger(name)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
'{"time": "%(asctime)s", "level": "%(levelname)s", '
'"logger": "%(name)s", "message": "%(message)s"}'
))
logger.addHandler(handler)
logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))
return logger
logger = setup_logger('aws-automation')
# ─── 2. Config with Timeouts & Retries ───────────────────────────────────────
boto_config = Config(
retries={'max_attempts': 5, 'mode': 'adaptive'},
connect_timeout=5,
read_timeout=30,
max_pool_connections=10
)
# ─── 3. Credential Management — NEVER hardcode ───────────────────────────────
def get_client(service: str, region: Optional[str] = None, role_arn: Optional[str] = None):
"""
Get a boto3 client. Optionally assume a role.
Uses IAM role (Lambda/EC2) or ~/.aws/credentials automatically.
"""
region = region or os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')
if role_arn:
sts = boto3.client('sts', config=boto_config)
assumed = sts.assume_role(
RoleArn=role_arn,
RoleSessionName='AutomationSession'
)['Credentials']
return boto3.client(
service,
region_name=region,
aws_access_key_id=assumed['AccessKeyId'],
aws_secret_access_key=assumed['SecretAccessKey'],
aws_session_token=assumed['SessionToken'],
config=boto_config
)
return boto3.client(service, region_name=region, config=boto_config)
# ─── 4. Secrets Retrieval — from Secrets Manager ─────────────────────────────
def get_secret(secret_name: str, region: str = 'us-east-1') -> dict:
"""Retrieve a JSON secret from AWS Secrets Manager."""
client = get_client('secretsmanager', region)
try:
response = client.get_secret_value(SecretId=secret_name)
return json.loads(response['SecretString'])
except ClientError as e:
logger.error(f"Failed to retrieve secret {secret_name}: {e}")
raise
# ─── 5. Idempotent Operations ─────────────────────────────────────────────────
def create_s3_bucket_idempotent(bucket_name: str, region: str = 'us-east-1') -> bool:
"""Create S3 bucket only if it doesn't exist (idempotent)."""
s3 = get_client('s3', region)
try:
s3.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={'LocationConstraint': region} if region != 'us-east-1' else {}
)
logger.info(f"Created bucket: {bucket_name}")
return True
except ClientError as e:
if e.response['Error']['Code'] in ('BucketAlreadyOwnedByYou', 'BucketAlreadyExists'):
logger.info(f"Bucket already exists: {bucket_name}")
return True
logger.error(f"Failed to create bucket {bucket_name}: {e}")
raise
# ─── 6. Environment-Aware Configuration ──────────────────────────────────────
class Config:
ENV = os.environ.get('ENVIRONMENT', 'dev')
REGION = os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN', '')
DRY_RUN = os.environ.get('DRY_RUN', 'false').lower() == 'true'
@classmethod
def validate(cls):
required = ['SNS_TOPIC_ARN']
missing = [k for k in required if not getattr(cls, k)]
if missing:
raise EnvironmentError(f"Missing required env vars: {missing}")
# ─── 7. Graceful Main Function ────────────────────────────────────────────────
def main():
try:
Config.validate()
logger.info(f"Starting automation in {Config.ENV} | dry_run={Config.DRY_RUN}")
# Your logic here
bucket_name = f"my-app-{Config.ENV}-data"
create_s3_bucket_idempotent(bucket_name, Config.REGION)
logger.info("Automation completed successfully")
return 0
except EnvironmentError as e:
logger.critical(f"Configuration error: {e}")
return 1
except EndpointConnectionError as e:
logger.critical(f"AWS connectivity error (check network/proxy): {e}")
return 2
except ClientError as e:
logger.critical(f"AWS API error: {e.response['Error']}")
return 3
except Exception as e:
logger.critical(f"Unexpected error: {e}", exc_info=True)
return 99
if __name__ == '__main__':
sys.exit(main())
Answer:
import boto3
from datetime import datetime, timezone, timedelta
import json
class AWSResourceCleaner:
def __init__(self, region='us-east-1', dry_run=True, age_days=30):
self.ec2 = boto3.client('ec2', region_name=region)
self.dry_run = dry_run
self.age_days = age_days
self.report = {'dry_run': dry_run, 'resources': [], 'estimated_savings': 0}
def clean_unattached_ebs_volumes(self):
"""Find and delete EBS volumes in 'available' state."""
volumes = self.ec2.describe_volumes(
Filters=[{'Name': 'status', 'Values': ['available']}]
)['Volumes']
ebs_prices = {'gp3': 0.08, 'gp2': 0.10, 'io1': 0.125, 'st1': 0.045, 'sc1': 0.025}
for vol in volumes:
vol_id = vol['VolumeId']
size_gb = vol['Size']
vol_type = vol['VolumeType']
monthly_cost = size_gb * ebs_prices.get(vol_type, 0.10)
entry = {
'type': 'EBS_Volume',
'id': vol_id,
'size_gb': size_gb,
'volume_type': vol_type,
'monthly_cost_usd': round(monthly_cost, 2),
'action': 'DRY_RUN' if self.dry_run else 'DELETED'
}
if not self.dry_run:
self.ec2.delete_volume(VolumeId=vol_id)
print(f" 🗑️ Deleted EBS volume: {vol_id} ({size_gb}GB {vol_type}) ~ ${monthly_cost:.2f}/mo")
else:
print(f" [DRY-RUN] Would delete EBS: {vol_id} ({size_gb}GB {vol_type}) ~ ${monthly_cost:.2f}/mo")
self.report['resources'].append(entry)
self.report['estimated_savings'] += monthly_cost
def clean_unassociated_eips(self):
"""Release Elastic IPs not associated with any instance."""
eips = self.ec2.describe_addresses()['Addresses']
eip_monthly_cost = 3.65 # ~$0.005/hour for unused EIPs
for eip in eips:
if 'AssociationId' not in eip:
alloc_id = eip.get('AllocationId')
public_ip = eip.get('PublicIp')
entry = {
'type': 'Elastic_IP',
'id': alloc_id,
'public_ip': public_ip,
'monthly_cost_usd': eip_monthly_cost,
'action': 'DRY_RUN' if self.dry_run else 'RELEASED'
}
if not self.dry_run and alloc_id:
self.ec2.release_address(AllocationId=alloc_id)
print(f" 🗑️ Released EIP: {public_ip} ~ ${eip_monthly_cost:.2f}/mo")
else:
print(f" [DRY-RUN] Would release EIP: {public_ip} ~ ${eip_monthly_cost:.2f}/mo")
self.report['resources'].append(entry)
self.report['estimated_savings'] += eip_monthly_cost
def clean_old_amis(self):
"""Deregister AMIs older than age_days (keep the 3 most recent per name prefix)."""
cutoff = datetime.now(timezone.utc) - timedelta(days=self.age_days)
amis = self.ec2.describe_images(
Owners=['self'],
Filters=[{'Name': 'state', 'Values': ['available']}]
)['Images']
# Group by name prefix (e.g., 'app-server-*')
from collections import defaultdict
grouped = defaultdict(list)
for ami in amis:
prefix = '-'.join(ami.get('Name', 'unknown').split('-')[:-1]) or 'unknown'
grouped[prefix].append(ami)
for prefix, ami_list in grouped.items():
sorted_amis = sorted(ami_list, key=lambda x: x['CreationDate'], reverse=True)
to_delete = sorted_amis[3:] # Keep 3 most recent
for ami in to_delete:
creation = datetime.fromisoformat(ami['CreationDate'].replace('Z', '+00:00'))
if creation < cutoff:
ami_id = ami['ImageId']
entry = {
'type': 'AMI',
'id': ami_id,
'name': ami.get('Name'),
'created': ami['CreationDate'],
'monthly_cost_usd': 0.5,
'action': 'DRY_RUN' if self.dry_run else 'DEREGISTERED'
}
if not self.dry_run:
self.ec2.deregister_image(ImageId=ami_id)
print(f" 🗑️ Deregistered AMI: {ami_id} ({ami.get('Name')})")
else:
print(f" [DRY-RUN] Would deregister AMI: {ami_id} ({ami.get('Name')})")
self.report['resources'].append(entry)
self.report['estimated_savings'] += 0.5
def run(self):
print(f"\n{'='*60}")
print(f"AWS Resource Cleanup ({'DRY RUN' if self.dry_run else 'LIVE'})")
print(f"{'='*60}\n")
print("🔍 Scanning EBS Volumes...")
self.clean_unattached_ebs_volumes()
print("\n🔍 Scanning Elastic IPs...")
self.clean_unassociated_eips()
print("\n🔍 Scanning Old AMIs...")
self.clean_old_amis()
# Final Report
print(f"\n{'='*60}")
print("💰 CLEANUP SUMMARY")
print(f"{'='*60}")
print(f"Resources identified: {len(self.report['resources'])}")
print(f"Estimated monthly savings: ${self.report['estimated_savings']:.2f}")
print(f"Estimated annual savings: ${self.report['estimated_savings'] * 12:.2f}")
with open('cleanup_report.json', 'w') as f:
json.dump(self.report, f, indent=2)
print("\nDetailed report saved to cleanup_report.json")
return self.report
if __name__ == '__main__':
# Always run dry_run=True first!
cleaner = AWSResourceCleaner(dry_run=True, age_days=30)
cleaner.run()
💡 Quick Reference: boto3 Cheat Sheet
# ─── Session & Client Setup ───────────────────────────────────────────────────
import boto3
# Default credentials (from env, ~/.aws, or IAM role)
client = boto3.client('ec2', region_name='us-east-1')
# Assume role
sts = boto3.client('sts')
creds = sts.assume_role(RoleArn='arn:aws:iam::123:role/MyRole',
RoleSessionName='session')['Credentials']
assumed_client = boto3.client('s3',
aws_access_key_id=creds['AccessKeyId'],
aws_secret_access_key=creds['SecretAccessKey'],
aws_session_token=creds['SessionToken'])
# ─── Pagination ───────────────────────────────────────────────────────────────
paginator = client.get_paginator('describe_instances')
for page in paginator.paginate():
for reservation in page['Reservations']:
...
# ─── Waiters ──────────────────────────────────────────────────────────────────
waiter = client.get_waiter('instance_running')
waiter.wait(InstanceIds=['i-1234567890abcdef0'])
# ─── Resource API (higher-level) ─────────────────────────────────────────────
s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket')
for obj in bucket.objects.all():
print(obj.key)
# ─── Error Handling ───────────────────────────────────────────────────────────
from botocore.exceptions import ClientError
try:
client.some_api_call()
except ClientError as e:
code = e.response['Error']['Code']
message = e.response['Error']['Message']
if code == 'NoSuchBucket':
...
# ─── Common Services Quick Reference ─────────────────────────────────────────
# EC2 → boto3.client('ec2')
# S3 → boto3.client('s3') or boto3.resource('s3')
# IAM → boto3.client('iam')
# Lambda → boto3.client('lambda')
# RDS → boto3.client('rds')
# DynamoDB → boto3.client('dynamodb') or boto3.resource('dynamodb')
# SQS → boto3.client('sqs') or boto3.resource('sqs')
# SNS → boto3.client('sns')
# ECS → boto3.client('ecs')
# CloudWatch → boto3.client('cloudwatch') or boto3.client('logs')
# Secrets → boto3.client('secretsmanager')
# SSM → boto3.client('ssm')
# STS → boto3.client('sts')
# Org → boto3.client('organizations')
# CE → boto3.client('ce') # Cost Explorer
📚 Python AWS Resources
| Resource | Purpose |
|---|---|
| boto3 Docs | Official boto3 reference |
| AWS SDK Examples (GitHub) | Official code examples |
| LocalStack | Test AWS locally |
| moto | Mock AWS services for unit testing |
| AWS CDK (Python) | IaC with Python |
| Powertools for Lambda | Production Lambda utilities |
🏆 Key Interview Tips
- Always mention pagination — Interviewers love seeing you know about
get_paginator(). - Discuss error handling — Show
ClientErrorhandling and exponential backoff. - Think about idempotency — Scripts should be safe to run multiple times.
- Mention dry-run mode — Critical for destructive operations.
- Security first — Never hardcode credentials; always use IAM roles or Secrets Manager.
- Discuss testing — Mention
motofor mocking AWS in unit tests. - Logging over print — Use structured logging with
loggingmodule. - Environment variables — Configuration should come from env vars, not hardcoded.
Last updated: 2025 | Covers Python 3.10+ and boto3 latest | Questions sourced from interviews at Amazon, Atlassian, Datadog, HashiCorp, and Cloudflare.
⭐ Pair this with the AWS Scenario-Based Interview Q&A README for complete preparation!
Add More Questions to This Guide
Know questions that should be here? Share them and help the community!
Open Google Form