The Situation
Security posture auditing — run weekly to catch any Security Group that accidentally exposes SSH, RDP, or database ports to the public internet.
Problem Statement
A developer accidentally added 0.0.0.0/0 to a Security Group to “quickly test” SSH access. They forgot to remove it. Three days later, the instance was cryptomining. This script catches that misconfiguration before it becomes a breach.
Risk Levels
| Severity | Ports | Why dangerous |
|---|
| CRITICAL | 22 (SSH), 3389 (RDP), 3306 (MySQL), 5432 (PostgreSQL), 27017 (MongoDB), 6379 (Redis) | Direct remote access to servers or databases |
| HIGH | 9200 (Elasticsearch), 8080 (HTTP-Alt), 80 (HTTP), 443 (HTTPS) | May expose unencrypted or unauthenticated services |
Complete Script
import boto3
import json
from botocore.exceptions import ClientError
# Ports that should NEVER be open to 0.0.0.0/0
SENSITIVE_PORTS = {
22: "SSH",
3389: "RDP",
3306: "MySQL",
5432: "PostgreSQL",
27017: "MongoDB",
6379: "Redis",
9200: "Elasticsearch",
8080: "HTTP-Alt",
443: "HTTPS",
80: "HTTP",
}
# These ports are always CRITICAL — direct admin or database access
HIGH_RISK_PORTS = {22, 3389, 3306, 5432, 27017, 6379}
def audit_security_groups(
region: str = "us-east-1",
auto_remediate: bool = False,
) -> list[dict]:
"""
Iterates all Security Groups in the region and checks each
inbound rule (IpPermissions) for unrestricted CIDR ranges.
IpPermissions structure:
[
{
"IpProtocol": "tcp",
"FromPort": 22,
"ToPort": 22,
"IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": ""}],
"Ipv6Ranges": [{"CidrIpv6": "::/0"}],
"UserIdGroupPairs": [], # Cross-account SG references
"PrefixListIds": []
},
...
]
IpProtocol = "-1" means ALL traffic (any protocol, any port).
"""
ec2 = boto3.client("ec2", region_name=region)
findings: list[dict] = []
paginator = ec2.get_paginator("describe_security_groups")
for page in paginator.paginate():
for sg in page["SecurityGroups"]:
sg_id = sg["GroupId"]
sg_name = sg["GroupName"]
vpc_id = sg.get("VpcId", "EC2-Classic")
# ── Check each inbound rule ────────────────────────────
for rule in sg.get("IpPermissions", []):
from_port = rule.get("FromPort", 0)
to_port = rule.get("ToPort", 65535)
protocol = rule.get("IpProtocol", "-1")
# Collect all unrestricted CIDR sources (IPv4 + IPv6)
unrestricted_cidrs = []
for cidr_range in rule.get("IpRanges", []):
if cidr_range.get("CidrIp") == "0.0.0.0/0":
unrestricted_cidrs.append("0.0.0.0/0")
for ipv6_range in rule.get("Ipv6Ranges", []):
if ipv6_range.get("CidrIpv6") == "::/0":
unrestricted_cidrs.append("::/0")
if not unrestricted_cidrs:
continue # This rule is restricted to specific CIDRs — safe
# ── Check each sensitive port against this rule ────
for port, service in SENSITIVE_PORTS.items():
# protocol == "-1" means ALL traffic (matches every port)
# Otherwise, check if the port falls within [from_port, to_port]
port_exposed = (
protocol == "-1"
or (from_port <= port <= to_port)
)
if not port_exposed:
continue
severity = "CRITICAL" if port in HIGH_RISK_PORTS else "HIGH"
for cidr in unrestricted_cidrs:
finding = {
"sg_id": sg_id,
"sg_name": sg_name,
"vpc_id": vpc_id,
"port": port,
"service": service,
"protocol": protocol,
"cidr": cidr,
"severity": severity,
}
findings.append(finding)
icon = "🔴" if severity == "CRITICAL" else "🟠"
print(
f"{icon} [{severity}] {sg_id} ({sg_name}) in {vpc_id} "
f"allows {service} port {port} from {cidr}"
)
# ── Auto-remediate critical findings ───────
if auto_remediate and port in HIGH_RISK_PORTS:
revoke_rule(ec2, sg_id, rule, cidr)
# ── Summary ────────────────────────────────────────────────────
critical = [f for f in findings if f["severity"] == "CRITICAL"]
high = [f for f in findings if f["severity"] == "HIGH"]
print(f"\n{'='*60}")
print(f"Total findings: {len(findings)}")
print(f"Critical: {len(critical)}")
print(f"High: {len(high)}")
if critical:
print("\nCRITICAL — Immediate action required:")
seen = set()
for f in critical:
key = f"{f['sg_id']}:{f['port']}"
if key not in seen:
seen.add(key)
print(f" {f['sg_id']} ({f['sg_name']}) → {f['service']} from {f['cidr']}")
return findings
def revoke_rule(ec2_client, sg_id: str, rule: dict, cidr: str) -> None:
"""
revoke_security_group_ingress() removes specific CIDR ranges from a rule.
We reconstruct the IpPermissions entry from the original rule dict,
replacing IpRanges with ONLY the CIDR we want to remove.
Other CIDRs in the same rule are untouched.
This is non-destructive — if the rule allows both 0.0.0.0/0 and
10.0.0.0/8, only 0.0.0.0/0 is revoked; the private range stays.
"""
try:
ip_perm = {
"IpProtocol": rule["IpProtocol"],
"IpRanges": [{"CidrIp": cidr}] if cidr != "::/0" else [],
"Ipv6Ranges": [{"CidrIpv6": cidr}] if cidr == "::/0" else [],
}
# Only include port range if the protocol is not "-1" (all traffic)
if rule.get("IpProtocol") != "-1":
ip_perm["FromPort"] = rule.get("FromPort", 0)
ip_perm["ToPort"] = rule.get("ToPort", 65535)
ec2_client.revoke_security_group_ingress(
GroupId=sg_id,
IpPermissions=[ip_perm],
)
print(f" ✅ Revoked {cidr} from {sg_id}")
except ClientError as e:
print(f" ❌ Failed to revoke {cidr} from {sg_id}: {e}")
def save_report(findings: list[dict], output: str = "sg_audit_report.json") -> None:
with open(output, "w") as f:
json.dump(findings, f, indent=2)
print(f"\nFull report saved to {output}")
if __name__ == "__main__":
# Audit-only mode first (auto_remediate=False)
findings = audit_security_groups(region="ap-south-1", auto_remediate=False)
save_report(findings)
# To auto-fix critical findings, change to:
# audit_security_groups(region="ap-south-1", auto_remediate=True)
Sample Output
🔴 [CRITICAL] sg-0abc123 (dev-server-sg) in vpc-0def456 allows SSH port 22 from 0.0.0.0/0
🟠 [HIGH] sg-0abc123 (dev-server-sg) in vpc-0def456 allows HTTP-Alt port 8080 from 0.0.0.0/0
🔴 [CRITICAL] sg-0ghi789 (staging-db-sg) in vpc-0def456 allows MySQL port 3306 from 0.0.0.0/0
============================================================
Total findings: 3
Critical: 2
High: 1
CRITICAL — Immediate action required:
sg-0abc123 (dev-server-sg) → SSH from 0.0.0.0/0
sg-0ghi789 (staging-db-sg) → MySQL from 0.0.0.0/0
Key Commands Explained
| Command | What it does |
|---|
get_paginator("describe_security_groups") | Paginates all SGs — important if you have 100+ SGs |
sg["IpPermissions"] | List of inbound rules for this Security Group |
rule["IpRanges"] | List of IPv4 CIDR ranges for this rule |
rule.get("IpProtocol") == "-1" | Matches rules that allow ALL protocols/ports |
from_port <= port <= to_port | Checks if a sensitive port falls within the rule’s range |
revoke_security_group_ingress(GroupId, IpPermissions) | Removes specific CIDR from a rule without deleting other CIDRs |
Prevention: Use AWS Config Rules
# Enable the AWS managed Config rule that detects this automatically
config = boto3.client("config")
config.put_config_rule(
ConfigRule={
"ConfigRuleName": "restricted-ssh",
"Source": {
"Owner": "AWS",
"SourceIdentifier": "INCOMING_SSH_DISABLED",
},
}
)
🔍 Line-by-Line Code Walkthrough
Module-Level Constants
SENSITIVE_PORTS = {22: "SSH", 3389: "RDP", 3306: "MySQL", ...}
HIGH_RISK_PORTS = {22, 3389, 3306, 5432, 27017, 6379}
| Line | Explanation |
|---|
SENSITIVE_PORTS = {port: service_name} | A dict mapping port numbers to human-readable service names. Used to identify which service is exposed when checking rules |
HIGH_RISK_PORTS = {22, 3389, ...} | A Python set (fast O(1) membership tests) of ports that indicate direct administrative or database access — always classified CRITICAL |
| Why separate them? | SENSITIVE_PORTS is for detection (all ports to check). HIGH_RISK_PORTS is for severity classification (CRITICAL vs HIGH) |
ec2 = boto3.client("ec2", region_name=region)
paginator = ec2.get_paginator("describe_security_groups")
for page in paginator.paginate():
for sg in page["SecurityGroups"]:
| Line | Explanation |
|---|
get_paginator("describe_security_groups") | Handles pagination. Large accounts can have hundreds of Security Groups |
paginate() with no Filters | Returns ALL security groups. We check every one |
sg["GroupId"] | The SG identifier (e.g., sg-0abc123) |
sg["GroupName"] | Human-readable name (e.g., "dev-server-sg") |
sg.get("VpcId", "EC2-Classic") | The VPC this SG belongs to. .get() with default handles the rare case of EC2-Classic (no VpcId) |
Parsing IpPermissions
for rule in sg.get("IpPermissions", []):
from_port = rule.get("FromPort", 0)
to_port = rule.get("ToPort", 65535)
protocol = rule.get("IpProtocol", "-1")
| Line | Explanation |
|---|
sg.get("IpPermissions", []) | The list of inbound rules. Empty list if no inbound rules |
rule.get("FromPort", 0) | Start of the port range. .get() with default 0 because when IpProtocol="-1" (all traffic), AWS does not include FromPort/ToPort in the response |
rule.get("ToPort", 65535) | End of the port range. Default 65535 (max port) means we’ll catch all ports when protocol is "-1" |
rule.get("IpProtocol", "-1") | The protocol: "tcp", "udp", "icmp", or "-1" (all protocols) |
Finding Unrestricted CIDRs
unrestricted_cidrs = []
for cidr_range in rule.get("IpRanges", []):
if cidr_range.get("CidrIp") == "0.0.0.0/0":
unrestricted_cidrs.append("0.0.0.0/0")
for ipv6_range in rule.get("Ipv6Ranges", []):
if ipv6_range.get("CidrIpv6") == "::/0":
unrestricted_cidrs.append("::/0")
| Line | Explanation |
|---|
rule.get("IpRanges", []) | IPv4 CIDR ranges for this rule. Each is a dict with "CidrIp" key |
cidr_range.get("CidrIp") == "0.0.0.0/0" | 0.0.0.0/0 means “any IPv4 address” — the entire internet |
rule.get("Ipv6Ranges", []) | IPv6 ranges. Each has "CidrIpv6" key |
"::/0" | IPv6 equivalent of 0.0.0.0/0 — any IPv6 address. Don’t forget IPv6! Many audits miss this |
if not unrestricted_cidrs: continue | If the rule doesn’t have any unrestricted CIDRs, it’s fine — skip to the next rule |
Port Exposure Check
for port, service in SENSITIVE_PORTS.items():
port_exposed = (
protocol == "-1"
or (from_port <= port <= to_port)
)
if not port_exposed:
continue
severity = "CRITICAL" if port in HIGH_RISK_PORTS else "HIGH"
| Line | Explanation |
|---|
protocol == "-1" | Matches rules that allow ALL traffic (e.g., a rule with protocol -1 exposes every port) |
from_port <= port <= to_port | Python chained comparison. Checks if our sensitive port falls within the rule’s range (e.g., port 22 within range 0-65535) |
"CRITICAL" if port in HIGH_RISK_PORTS else "HIGH" | in HIGH_RISK_PORTS is O(1) set lookup. CRITICAL for admin ports, HIGH for others |
revoke_rule(ec2_client, sg_id, rule, cidr)
ip_perm = {
"IpProtocol": rule["IpProtocol"],
"IpRanges": [{"CidrIp": cidr}] if cidr != "::/0" else [],
"Ipv6Ranges": [{"CidrIpv6": cidr}] if cidr == "::/0" else [],
}
if rule.get("IpProtocol") != "-1":
ip_perm["FromPort"] = rule.get("FromPort", 0)
ip_perm["ToPort"] = rule.get("ToPort", 65535)
ec2_client.revoke_security_group_ingress(
GroupId=sg_id,
IpPermissions=[ip_perm],
)
| Line | Explanation |
|---|
"IpRanges": [{"CidrIp": cidr}] if cidr != "::/0" else [] | Conditionally populates IPv4 ranges. "::/0" is an IPv6 address — not valid in IpRanges |
"Ipv6Ranges": [{"CidrIpv6": cidr}] if cidr == "::/0" else [] | Conditionally populates IPv6 ranges only for the "::/0" CIDR |
if rule.get("IpProtocol") != "-1": add FromPort/ToPort | When protocol is "-1" (all traffic), AWS doesn’t use port ranges and rejects them if included. We only add ports for tcp/udp rules |
revoke_security_group_ingress(GroupId, IpPermissions) | Removes the specific CIDR from the rule. Only the specified CIDR is revoked — other CIDRs in the same rule are untouched |
Why reconstruct ip_perm? | The revoke API requires a precise match of the rule to remove. We build the minimum dict that matches just the CIDR we want to revoke |