The Situation
Security compliance auditing — run weekly to detect S3 buckets that violate your organization's data protection policy before they become a breach.
Problem Statement
A misconfigured S3 bucket was the #1 cause of cloud data breaches in 2023. Common mistakes: public access not blocked, a bucket policy that grants access to everyone, no versioning (can’t recover deleted files), no default encryption, and no access logging. Running a manual audit across 50+ buckets is error-prone. This script checks all 5 controls automatically.
Five Controls Checked
| Check | API Call | Fail Condition |
|---|---|---|
| Public access block | get_public_access_block | Any of the 4 settings is False |
| Bucket policy | get_bucket_policy | Statement has Principal: * + Effect: Allow |
| Versioning | get_bucket_versioning | Status is not "Enabled" |
| Default encryption | get_bucket_encryption | No SSE configuration found |
| Access logging | get_bucket_logging | LoggingEnabled key absent |
Complete Script
import boto3
import json
from botocore.exceptions import ClientError
# The four Public Access Block flags; every one must be True to pass Check 1.
_PAB_SETTINGS = (
    "BlockPublicAcls",
    "IgnorePublicAcls",
    "BlockPublicPolicy",
    "RestrictPublicBuckets",
)

# Default-encryption algorithms accepted by Check 4.
_ACCEPTED_SSE_ALGORITHMS = ("AES256", "aws:kms", "aws:kms:dsse")


def _error_code(err) -> str:
    """Return the AWS error code carried by a ClientError ("" if absent)."""
    return err.response.get("Error", {}).get("Code", "")


def _is_wildcard(value) -> bool:
    """Return True if a principal value is "*" or a list containing "*"."""
    if isinstance(value, str):
        return value == "*"
    if isinstance(value, list):
        return "*" in value
    return False


def _statement_is_public(stmt: dict) -> bool:
    """Return True for an Allow statement whose Principal is the wildcard."""
    if stmt.get("Effect") != "Allow":
        return False
    principal = stmt.get("Principal")
    if isinstance(principal, dict):
        # Covers {"AWS": "*"} and {"AWS": ["*", ...]}, with or without
        # additional principal types in the same mapping.
        return _is_wildcard(principal.get("AWS"))
    return _is_wildcard(principal)


def _check_public_access_block(s3, name: str) -> list[str]:
    """Check 1: all four Public Access Block settings must be True."""
    try:
        pab = s3.get_public_access_block(Bucket=name)[
            "PublicAccessBlockConfiguration"
        ]
    except ClientError as e:
        if _error_code(e) == "NoSuchPublicAccessBlockConfiguration":
            return ["NO_PUBLIC_ACCESS_BLOCK"]
        return ["PAB_CHECK_ERROR"]
    if all(pab.get(setting) for setting in _PAB_SETTINGS):
        return []
    return ["PUBLIC_ACCESS_NOT_FULLY_BLOCKED"]


def _check_bucket_policy(s3, name: str) -> list[str]:
    """Check 2: flag a policy containing an Allow statement for everyone."""
    try:
        raw_policy = s3.get_bucket_policy(Bucket=name)["Policy"]
    except ClientError as e:
        # No policy at all is not a violation; any other error is reported.
        if _error_code(e) == "NoSuchBucketPolicy":
            return []
        return ["POLICY_CHECK_ERROR"]
    statements = json.loads(raw_policy).get("Statement", [])
    # "Statement" may be a single object instead of a list; iterating a dict
    # would yield its keys (strings) and break the .get() calls below.
    if isinstance(statements, dict):
        statements = [statements]
    if any(_statement_is_public(stmt) for stmt in statements):
        return ["PUBLIC_BUCKET_POLICY"]
    return []


def _check_versioning(s3, name: str) -> list[str]:
    """Check 3: only Status == "Enabled" is compliant."""
    try:
        versioning = s3.get_bucket_versioning(Bucket=name)
    except ClientError:
        return ["VERSIONING_CHECK_FAILED"]
    # "Suspended" and an absent Status both mean new objects are unversioned.
    if versioning.get("Status") != "Enabled":
        return ["VERSIONING_DISABLED"]
    return []


def _check_default_encryption(s3, name: str) -> list[str]:
    """Check 4: a default SSE rule with an accepted algorithm must exist."""
    try:
        enc = s3.get_bucket_encryption(Bucket=name)
    except ClientError as e:
        if _error_code(e) == "ServerSideEncryptionConfigurationNotFoundError":
            return ["ENCRYPTION_DISABLED"]
        # Any other failure (e.g. AccessDenied) must not be read as a pass.
        return ["ENCRYPTION_CHECK_ERROR"]
    rules = enc.get("ServerSideEncryptionConfiguration", {}).get("Rules", [])
    algo = None
    if rules:
        default_rule = rules[0].get("ApplyServerSideEncryptionByDefault", {})
        algo = default_rule.get("SSEAlgorithm")
    if algo is None:
        # A configuration without a default algorithm encrypts nothing.
        return ["ENCRYPTION_DISABLED"]
    if algo not in _ACCEPTED_SSE_ALGORITHMS:
        return ["WEAK_ENCRYPTION"]
    return []


def _check_access_logging(s3, name: str) -> list[str]:
    """Check 5: the response must contain a "LoggingEnabled" key."""
    try:
        logging_cfg = s3.get_bucket_logging(Bucket=name)
    except ClientError:
        return ["LOGGING_CHECK_FAILED"]
    if "LoggingEnabled" not in logging_cfg:
        return ["ACCESS_LOGGING_DISABLED"]
    return []


def _print_summary(report: list[dict]) -> None:
    """Print totals and a per-issue frequency table for the audit report."""
    from collections import Counter

    non_compliant = [r for r in report if not r["compliant"]]
    print(f"\n{'='*60}")
    print(f"Total Buckets: {len(report)}")
    print(f"Compliant: {len(report) - len(non_compliant)}")
    print(f"Non-Compliant: {len(non_compliant)}")
    if non_compliant:
        print("\nTop Issues:")
        all_issues = [i for r in non_compliant for i in r["issues"]]
        for issue, count in Counter(all_issues).most_common():
            print(f" {issue}: {count} bucket(s)")


def audit_s3_buckets(region: str = "us-east-1") -> list[dict]:
    """
    Audit every S3 bucket returned by list_buckets() against five controls.

    s3.list_buckets() returns the buckets of the account; it is not filtered
    by the client's region. For each bucket the five per-bucket checks are
    run in a fixed order (public access block, bucket policy, versioning,
    default encryption, access logging) and their issue codes collected.
    A check that cannot be completed adds a *_CHECK_ERROR / *_CHECK_FAILED
    code, so an unreadable setting is never reported as compliant.

    Args:
        region: Region name used to construct the S3 client.

    Returns:
        One dict per bucket with the keys "bucket" (name), "issues" (list of
        issue-code strings) and "compliant" (True when "issues" is empty).
        A progress line per bucket and a summary are printed as a side effect.
    """
    s3 = boto3.client("s3", region_name=region)
    checks = (
        _check_public_access_block,
        _check_bucket_policy,
        _check_versioning,
        _check_default_encryption,
        _check_access_logging,
    )
    report = []
    buckets = s3.list_buckets().get("Buckets", [])
    print(f"Auditing {len(buckets)} S3 buckets...\n")
    for bucket in buckets:
        name = bucket["Name"]
        issues: list[str] = []
        for check in checks:
            issues.extend(check(s3, name))
        compliant = not issues
        report.append({
            "bucket": name,
            "issues": issues,
            "compliant": compliant,
        })
        status = "✅" if compliant else "❌"
        issue_str = ", ".join(issues) if issues else "All checks passed"
        print(f"{status} {name}: {issue_str}")
    _print_summary(report)
    return report
def save_report(report: list[dict], output: str = "s3_audit_report.json") -> None:
    """
    Serialize the audit report to a JSON file and announce where it went.

    Args:
        report: The list of per-bucket result dicts to persist.
        output: Destination path; an existing file is overwritten.
    """
    with open(output, mode="w") as report_file:
        json.dump(obj=report, fp=report_file, indent=2)
    confirmation = f"\nFull report saved to {output}"
    print(confirmation)
def auto_fix_public_access(bucket_name: str, dry_run: bool = True) -> None:
    """
    Turn on all four Public Access Block settings for one bucket.

    put_public_access_block() with all four settings = True is the
    remediation applied here. It changes access settings only; no objects
    are deleted by this call.

    Args:
        bucket_name: Name of the bucket to remediate.
        dry_run: When True (the default) only print what would be done and
            make no AWS call at all. Pass False to apply the change.
    """
    if dry_run:
        print(f"[DRY-RUN] Would enable Public Access Block on: {bucket_name}")
        return
    # The client is created only on the apply path, so a dry run needs no
    # SDK session or configuration.
    s3 = boto3.client("s3")
    s3.put_public_access_block(
        Bucket=bucket_name,
        PublicAccessBlockConfiguration={
            "BlockPublicAcls": True,
            "IgnorePublicAcls": True,
            "BlockPublicPolicy": True,
            "RestrictPublicBuckets": True,
        },
    )
    print(f"✅ Public Access Block enabled on: {bucket_name}")
if __name__ == "__main__":
    # Run the audit, persist the full report, then preview remediation.
    report = audit_s3_buckets()
    save_report(report)
    # Auto-fix any bucket whose Public Access Block is incomplete OR missing
    # entirely; the same put_public_access_block() call remediates both.
    fixable_issues = {"PUBLIC_ACCESS_NOT_FULLY_BLOCKED", "NO_PUBLIC_ACCESS_BLOCK"}
    for r in report:
        if fixable_issues.intersection(r["issues"]):
            auto_fix_public_access(r["bucket"], dry_run=True)  # Set dry_run=False to apply
Sample Output
Auditing 12 S3 buckets...
✅ my-logs-bucket: All checks passed
❌ old-dev-data: PUBLIC_ACCESS_NOT_FULLY_BLOCKED, VERSIONING_DISABLED
❌ finance-reports: ENCRYPTION_DISABLED, ACCESS_LOGGING_DISABLED
✅ prod-artifacts: All checks passed
❌ test-bucket-2024: NO_PUBLIC_ACCESS_BLOCK, VERSIONING_DISABLED, ENCRYPTION_DISABLED
============================================================
Total Buckets: 12
Compliant: 8
Non-Compliant: 4
Top Issues:
VERSIONING_DISABLED: 3 bucket(s)
ENCRYPTION_DISABLED: 2 bucket(s)
PUBLIC_ACCESS_NOT_FULLY_BLOCKED: 2 bucket(s)
ACCESS_LOGGING_DISABLED: 2 bucket(s)
Key Commands Explained
| Command | What it does |
|---|---|
s3.list_buckets()["Buckets"] | Returns all buckets in the account as a list of dicts |
get_public_access_block(Bucket=name) | Returns the 4-setting Public Access Block config |
get_bucket_policy(Bucket=name)["Policy"] | Returns the bucket policy as a JSON string |
json.loads(raw_policy) | Parses the JSON string into a Python dict |
get_bucket_versioning(Bucket=name) | Returns versioning status — Enabled, Suspended, or {} |
get_bucket_encryption(Bucket=name) | Returns SSE config — raises if not configured |
get_bucket_logging(Bucket=name) | Returns {"LoggingEnabled": {...}} if logging is on |
put_public_access_block(...) | Enables all four public access restrictions |
🔍 Line-by-Line Code Walkthrough
Imports
| Line | Why It’s Used |
|---|---|
import boto3 | AWS SDK for Python — needed for S3 API calls |
import json | Standard library. Bucket policies are returned as JSON strings — we use json.loads() to parse them into Python dicts |
from botocore.exceptions import ClientError | AWS SDK error class. Each security check may raise a ClientError (e.g., NoSuchBucketPolicy, ServerSideEncryptionConfigurationNotFoundError) |
audit_s3_buckets()
s3 = boto3.client("s3", region_name=region)
buckets = s3.list_buckets()["Buckets"]
| Line | Explanation |
|---|---|
boto3.client("s3", region_name=region) | S3 client. Even though S3 is a global service, we specify a region for the API endpoint |
s3.list_buckets() | Returns all buckets in the account — S3 buckets are global, so this is not region-filtered |
["Buckets"] | The response dict has a "Buckets" key containing a list of {"Name": "...", "CreationDate": ...} dicts |
Check 1 — Public Access Block
pab = s3.get_public_access_block(Bucket=name)["PublicAccessBlockConfiguration"]
if not all([
pab.get("BlockPublicAcls"),
pab.get("IgnorePublicAcls"),
pab.get("BlockPublicPolicy"),
pab.get("RestrictPublicBuckets"),
]):
issues.append("PUBLIC_ACCESS_NOT_FULLY_BLOCKED")
| Line | Explanation |
|---|---|
get_public_access_block(Bucket=name) | Retrieves the bucket’s Public Access Block configuration |
["PublicAccessBlockConfiguration"] | The nested key containing the four boolean settings |
pab.get("BlockPublicAcls") | True = blocks new public ACLs from being set on this bucket |
pab.get("IgnorePublicAcls") | True = ignores any existing public ACLs (even if someone somehow set one) |
pab.get("BlockPublicPolicy") | True = blocks bucket policies that grant public access |
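pab.get("RestrictPublicBuckets") | True = when the bucket has a public policy, access is restricted to AWS service principals and authorized users in the bucket owner's account — blocks anonymous and cross-account access through that policy |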
not all([...]) | all() returns True only if every item in the list is truthy. not all means “if at least one is False” — the bucket is not fully protected |
except ClientError: "NoSuchPublicAccessBlockConfiguration" | Raised when no Public Access Block config exists at all — even worse than one with partial settings |
Check 2 — Bucket Policy Public Principal
raw_policy = s3.get_bucket_policy(Bucket=name)["Policy"]
policy = json.loads(raw_policy)
for stmt in policy.get("Statement", []):
principal = stmt.get("Principal")
is_public = principal in ("*", {"AWS": "*"})
if stmt.get("Effect") == "Allow" and is_public:
issues.append("PUBLIC_BUCKET_POLICY")
break
| Line | Explanation |
|---|---|
get_bucket_policy(Bucket=name)["Policy"] | Returns the bucket policy as a raw JSON string (not a dict). Raises NoSuchBucketPolicy if no policy exists |
json.loads(raw_policy) | Parses the JSON string into a Python dict. This allows us to iterate through Statement elements |
policy.get("Statement", []) | Gets the list of IAM policy statements. Returns [] if the key is absent |
stmt.get("Principal") | The entity the statement applies to. "*" or {"AWS": "*"} means ANY anonymous user — a major security risk |
principal in ("*", {"AWS": "*"}) | Checks for both forms of the wildcard principal |
stmt.get("Effect") == "Allow" | Only Allow statements with a public principal are dangerous. A Deny with * principal actually restricts access |
break | Once one public statement is found, we don’t need to keep looking — the bucket is flagged |
"NoSuchBucketPolicy" not in str(e) | Missing bucket policy is NOT a violation — many secure buckets have no policy. Only real errors (AccessDenied, etc.) are flagged |
Check 3 — Versioning
versioning = s3.get_bucket_versioning(Bucket=name)
if versioning.get("Status") != "Enabled":
issues.append("VERSIONING_DISABLED")
| Line | Explanation |
|---|---|
get_bucket_versioning(Bucket=name) | Returns the versioning state. The response is a dict with an optional Status key |
versioning.get("Status") | Returns "Enabled", "Suspended", or None (key absent = never enabled). .get() is used because Status is absent from the dict when versioning was never configured |
!= "Enabled" | Both "Suspended" and absent mean versioning is not protecting the bucket. "Suspended" means it was on but is now paused — new objects are not versioned |
Check 4 — Default Encryption
enc = s3.get_bucket_encryption(Bucket=name)
rules = enc["ServerSideEncryptionConfiguration"]["Rules"]
algo = rules[0]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"]
if algo not in ("aws:kms", "AES256"):
issues.append("WEAK_ENCRYPTION")
| Line | Explanation |
|---|---|
get_bucket_encryption(Bucket=name) | Returns the default SSE config. Raises ServerSideEncryptionConfigurationNotFoundError if no default encryption is set |
["ServerSideEncryptionConfiguration"]["Rules"] | Nested path to reach the list of encryption rules |
rules[0]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] | The algorithm in use: "AES256" (SSE-S3, S3-managed key, no extra charge) or "aws:kms" (SSE-KMS, a KMS key — AWS-managed or customer-managed — giving more control and auditability) |
if algo not in ("aws:kms", "AES256") | Both algorithms are acceptable. Any other value (rare) would be flagged |
Check 5 — Access Logging
logging_cfg = s3.get_bucket_logging(Bucket=name)
if "LoggingEnabled" not in logging_cfg:
issues.append("ACCESS_LOGGING_DISABLED")
| Line | Explanation |
|---|---|
get_bucket_logging(Bucket=name) | Returns the logging configuration. Unlike other checks, this does NOT raise an exception when logging is off — it returns an empty dict {} |
"LoggingEnabled" not in logging_cfg | When logging is configured, the response contains {"LoggingEnabled": {"TargetBucket": "...", "TargetPrefix": "..."}}. If the key is absent, logging is disabled |
Summary & Auto-Fix
non_compliant = [r for r in report if not r["compliant"]]
from collections import Counter
all_issues = [i for r in non_compliant for i in r["issues"]]
for issue, count in Counter(all_issues).most_common():
print(f" {issue}: {count} bucket(s)")
| Line | Explanation |
|---|---|
[r for r in report if not r["compliant"]] | List comprehension filtering to only non-compliant buckets |
[i for r in non_compliant for i in r["issues"]] | Nested list comprehension that flattens: list of dicts with issue lists → single flat list of all issue strings |
Counter(all_issues) | Counts occurrences of each issue string. Counter(["A", "A", "B"]) → {"A": 2, "B": 1} |
.most_common() | Returns items sorted by count descending — shows the most widespread issues first |
def auto_fix_public_access(bucket_name: str, dry_run: bool = True) -> None:
s3.put_public_access_block(
Bucket=bucket_name,
PublicAccessBlockConfiguration={
"BlockPublicAcls": True, "IgnorePublicAcls": True,
"BlockPublicPolicy": True, "RestrictPublicBuckets": True,
},
)
| Line | Explanation |
|---|---|
dry_run: bool = True | Default is dry-run (safe). Caller must explicitly pass dry_run=False to make changes |
put_public_access_block(...) | Non-destructive remediation — sets all 4 block settings to True. Does not delete objects or policies |
All four set to True | AWS recommends all four enabled for maximum protection. Together they block all forms of public access |