AWS EC2 Security Groups Unrestricted SSH Check: SOC2 Compliance

This runbook helps enforce SOC2 compliance in AWS environments. It identifies and remediates security groups allowing unrestricted SSH access in running EC2 instances, ensuring robust security and compliance with SOC2 standards.

region_name=None #Hardcoded for single execution result, Use None when you want to run the script for all regions.
copied
  1. 1

    Amazon Elastic Compute Cloud (EC2) is a service offered by Amazon Web Services (AWS) that provides resizable compute capacity in the cloud. Through Boto3's EC2 client, the describe_instances() method provides detailed information about each instance, including its ID, type, launch time, and current state. This capability assists users in effectively monitoring and managing their cloud resources.

    import boto3 from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_all_regions(): ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name = 'us-east-1') return [region['RegionName'] for region in ec2.describe_regions()['Regions']] def list_ec2_instances(region=None): # If no region is provided, fetch instances from all regions regions = [region] if region else list_all_regions() # Create an empty list to store instance details instance_details = [] for region in regions: # Try initializing the Boto3 EC2 client for the specific region try: ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) except (NoCredentialsError, PartialCredentialsError): print(f"Failed for {region}: No AWS credentials found or incomplete credentials provided.") continue except BotoCoreError as e: print(f"Failed for {region}: Error initializing the EC2 client due to BotoCore Error: {e}") continue except Exception as e: print(f"Failed for {region}: Unexpected error initializing the EC2 client: {e}") continue #print(f"Fetching EC2 instance details for region: {region}...") # Try to paginate through the EC2 instance responses for the specific region try: paginator = ec2_client.get_paginator('describe_instances') for page in paginator.paginate(): for reservation in page['Reservations']: for instance in reservation['Instances']: # Extract the desired attributes instance_id = instance['InstanceId'] instance_type = instance['InstanceType'] launch_time = instance['LaunchTime'] state = instance['State']['Name'] # Append the details to the list instance_details.append({ 'InstanceId': instance_id, 'InstanceType': instance_type, 'LaunchTime': launch_time, 'State': state, 'Region': region }) #print(f"Fetched all instance details for region: {region} successfully!") except ClientError as e: print(f"Failed for {region}: AWS Client Error while fetching EC2 instance details: {e}") except Exception as e: print(f"Failed for {region}: Unexpected error while fetching EC2 instance details: {e}") return instance_details def display_instance_details(data): # Initialize table with the desired structure and headers table = context.newtable() table.title = "EC2 Instance Details" table.num_cols = 5 # Number of columns according to headers table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names based on the new structure headers = ["Instance ID", "Instance Type", "Launch Time", "State", "Region"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Sort the instance data by launch time for better organization data.sort(key=lambda x: x["LaunchTime"], reverse=True) # Populate the table with instance data for row_num, instance in enumerate(data, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each instance values = [ instance["InstanceId"], instance["InstanceType"], instance["LaunchTime"].strftime('%Y-%m-%d %H:%M:%S'), # Format the datetime instance["State"], instance["Region"] ] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) # You can replace None with a specific region string like 'us-east-1' to get instances from a specific region # Hardcoded region_name for One time Execution Result region_name=None instances_list = list_ec2_instances(region_name) if instances_list: ''' print("\nEC2 Instance Details:") for instance in instances_list: print("-" * 50) # Separator line for key, value in instance.items(): print(f"{key}: {value}")''' display_instance_details(instances_list) else: print("No instances found or an error occurred.")
    copied
    1
  2. 2

    This task is designed to audit AWS environments for SOC2 compliance. It systematically identifies security groups in running EC2 instances that permit unrestricted SSH access, flagging potential security vulnerabilities and aiding in maintaining SOC2 compliance standards.

    import boto3 from botocore.exceptions import NoCredentialsError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_ssh_access(security_group, ec2): try: sg = ec2.describe_security_groups(GroupIds=[security_group])['SecurityGroups'][0] for permission in sg.get('IpPermissions', []): if permission.get('FromPort') == 22 and permission.get('ToPort') == 22: for ip_range in permission.get('IpRanges', []): if ip_range.get('CidrIp') == '0.0.0.0/0': return True return False except Exception as e: print(f"Error checking security group {security_group}: {e}") return False def get_security_groups_in_use(ec2): sg_in_use = set() try: instances = ec2.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) for reservation in instances.get('Reservations', []): for instance in reservation.get('Instances', []): for sg in instance.get('SecurityGroups', []): sg_in_use.add(sg['GroupId']) return sg_in_use except Exception as e: print(f"Error retrieving instances: {e}") return sg_in_use def check_region_for_unrestricted_ssh(region, sgs_to_remediate): ec2 = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) print(f"Checking region: {region}") sg_in_use = get_security_groups_in_use(ec2) unrestricted_ssh = [sg_id for sg_id in sg_in_use if check_ssh_access(sg_id, ec2)] if unrestricted_ssh: print(f"Region {region}: Security Groups with unrestricted SSH access:") for sg_id in unrestricted_ssh: print(sg_id) sgs_to_remediate[region] = unrestricted_ssh else: print(f"Region {region}: No security groups with unrestricted SSH access found.") def check_all_regions(region_name=None): sgs_to_remediate = {} if region_name: check_region_for_unrestricted_ssh(region_name, sgs_to_remediate) else: ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']] for region in regions: check_region_for_unrestricted_ssh(region, sgs_to_remediate) return sgs_to_remediate try: region_name = None # Set to specific region or None for all regions sgs_to_remediate = check_all_regions(region_name) print("Security Groups to Remediate:", sgs_to_remediate) except NoCredentialsError: print("Error: AWS credentials not available. Please configure them.") except ClientError as e: print(f"AWS Client error: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") context.skip_sub_tasks=True
    copied
    2
    1. 2.1

      This task identifies and corrects security groups in AWS EC2, which allow unrestricted SSH access.

      import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def remove_unrestricted_ssh(security_group_id, region): """ This function attempts to remove unrestricted SSH access from the specified security group. :param security_group_id: The ID of the AWS security group. :param region: The AWS region where the security group is located. :return: Boolean indicating whether the unrestricted SSH access was successfully removed. """ # Initialize the boto3 client for EC2 in the specified region. ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) try: # Retrieve the details of the specified security group. sg = ec2.describe_security_groups(GroupIds=[security_group_id])['SecurityGroups'][0] # Iterate over the ingress permissions of the security group. for permission in sg.get('IpPermissions', []): # Check for SSH access (port 22) from anywhere (0.0.0.0/0). if permission.get('FromPort') == 22 and permission.get('ToPort') == 22: for ip_range in permission.get('IpRanges', []): if ip_range.get('CidrIp') == '0.0.0.0/0': # Revoke the ingress rule that allows unrestricted SSH access. ec2.revoke_security_group_ingress( GroupId=security_group_id, IpPermissions=[{ 'FromPort': 22, 'ToPort': 22, 'IpProtocol': 'tcp', 'IpRanges': [{'CidrIp': '0.0.0.0/0'}] }] ) print(f"Removed unrestricted SSH access from {security_group_id} in {region}") return True # If no unrestricted SSH access is found. print(f"No unrestricted SSH access found for {security_group_id} in {region}") return False except ClientError as e: # Handle client errors, such as incorrect permissions or non-existent resources. print(f"ClientError modifying security group {security_group_id} in {region}: {e}") return False except BotoCoreError as e: # Handle errors from the core Boto3 library. print(f"BotoCoreError encountered: {e}") return False except Exception as e: # Catch-all for any other unexpected exceptions. print(f"An unexpected error occurred: {e}") return False def remediate_unrestricted_ssh(sgs_to_remediate): """ :param sgs_to_remediate: A dictionary where keys are AWS region names and values are lists of security group IDs. """ for region, sg_ids in sgs_to_remediate.items(): for sg_id in sg_ids: # Attempt to remediate each security group. remove_unrestricted_ssh(sg_id, region) # Example usage #sgs_to_remediate = {'us-west-2': ['sg-4232c07a']} # from upstream task remediate_unrestricted_ssh(sgs_to_remediate)
      copied
      2.1