Sign in

Find AWS EC2 Security Groups with ports that are open to all IP addresses

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

This task involves identifying and listing all EC2 Security Groups that have ports accessible from any IP address (0.0.0.0/0), highlighting potential security risks where services are exposed to the entire internet.

import boto3
from botocore.exceptions import ClientError, NoCredentialsError, BotoCoreError

# NOTE(review): `_get_creds` and `cred_label` are not defined in this snippet;
# presumably they are injected by the hosting runbook platform -- confirm.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

# CIDR block that matches every IPv4 address.
OPEN_TO_ALL_CIDR = '0.0.0.0/0'


def get_ec2_client(region_name, aws_access_key_id, aws_secret_access_key):
    """
    Initialize and return an EC2 client for the specified AWS region.

    Args:
        region_name (str): AWS region the client should talk to.
        aws_access_key_id (str): AWS access key ID used to authenticate.
        aws_secret_access_key (str): AWS secret access key used to authenticate.

    Returns:
        The boto3 EC2 client bound to ``region_name``.

    Raises:
        NoCredentialsError: Re-raised after logging when credentials are missing.
        Exception: Any other initialization failure is logged and re-raised.
    """
    try:
        # Use the credentials passed in by the caller rather than the
        # module-level globals, so the function honours its own signature.
        return boto3.client(
            'ec2',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name,
        )
    except NoCredentialsError:
        print(f"Credentials not available for region: {region_name}")
        raise
    except Exception as e:
        print(f"Unexpected error during client initialization for region {region_name}: {e}")
        raise


def scan_open_ports(security_groups, aws_access_key_id, aws_secret_access_key):
    """
    Scan the provided security groups to find open ports accessible from any
    IPv4 address (0.0.0.0/0).

    Each group is re-fetched through a per-region EC2 client. Errors raised
    while fetching or inspecting one group are printed and that group is
    skipped, so a single failing group does not abort the scan.

    Args:
        security_groups (list): A list of dictionaries, each representing a
            security group. Each entry is read for the keys 'Region',
            'GroupId' and 'GroupName'.
        aws_access_key_id (str): AWS access key ID.
        aws_secret_access_key (str): AWS secret access key.

    Returns:
        list: A list of dictionaries detailing the open ports, including the
        region of each security group. Keys: 'GroupId', 'GroupName', 'Region',
        'IpProtocol', 'FromPort', 'ToPort', 'Description'.
    """
    open_ports = []
    clients = {}  # Cache of EC2 clients per region to avoid reinitialization

    for sg in security_groups:
        region = sg['Region']
        if region not in clients:
            clients[region] = get_ec2_client(region, aws_access_key_id, aws_secret_access_key)

        try:
            # Fetch the detailed info of each security group using the regional client
            response = clients[region].describe_security_groups(GroupIds=[sg['GroupId']])
            for group in response['SecurityGroups']:
                for perm in group.get('IpPermissions', []):
                    # NOTE(review): only IPv4 ranges are inspected; rules open
                    # to every IPv6 address (Ipv6Ranges / '::/0') are not
                    # reported -- confirm whether that is intended.
                    for ip_range in perm.get('IpRanges', []):
                        if ip_range.get('CidrIp') != OPEN_TO_ALL_CIDR:
                            continue
                        # Collect information about the open port and protocol.
                        # FromPort/ToPort default to 'All' when the rule
                        # carries no port range.
                        open_ports.append({
                            'GroupId': sg['GroupId'],
                            'GroupName': sg['GroupName'],
                            'Region': region,
                            'IpProtocol': perm['IpProtocol'],
                            'FromPort': perm.get('FromPort', 'All'),
                            'ToPort': perm.get('ToPort', 'All'),
                            'Description': ip_range.get('Description', 'No description provided'),
                        })
        except ClientError as e:
            print(f"ClientError for group {sg['GroupId']} in region {region}: {e}")
        except BotoCoreError as e:
            print(f"BotoCoreError for group {sg['GroupId']} in region {region}: {e}")
        except Exception as e:
            print(f"An unexpected error occurred for group {sg['GroupId']} in region {region}: {e}")

    return open_ports


def display_open_ports(data):
    """
    Displays details about open ports in a table format.

    Args:
        data (list): A list of dictionaries containing open port details, as
            produced by ``scan_open_ports``. The list is not modified.

    Returns:
        The populated table object obtained from ``context.newtable()``.
    """
    # NOTE(review): `context` is not defined in this snippet; presumably it is
    # supplied by the hosting runbook platform -- confirm.
    table = context.newtable()
    table.title = "Open Ports Details"
    table.num_cols = 7  # Number of columns according to headers
    table.num_rows = 1  # Starts with one row for headers
    table.has_header_row = True

    headers = ["Group ID", "Group Name", "Region", "Protocol", "From Port", "To Port", "Description"]
    for col_num, header in enumerate(headers):
        table.setval(0, col_num, header)

    # Sort a copy by Group ID so the caller's list is left untouched.
    ordered = sorted(data, key=lambda entry: entry["GroupId"])

    # Row 0 holds the headers, so data rows start at index 1.
    for row_num, port in enumerate(ordered, start=1):
        table.num_rows += 1  # Add a row for each entry
        values = [
            port["GroupId"],
            port["GroupName"],
            str(port["Region"]),
            port["IpProtocol"],
            str(port["FromPort"]),
            str(port["ToPort"]),
            port["Description"],
        ]
        for col_num, value in enumerate(values):
            table.setval(row_num, col_num, value)

    # Return the table so callers that assign the result get a usable object.
    return table


# Example usage
# NOTE(review): `security_groups` is not defined in this snippet; presumably it
# is the output of an earlier task -- confirm.
try:
    open_ports = scan_open_ports(security_groups, access_key, secret_key)
    if open_ports:
        table = display_open_ports(open_ports)
    else:
        print("No ports open to all found.")
except Exception as e:
    print(f"An error occurred during the scanning process: {e}")
copied