Scenario Intermediate Python Python AWS Scripting

Security Group Audit — Find Unrestricted Inbound Rules (0.0.0.0/0)

Python script to audit all VPC Security Groups, identify rules that expose sensitive ports to the internet, and optionally auto-remediate critical findings.

January 20, 2025 7 min read ~20 min to complete DB
The Situation

Security posture auditing — run weekly to catch any Security Group that accidentally exposes SSH, RDP, or database ports to the public internet.

5 Steps
4 Services Used
~20 min Duration
Intermediate Difficulty

Problem Statement

A developer accidentally added 0.0.0.0/0 to a Security Group to “quickly test” SSH access. They forgot to remove it. Three days later, the instance was cryptomining. This script catches that misconfiguration before it becomes a breach.


Risk Levels

SeverityPortsWhy dangerous
CRITICAL22 (SSH), 3389 (RDP), 3306 (MySQL), 5432 (PostgreSQL), 27017 (MongoDB), 6379 (Redis)Direct remote access to servers or databases
HIGH9200 (Elasticsearch), 8080 (HTTP-Alt), 80 (HTTP), 443 (HTTPS)May expose unencrypted or unauthenticated services

Complete Script

import boto3
import json
from botocore.exceptions import ClientError


# Ports that should NEVER be open to 0.0.0.0/0
SENSITIVE_PORTS = {
    22:    "SSH",
    3389:  "RDP",
    3306:  "MySQL",
    5432:  "PostgreSQL",
    27017: "MongoDB",
    6379:  "Redis",
    9200:  "Elasticsearch",
    8080:  "HTTP-Alt",
    443:   "HTTPS",
    80:    "HTTP",
}

# These ports are always CRITICAL — direct admin or database access
HIGH_RISK_PORTS = {22, 3389, 3306, 5432, 27017, 6379}


def audit_security_groups(
    region: str = "us-east-1",
    auto_remediate: bool = False,
) -> list[dict]:
    """
    Iterates all Security Groups in the region and checks each
    inbound rule (IpPermissions) for unrestricted CIDR ranges.

    IpPermissions structure:
    [
      {
        "IpProtocol": "tcp",
        "FromPort": 22,
        "ToPort": 22,
        "IpRanges": [{"CidrIp": "0.0.0.0/0", "Description": ""}],
        "Ipv6Ranges": [{"CidrIpv6": "::/0"}],
        "UserIdGroupPairs": [],   # Cross-account SG references
        "PrefixListIds": []
      },
      ...
    ]

    IpProtocol = "-1" means ALL traffic (any protocol, any port).
    """
    ec2 = boto3.client("ec2", region_name=region)
    findings: list[dict] = []

    paginator = ec2.get_paginator("describe_security_groups")
    for page in paginator.paginate():
        for sg in page["SecurityGroups"]:
            sg_id   = sg["GroupId"]
            sg_name = sg["GroupName"]
            vpc_id  = sg.get("VpcId", "EC2-Classic")

            # ── Check each inbound rule ────────────────────────────
            for rule in sg.get("IpPermissions", []):
                from_port = rule.get("FromPort", 0)
                to_port   = rule.get("ToPort",   65535)
                protocol  = rule.get("IpProtocol", "-1")

                # Collect all unrestricted CIDR sources (IPv4 + IPv6)
                unrestricted_cidrs = []
                for cidr_range in rule.get("IpRanges", []):
                    if cidr_range.get("CidrIp") == "0.0.0.0/0":
                        unrestricted_cidrs.append("0.0.0.0/0")
                for ipv6_range in rule.get("Ipv6Ranges", []):
                    if ipv6_range.get("CidrIpv6") == "::/0":
                        unrestricted_cidrs.append("::/0")

                if not unrestricted_cidrs:
                    continue   # This rule is restricted to specific CIDRs — safe

                # ── Check each sensitive port against this rule ────
                for port, service in SENSITIVE_PORTS.items():
                    # protocol == "-1" means ALL traffic (matches every port)
                    # Otherwise, check if the port falls within [from_port, to_port]
                    port_exposed = (
                        protocol == "-1"
                        or (from_port <= port <= to_port)
                    )

                    if not port_exposed:
                        continue

                    severity = "CRITICAL" if port in HIGH_RISK_PORTS else "HIGH"

                    for cidr in unrestricted_cidrs:
                        finding = {
                            "sg_id":    sg_id,
                            "sg_name":  sg_name,
                            "vpc_id":   vpc_id,
                            "port":     port,
                            "service":  service,
                            "protocol": protocol,
                            "cidr":     cidr,
                            "severity": severity,
                        }
                        findings.append(finding)

                        icon = "🔴" if severity == "CRITICAL" else "🟠"
                        print(
                            f"{icon} [{severity}] {sg_id} ({sg_name}) in {vpc_id} "
                            f"allows {service} port {port} from {cidr}"
                        )

                        # ── Auto-remediate critical findings ───────
                        if auto_remediate and port in HIGH_RISK_PORTS:
                            revoke_rule(ec2, sg_id, rule, cidr)

    # ── Summary ────────────────────────────────────────────────────
    critical = [f for f in findings if f["severity"] == "CRITICAL"]
    high     = [f for f in findings if f["severity"] == "HIGH"]

    print(f"\n{'='*60}")
    print(f"Total findings:  {len(findings)}")
    print(f"Critical:        {len(critical)}")
    print(f"High:            {len(high)}")

    if critical:
        print("\nCRITICAL — Immediate action required:")
        seen = set()
        for f in critical:
            key = f"{f['sg_id']}:{f['port']}"
            if key not in seen:
                seen.add(key)
                print(f"  {f['sg_id']} ({f['sg_name']}) → {f['service']} from {f['cidr']}")

    return findings


