Remediate AWS EC2 Security Groups with unrestricted SSH Access: SOC2 Compliance
This task identifies and corrects AWS EC2 security groups that allow unrestricted SSH access.
import boto3
from botocore.exceptions import ClientError, BotoCoreError
# Static AWS credentials are resolved by the platform from cred_label; they are
# used only to build the EC2 client below and must never be printed or logged.
creds = _get_creds(cred_label)['creds']
access_key, secret_key = creds['username'], creds['password']
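_SSH_PORT = 22
_ANYWHERE_V4 = '0.0.0.0/0'
_ANYWHERE_V6 = '::/0'


def _rule_covers_ssh(permission: dict) -> bool:
    """
    Return True if one ingress rule's protocol/port range includes TCP port 22.

    A rule with IpProtocol '-1' (all traffic) carries no FromPort/ToPort and
    covers every port, so it counts as covering SSH. Non-TCP rules never do.
    """
    protocol = permission.get('IpProtocol')
    if protocol == '-1':
        return True
    if protocol != 'tcp':
        return False
    from_port = permission.get('FromPort')
    to_port = permission.get('ToPort')
    if from_port is None or to_port is None:
        return False
    return from_port <= _SSH_PORT <= to_port


def _anywhere_entries(permission: dict) -> list[dict]:
    """
    Build one revoke payload per 0.0.0.0/0 or ::/0 source found in a rule.

    Each payload copies the rule's own IpProtocol and port range so that the
    revoke request describes the existing rule's values exactly; only the
    anywhere-CIDR entry is removed, other sources in the same rule are kept.
    """
    base = {'IpProtocol': permission.get('IpProtocol')}
    for key in ('FromPort', 'ToPort'):
        if key in permission:
            base[key] = permission[key]
    payloads = []
    for ip_range in permission.get('IpRanges', []):
        if ip_range.get('CidrIp') == _ANYWHERE_V4:
            payloads.append(dict(base, IpRanges=[{'CidrIp': _ANYWHERE_V4}]))
    for ip_range in permission.get('Ipv6Ranges', []):
        if ip_range.get('CidrIpv6') == _ANYWHERE_V6:
            payloads.append(dict(base, Ipv6Ranges=[{'CidrIpv6': _ANYWHERE_V6}]))
    return payloads


def remove_unrestricted_ssh(security_group_id: str, region: str) -> bool:
    """
    Remove ingress rules that expose SSH (TCP port 22) to the whole internet.

    A rule is treated as unrestricted SSH when its protocol/port range covers
    TCP port 22 -- exactly 22, a wider TCP range such as 0-65535, or all
    traffic ('-1') -- and one of its sources is 0.0.0.0/0 or ::/0. Every such
    source entry is revoked, not just the first one found; other sources in
    the same rule are left in place. Revoking a wider-range rule removes more
    than SSH, which is intended: that entry exposed every covered port.

    :param security_group_id: The ID of the AWS security group.
    :param region: The AWS region where the security group is located.
    :return: True if at least one offending entry was revoked; False if none
             was found, the group does not exist, or an error prevented the change.
    """
    # The client uses the static credentials loaded at module level.
    ec2 = boto3.client(
        'ec2',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
    )
    try:
        groups = ec2.describe_security_groups(GroupIds=[security_group_id])['SecurityGroups']
        if not groups:
            # describe can return an empty list; guard the index instead of raising.
            print(f"Security group {security_group_id} not found in {region}")
            return False
        sg = groups[0]
        to_revoke = [
            entry
            for permission in sg.get('IpPermissions', [])
            if _rule_covers_ssh(permission)
            for entry in _anywhere_entries(permission)
        ]
        if not to_revoke:
            print(f"No unrestricted SSH access found for {security_group_id} in {region}")
            return False
        for entry in to_revoke:
            # One call per entry so the audit trail names exactly what was removed.
            ec2.revoke_security_group_ingress(GroupId=security_group_id, IpPermissions=[entry])
            print(f"Revoked ingress entry {entry} from {security_group_id} in {region}")
        print(f"Removed unrestricted SSH access from {security_group_id} in {region}")
        return True
    except ClientError as e:
        # Client errors, such as missing permissions or a non-existent group ID.
        print(f"ClientError modifying security group {security_group_id} in {region}: {e}")
        return False
    except BotoCoreError as e:
        # Errors from the core Boto3 library (endpoint, credential or config issues).
        print(f"BotoCoreError encountered: {e}")
        return False
    except Exception as e:
        # Catch-all kept so a single group never aborts the whole remediation run.
        print(f"An unexpected error occurred: {e}")
        return False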
def remediate_unrestricted_ssh(sgs_to_remediate):
"""
:param sgs_to_remediate: A dictionary where keys are AWS region names and values are lists of security group IDs.
"""
for region, sg_ids in sgs_to_remediate.items():
for sg_id in sg_ids:
# Attempt to remediate each security group.
remove_unrestricted_ssh(sg_id, region)
# Example usage
#sgs_to_remediate = {'us-west-2': ['sg-4232c07a']} # from upstream task
# sgs_to_remediate is supplied as a task input; run the remediation over it.
remediation_result = remediate_unrestricted_ssh(sgs_to_remediate)