agent: |
dmleQnSG34MzPaPGRlgr Filter out AWS EC2 Security Groups for ports which are open to All
Filter out AWS EC2 Security Groups for ports which are open to All
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
This task involves identifying and listing all EC2 Security Groups that have ports accessible from any IP address (0.0.0.0/0), highlighting potential security risks where services are exposed to the entire internet.
inputs
outputs
import boto3
from botocore.exceptions import ClientError, NoCredentialsError, BotoCoreError
# Fetch the stored credential record for `cred_label`. Neither `_get_creds`
# nor `cred_label` is defined in this snippet (presumably supplied by the
# hosting runtime - confirm).
creds = _get_creds(cred_label)['creds']
# The record's 'username' / 'password' fields are passed further down as the
# AWS access key ID and secret access key respectively.
access_key, secret_key = creds['username'], creds['password']
def get_ec2_client(region_name, aws_access_key_id, aws_secret_access_key):
    """
    Initialize and return an EC2 client for the specified AWS region.

    Args:
        region_name (str): AWS region the client should talk to.
        aws_access_key_id (str): AWS access key ID used to sign requests.
        aws_secret_access_key (str): AWS secret access key used to sign requests.

    Returns:
        The object returned by ``boto3.client('ec2', ...)``.

    Raises:
        NoCredentialsError: Re-raised after logging when boto3 reports that
            no credentials are available.
        Exception: Any other error raised during client construction is
            logged and re-raised unchanged.
    """
    try:
        # Use the credentials handed to this function rather than the
        # module-level globals, so the parameters are actually honoured.
        return boto3.client(
            'ec2',
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            region_name=region_name,
        )
    except NoCredentialsError:
        print(f"Credentials not available for region: {region_name}")
        raise
    except Exception as e:
        print(f"Unexpected error during client initialization for region {region_name}: {e}")
        raise
def scan_open_ports(security_groups, aws_access_key_id, aws_secret_access_key):
    """
    Scan the provided security groups to find ports accessible from any IP.

    A permission is reported when one of its IPv4 ranges is ``0.0.0.0/0`` or
    one of its IPv6 ranges is ``::/0``; one entry is emitted per matching range.

    Args:
        security_groups (list): A list of dictionaries, each representing a
            security group. Each must provide 'Region', 'GroupId' and
            'GroupName'.
        aws_access_key_id (str): AWS access key ID.
        aws_secret_access_key (str): AWS secret access key.

    Returns:
        list: A list of dictionaries detailing the open ports, with keys
        'GroupId', 'GroupName', 'Region', 'IpProtocol', 'FromPort', 'ToPort'
        and 'Description'.

    Raises:
        Exception: Whatever ``get_ec2_client`` raises while creating a
            regional client. Errors raised while inspecting an individual
            group are printed and that group is skipped.
    """
    # (key of the range list, key of the CIDR inside each range, "everyone" CIDR)
    world_open_ranges = (
        ('IpRanges', 'CidrIp', '0.0.0.0/0'),
        ('Ipv6Ranges', 'CidrIpv6', '::/0'),
    )
    open_ports = []
    clients = {}  # Cache to store EC2 clients for each region to avoid reinitialization
    for sg in security_groups:
        region = sg['Region']
        if region not in clients:
            clients[region] = get_ec2_client(region, aws_access_key_id, aws_secret_access_key)
        try:
            # Fetch the detailed info of each security group using the regional client
            response = clients[region].describe_security_groups(GroupIds=[sg['GroupId']])
            for group in response['SecurityGroups']:
                for perm in group.get('IpPermissions', []):
                    # Check both address families for a rule that allows everyone
                    for ranges_key, cidr_key, any_cidr in world_open_ranges:
                        for ip_range in perm.get(ranges_key, []):
                            if ip_range.get(cidr_key) != any_cidr:
                                continue
                            # Collect information about the open port and protocol
                            open_ports.append({
                                'GroupId': sg['GroupId'],
                                'GroupName': sg['GroupName'],
                                'Region': region,  # Including the region in the details
                                'IpProtocol': perm['IpProtocol'],
                                # Rules without a port range fall back to 'All'
                                'FromPort': perm.get('FromPort', 'All'),
                                'ToPort': perm.get('ToPort', 'All'),
                                'Description': ip_range.get('Description', 'No description provided'),
                            })
        except ClientError as e:
            print(f"ClientError for group {sg['GroupId']} in region {region}: {e}")
        except BotoCoreError as e:
            print(f"BotoCoreError for group {sg['GroupId']} in region {region}: {e}")
        except Exception as e:
            # Best-effort scan: report the failure and continue with the next group
            print(f"An unexpected error occurred for group {sg['GroupId']} in region {region}: {e}")
    return open_ports
def display_open_ports(data):
    """
    Displays details about open ports in a table format.

    The table is obtained from ``context.newtable()``; ``context`` is not
    defined in this snippet (presumably injected by the hosting runtime -
    confirm). Rows are ordered by Group ID. The caller's list is left
    untouched.

    Args:
        data (list): A list of dictionaries containing open port details,
            each with the keys 'GroupId', 'GroupName', 'Region',
            'IpProtocol', 'FromPort', 'ToPort' and 'Description'.

    Returns:
        The populated table object, so callers that assign the result get
        the table rather than None.
    """
    # Define header names; the column count is derived from them
    headers = ["Group ID", "Group Name", "Region", "Protocol", "From Port", "To Port", "Description"]

    # Initialize table with the desired structure and headers
    table = context.newtable()
    table.title = "Open Ports Details"
    table.num_cols = len(headers)
    table.num_rows = 1  # Starts with one row for headers
    table.has_header_row = True

    # Set headers in the first row
    for col_num, header in enumerate(headers):
        table.setval(0, col_num, header)

    # Sort a copy by Group ID so the caller's list is not reordered in place
    ordered = sorted(data, key=lambda entry: entry["GroupId"])

    # Populate the table with open port data, starting from the second row
    for row_num, port in enumerate(ordered, start=1):
        table.num_rows += 1  # Grow the table by one row before writing into it
        values = [
            port["GroupId"],
            port["GroupName"],
            str(port["Region"]),
            port["IpProtocol"],
            # Ports may be ints or the 'All' placeholder, so normalise to text
            str(port["FromPort"]),
            str(port["ToPort"]),
            port["Description"],
        ]
        for col_num, value in enumerate(values):
            table.setval(row_num, col_num, value)

    return table
# Script entry point: scan the supplied security groups and tabulate findings.
# `security_groups` is not defined in this snippet (presumably provided as a
# task input by the hosting runtime - confirm).
try:
    open_ports = scan_open_ports(security_groups, access_key, secret_key)
    if not open_ports:
        # Nothing world-accessible was found; report that instead of an empty table
        print("No ports open to all found.")
    else:
        table = display_open_ports(open_ports)
except Exception as e:
    # Top-level boundary: report any failure rather than letting it propagate
    print(f"An error occurred during the scanning process: {e}")
copied