def revoke_rule(ec2_client, sg_id: str, rule: dict, cidr: str) -> None:
    """
    revoke_security_group_ingress() removes specific CIDR ranges from a rule.

    We reconstruct the IpPermissions entry from the original rule dict,
    replacing IpRanges with ONLY the CIDR we want to remove.
    Other CIDRs in the same rule are untouched.

    This is non-destructive — if the rule allows both 0.0.0.0/0 and
    10.0.0.0/8, only 0.0.0.0/0 is revoked; the private range stays.
    """
    try:
        ip_perm = {
            "IpProtocol": rule["IpProtocol"],
            "IpRanges":   [{"CidrIp": cidr}] if cidr != "::/0" else [],
            "Ipv6Ranges":  [{"CidrIpv6": cidr}] if cidr == "::/0" else [],
        }
        # Only include port range if the protocol is not "-1" (all traffic)
        if rule.get("IpProtocol") != "-1":
            ip_perm["FromPort"] = rule.get("FromPort", 0)
            ip_perm["ToPort"]   = rule.get("ToPort",   65535)

        ec2_client.revoke_security_group_ingress(
            GroupId=sg_id,
            IpPermissions=[ip_perm],
        )
        print(f"    ✅ Revoked {cidr} from {sg_id}")
    except ClientError as e:
        print(f"    ❌ Failed to revoke {cidr} from {sg_id}: {e}")


def save_report(findings: list[dict], output: str = "sg_audit_report.json") -> None:
    with open(output, "w") as f:
        json.dump(findings, f, indent=2)
    print(f"\nFull report saved to {output}")


if __name__ == "__main__":
    # Audit-only mode first (auto_remediate=False)
    findings = audit_security_groups(region="ap-south-1", auto_remediate=False)
    save_report(findings)

    # To auto-fix critical findings, change to:
    # audit_security_groups(region="ap-south-1", auto_remediate=True)

Sample Output

🔴 [CRITICAL] sg-0abc123 (dev-server-sg) in vpc-0def456 allows SSH port 22 from 0.0.0.0/0
🟠 [HIGH] sg-0abc123 (dev-server-sg) in vpc-0def456 allows HTTP-Alt port 8080 from 0.0.0.0/0
🔴 [CRITICAL] sg-0ghi789 (staging-db-sg) in vpc-0def456 allows MySQL port 3306 from 0.0.0.0/0

============================================================
Total findings:  3
Critical:        2
High:            1

CRITICAL — Immediate action required:
  sg-0abc123 (dev-server-sg) → SSH from 0.0.0.0/0
  sg-0ghi789 (staging-db-sg) → MySQL from 0.0.0.0/0

Key Commands Explained

CommandWhat it does
get_paginator("describe_security_groups")Paginates all SGs — important if you have 100+ SGs
sg["IpPermissions"]List of inbound rules for this Security Group
rule["IpRanges"]List of IPv4 CIDR ranges for this rule
rule.get("IpProtocol") == "-1"Matches rules that allow ALL protocols/ports
from_port <= port <= to_portChecks if a sensitive port falls within the rule’s range
revoke_security_group_ingress(GroupId, IpPermissions)Removes specific CIDR from a rule without deleting other CIDRs

Prevention: Use AWS Config Rules

# Enable the AWS managed Config rule that detects this automatically
config = boto3.client("config")
config.put_config_rule(
    ConfigRule={
        "ConfigRuleName": "restricted-ssh",
        "Source": {
            "Owner": "AWS",
            "SourceIdentifier": "INCOMING_SSH_DISABLED",
        },
    }
)

🔍 Line-by-Line Code Walkthrough

Module-Level Constants

SENSITIVE_PORTS = {22: "SSH", 3389: "RDP", 3306: "MySQL", ...}
HIGH_RISK_PORTS = {22, 3389, 3306, 5432, 27017, 6379}
LineExplanation
SENSITIVE_PORTS = {port: service_name}A dict mapping port numbers to human-readable service names. Used to identify which service is exposed when checking rules
HIGH_RISK_PORTS = {22, 3389, ...}A Python set (fast O(1) membership tests) of ports that indicate direct administrative or database access — always classified CRITICAL
Why separate them?SENSITIVE_PORTS is for detection (all ports to check). HIGH_RISK_PORTS is for severity classification (CRITICAL vs HIGH)

audit_security_groups(region, auto_remediate)

ec2 = boto3.client("ec2", region_name=region)
paginator = ec2.get_paginator("describe_security_groups")
for page in paginator.paginate():
    for sg in page["SecurityGroups"]:
