Remediate AWS EC2 Security Groups with unrestricted SSH Access: SOC2 Compliance

This task identifies and corrects security groups in AWS EC2, which allow unrestricted SSH access.

import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def remove_unrestricted_ssh(security_group_id, region): """ This function attempts to remove unrestricted SSH access from the specified security group. :param security_group_id: The ID of the AWS security group. :param region: The AWS region where the security group is located. :return: Boolean indicating whether the unrestricted SSH access was successfully removed. """ # Initialize the boto3 client for EC2 in the specified region. ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) try: # Retrieve the details of the specified security group. sg = ec2.describe_security_groups(GroupIds=[security_group_id])['SecurityGroups'][0] # Iterate over the ingress permissions of the security group. for permission in sg.get('IpPermissions', []): # Check for SSH access (port 22) from anywhere ( if permission.get('FromPort') == 22 and permission.get('ToPort') == 22: for ip_range in permission.get('IpRanges', []): if ip_range.get('CidrIp') == '': # Revoke the ingress rule that allows unrestricted SSH access. ec2.revoke_security_group_ingress( GroupId=security_group_id, IpPermissions=[{ 'FromPort': 22, 'ToPort': 22, 'IpProtocol': 'tcp', 'IpRanges': [{'CidrIp': ''}] }] ) print(f"Removed unrestricted SSH access from {security_group_id} in {region}") return True # If no unrestricted SSH access is found. print(f"No unrestricted SSH access found for {security_group_id} in {region}") return False except ClientError as e: # Handle client errors, such as incorrect permissions or non-existent resources. print(f"ClientError modifying security group {security_group_id} in {region}: {e}") return False except BotoCoreError as e: # Handle errors from the core Boto3 library. print(f"BotoCoreError encountered: {e}") return False except Exception as e: # Catch-all for any other unexpected exceptions. print(f"An unexpected error occurred: {e}") return False def remediate_unrestricted_ssh(sgs_to_remediate): """ :param sgs_to_remediate: A dictionary where keys are AWS region names and values are lists of security group IDs. """ for region, sg_ids in sgs_to_remediate.items(): for sg_id in sg_ids: # Attempt to remediate each security group. remove_unrestricted_ssh(sg_id, region) # Example usage #sgs_to_remediate = {'us-west-2': ['sg-4232c07a']} # from upstream task remediate_unrestricted_ssh(sgs_to_remediate)