LineExplanation
get_paginator("describe_security_groups")Handles pagination. Large accounts can have hundreds of Security Groups
paginate() with no FiltersReturns ALL security groups. We check every one
sg["GroupId"]The SG identifier (e.g., sg-0abc123)
sg["GroupName"]Human-readable name (e.g., "dev-server-sg")
sg.get("VpcId", "EC2-Classic")The VPC this SG belongs to. .get() with default handles the rare case of EC2-Classic (no VpcId)

Parsing IpPermissions

for rule in sg.get("IpPermissions", []):
    from_port = rule.get("FromPort", 0)
    to_port   = rule.get("ToPort",   65535)
    protocol  = rule.get("IpProtocol", "-1")
LineExplanation
sg.get("IpPermissions", [])The list of inbound rules. Empty list if no inbound rules
rule.get("FromPort", 0)Start of the port range. .get() with default 0 because when IpProtocol="-1" (all traffic), AWS does not include FromPort/ToPort in the response
rule.get("ToPort", 65535)End of the port range. Default 65535 (max port) means we’ll catch all ports when protocol is "-1"
rule.get("IpProtocol", "-1")The protocol: "tcp", "udp", "icmp", or "-1" (all protocols)

Finding Unrestricted CIDRs

unrestricted_cidrs = []
for cidr_range in rule.get("IpRanges", []):
    if cidr_range.get("CidrIp") == "0.0.0.0/0":
        unrestricted_cidrs.append("0.0.0.0/0")
for ipv6_range in rule.get("Ipv6Ranges", []):
    if ipv6_range.get("CidrIpv6") == "::/0":
        unrestricted_cidrs.append("::/0")
LineExplanation
rule.get("IpRanges", [])IPv4 CIDR ranges for this rule. Each is a dict with "CidrIp" key
cidr_range.get("CidrIp") == "0.0.0.0/0"0.0.0.0/0 means “any IPv4 address” — the entire internet
rule.get("Ipv6Ranges", [])IPv6 ranges. Each has "CidrIpv6" key
"::/0"IPv6 equivalent of 0.0.0.0/0 — any IPv6 address. Don’t forget IPv6! Many audits miss this
if not unrestricted_cidrs: continueIf the rule doesn’t have any unrestricted CIDRs, it’s fine — skip to the next rule

Port Exposure Check

for port, service in SENSITIVE_PORTS.items():
    port_exposed = (
        protocol == "-1"
        or (from_port <= port <= to_port)
    )
    if not port_exposed:
        continue
    severity = "CRITICAL" if port in HIGH_RISK_PORTS else "HIGH"
LineExplanation
protocol == "-1"Matches rules that allow ALL traffic (e.g., a rule with protocol -1 exposes every port)
from_port <= port <= to_portPython chained comparison. Checks if our sensitive port falls within the rule’s range (e.g., port 22 within range 0-65535)
"CRITICAL" if port in HIGH_RISK_PORTS else "HIGH"in HIGH_RISK_PORTS is O(1) set lookup. CRITICAL for admin ports, HIGH for others

revoke_rule(ec2_client, sg_id, rule, cidr)

ip_perm = {
    "IpProtocol": rule["IpProtocol"],
    "IpRanges":   [{"CidrIp": cidr}] if cidr != "::/0" else [],
    "Ipv6Ranges":  [{"CidrIpv6": cidr}] if cidr == "::/0" else [],
}
if rule.get("IpProtocol") != "-1":
    ip_perm["FromPort"] = rule.get("FromPort", 0)
    ip_perm["ToPort"]   = rule.get("ToPort",   65535)

ec2_client.revoke_security_group_ingress(
    GroupId=sg_id,
    IpPermissions=[ip_perm],
)
LineExplanation
"IpRanges": [{"CidrIp": cidr}] if cidr != "::/0" else []Conditionally populates IPv4 ranges. "::/0" is an IPv6 address — not valid in IpRanges
"Ipv6Ranges": [{"CidrIpv6": cidr}] if cidr == "::/0" else []Conditionally populates IPv6 ranges only for the "::/0" CIDR
if rule.get("IpProtocol") != "-1": add FromPort/ToPortWhen protocol is "-1" (all traffic), AWS doesn’t use port ranges and rejects them if included. We only add ports for tcp/udp rules
revoke_security_group_ingress(GroupId, IpPermissions)Removes the specific CIDR from the rule. Only the specified CIDR is revoked — other CIDRs in the same rule are untouched
Why reconstruct ip_perm?The revoke API requires a precise match of the rule to remove. We build the minimum dict that matches just the CIDR we want to revoke
Services Used
EC2VPCboto3IAM
Prerequisites
  • Python 3.8+
  • boto3
  • IAM: ec2:DescribeSecurityGroups, ec2:RevokeSecurityGroupIngress
What You Learned
  • Security Group IpPermissions structure
  • CIDR range checking
  • Sensitive port mapping
  • revoke_security_group_ingress for remediation

Have a similar scenario to share?

Production incidents are the best teachers. Submit your real-world scenario and help others learn.

Open Google Form

Related Scenarios