SOC2 Compliance Tasks

This runbook outlines specific tasks and procedures for ensuring AWS cloud services adhere to SOC2 standards, focusing on security, availability, processing integrity, confidentiality, and privacy. It includes audits and configurations for AWS CloudTrail, IAM policies, S3 bucket security and encryption, and more, aimed at maintaining data integrity, confidentiality, and minimizing unauthorized access to ensure compliance with SOC2 requirements.

  1. 1

    This runbook conducts an audit, ensuring that S3 buckets within AWS do not allow unauthorized public write access. This audit reviews Block Public Access settings, bucket policies, and ACLs to adhere to SOC2's strict data security standards. It aims to identify and rectify any configurations that may compromise data integrity and confidentiality.

    1
    1. 1.1

      This task involves retrieving and listing the names of all the S3 buckets that are currently associated with your AWS account. By fetching this list, you gain an overview of the existing S3 buckets under your account, which can aid in resource management, access control, and tracking. This information is valuable for maintaining an organized and well-structured AWS environment, ensuring efficient storage utilization, and facilitating easy navigation of your stored data.

      import json cmd = "aws s3api list-buckets" output = _exe(None, cmd,cred_label=cred_label) #Parse the JSON response response_data = json.loads(output) #Extract bucket names bucket_names = [bucket["Name"] for bucket in response_data["Buckets"]] #Print the extracted bucket names: for bucket_name in bucket_names: print(bucket_name)
      copied
      1.1
    2. 1.2

      The task involves auditing AWS S3 buckets to identify those that permit public write access. This process helps ensure data security by flagging buckets that might be vulnerable to unauthorized modifications.

      import boto3 from botocore.exceptions import ClientError, NoCredentialsError, BotoCoreError import json creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def is_write_public(bucket_policy): """ Determines if the bucket policy allows public write access. """ try: policy_document = json.loads(bucket_policy['Policy']) except json.JSONDecodeError: print("Error parsing the bucket policy JSON.") return False for statement in policy_document.get('Statement', []): actions = statement.get('Action', []) actions = [actions] if isinstance(actions, str) else actions principals = statement.get('Principal', {}) # Checking if the principal is set to '*' (public access) is_public_principal = principals == '*' or principals.get('AWS') == '*' # Checking for 's3:Put*' or 's3:*' actions public_write_actions = any(action in ['s3:Put*', 's3:*'] or action.startswith('s3:Put') for action in actions) if is_public_principal and public_write_actions: return True return False def is_acl_public_write(bucket_acl): """ Determines if the bucket ACL allows public write access. """ for grant in bucket_acl['Grants']: if grant['Grantee'].get('Type') == 'Group' and grant['Grantee'].get('URI') == 'http://acs.amazonaws.com/groups/global/AllUsers': if 'WRITE' in grant['Permission']: return True return False def check_s3_buckets_public_write(): """ Checks all S3 buckets in the account to ensure they do not allow public write access. """ try: s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) buckets = s3.list_buckets().get('Buckets', []) if not buckets: print("No S3 buckets found in the account.") return for bucket in buckets: bucket_name = bucket['Name'] is_compliant = True # Check block public access settings try: public_access_block = s3.get_public_access_block(Bucket=bucket_name) if public_access_block['PublicAccessBlockConfiguration'].get('BlockPublicAcls', False) is False: print(f"Bucket '{bucket_name}' is non-compliant: Public Access Block allows public write.") is_compliant = False except ClientError as e: if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration': raise # Check the bucket policy try: bucket_policy = s3.get_bucket_policy(Bucket=bucket_name) if is_write_public(bucket_policy): print(f"Bucket '{bucket_name}' is non-compliant: Policy allows public write access.") is_compliant = False except ClientError as e: if e.response['Error']['Code'] != 'NoSuchBucketPolicy': raise # Check bucket ACL try: bucket_acl = s3.get_bucket_acl(Bucket=bucket_name) if is_acl_public_write(bucket_acl): print(f"Bucket '{bucket_name}' is non-compliant: ACL allows public write access.") is_compliant = False except ClientError: raise if is_compliant: print(f"Bucket '{bucket_name}' is compliant: No public write access detected.") print("Public write access check complete for all S3 buckets.") except NoCredentialsError: print("No AWS credentials found. Please configure your credentials.") except BotoCoreError as e: print(f"An error occurred accessing AWS S3 service: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") # Example usage check_s3_buckets_public_write() context.skip_sub_tasks=True
      copied
      1.2
      1. 1.2.1

        This task programmatically tightens security on a specified AWS S3 bucket by disabling public write access. It modifies the bucket's Block Public Access settings, ensuring compliance with data security standards. This preventive measure is critical in safeguarding sensitive data from unauthorized modifications.

        import boto3 from botocore.exceptions import ClientError, NoCredentialsError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def disable_public_write_access(bucket_name): """ Disables public write access for a specified S3 bucket by updating Block Public Access settings and ACL. """ s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Update Block Public Access settings to block public ACLs try: s3.put_public_access_block( Bucket=bucket_name, PublicAccessBlockConfiguration={ 'BlockPublicAcls': True, 'IgnorePublicAcls': True, 'BlockPublicPolicy': True, 'RestrictPublicBuckets': True } ) print(f"Updated Block Public Access settings for '{bucket_name}'.") except ClientError as e: print(f"Failed to update Block Public Access settings for '{bucket_name}': {e}") raise try: if bucket_name: #bucket_name = 'your-bucket-name' disable_public_write_access(bucket_name) else: print("Please provide a bucket name to restrict public access") except NoCredentialsError: print("No AWS credentials found. Please configure your credentials.") except BotoCoreError as e: print(f"An error occurred accessing AWS S3 service: {e}") except Exception as e: print(f"An unexpected error occurred: {e}")
        copied
        1.2.1
  2. 2

    This runbook involves a thorough review of S3 bucket configurations to ensure they align with SOC2 standards by prohibiting public read access. It includes checking Block Public Access settings, analyzing bucket policies, and inspecting ACLs to prevent unauthorized data exposure. Essential for maintaining data integrity and confidentiality.

    2
    1. 2.1

      This task involves retrieving and listing the names of all the S3 buckets that are currently associated with your AWS account. By fetching this list, you gain an overview of the existing S3 buckets under your account, which can aid in resource management, access control, and tracking. This information is valuable for maintaining an organized and well-structured AWS environment, ensuring efficient storage utilization, and facilitating easy navigation of your stored data.

      import json cmd = "aws s3api list-buckets" output = _exe(None, cmd,cred_label=cred_label) #Parse the JSON response response_data = json.loads(output) #Extract bucket names bucket_names = [bucket["Name"] for bucket in response_data["Buckets"]] #Print the extracted bucket names: for bucket_name in bucket_names: print(bucket_name)
      copied
      2.1
    2. 2.2

      The task involves scanning AWS S3 buckets to detect any that permit public read access, highlighting potential vulnerabilities in data privacy and security.

      import boto3 from botocore.exceptions import ClientError, NoCredentialsError, BotoCoreError import json creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def is_read_public(bucket_policy): """ Determines if the bucket policy allows public read access. """ try: policy_document = json.loads(bucket_policy['Policy']) except json.JSONDecodeError: print("Error parsing the bucket policy JSON.") return False for statement in policy_document.get('Statement', []): actions = statement.get('Action', []) actions = [actions] if isinstance(actions, str) else actions principals = statement.get('Principal', {}) # Checking if the principal is set to '*' (public access) is_public_principal = principals == '*' or principals.get('AWS') == '*' # Checking for 's3:Get*' or 's3:*' actions public_read_actions = any(action in ['s3:Get*', 's3:*'] or action.startswith('s3:Get') for action in actions) if is_public_principal and public_read_actions: return True return False def is_acl_public_read(bucket_acl): """ Determines if the bucket ACL allows public read access. """ for grant in bucket_acl['Grants']: if grant['Grantee'].get('Type') == 'Group' and grant['Grantee'].get('URI') == 'http://acs.amazonaws.com/groups/global/AllUsers': if 'READ' in grant['Permission']: return True return False def check_s3_buckets_public_read(): """ Checks all S3 buckets in the account to ensure they do not allow public read access. """ try: s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) buckets = s3.list_buckets().get('Buckets', []) if not buckets: print("No S3 buckets found in the account.") return for bucket in buckets: bucket_name = bucket['Name'] is_compliant = True # Check block public access settings try: public_access_block = s3.get_public_access_block(Bucket=bucket_name) if not public_access_block['PublicAccessBlockConfiguration'].get('BlockPublicAcls', False): print(f"Bucket '{bucket_name}' is non-compliant: Public Access Block allows public read.") is_compliant = False except ClientError as e: if e.response['Error']['Code'] != 'NoSuchPublicAccessBlockConfiguration': raise # Check the bucket policy try: bucket_policy = s3.get_bucket_policy(Bucket=bucket_name) if is_read_public(bucket_policy): print(f"Bucket '{bucket_name}' is non-compliant: Policy allows public read access.") is_compliant = False except ClientError as e: if e.response['Error']['Code'] != 'NoSuchBucketPolicy': raise # Check bucket ACL try: bucket_acl = s3.get_bucket_acl(Bucket=bucket_name) if is_acl_public_read(bucket_acl): print(f"Bucket '{bucket_name}' is non-compliant: ACL allows public read access.") is_compliant = False except ClientError: raise if is_compliant: print(f"Bucket '{bucket_name}' is compliant: No public read access detected.") print("Public read access check complete for all S3 buckets.") except NoCredentialsError: print("No AWS credentials found. Please configure your credentials.") except BotoCoreError as e: print(f"An error occurred accessing AWS S3 service: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") # Example usage check_s3_buckets_public_read() context.skip_sub_tasks=True
      copied
      2.2
      1. 2.2.1

        This task strengthens data security by restricting public read access to specified AWS S3 buckets. It updates Block Public Access settings and ACLs, ensuring data confidentiality. This action aligns with security compliance standards to protect sensitive information.

        import boto3 from botocore.exceptions import ClientError, NoCredentialsError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def disable_public_write_access(bucket_name): """ Disables public write access for a specified S3 bucket by updating Block Public Access settings and ACL. """ s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Update Block Public Access settings to block public ACLs try: s3.put_public_access_block( Bucket=bucket_name, PublicAccessBlockConfiguration={ 'BlockPublicAcls': True, 'IgnorePublicAcls': True, 'BlockPublicPolicy': True, 'RestrictPublicBuckets': True } ) print(f"Updated Block Public Access settings for '{bucket_name}'.") except ClientError as e: print(f"Failed to update Block Public Access settings for '{bucket_name}': {e}") raise try: if bucket_name: #bucket_name = 'your-bucket-name' disable_public_write_access(bucket_name) else: print("Please provide a bucket name to restrict public access") except NoCredentialsError: print("No AWS credentials found. Please configure your credentials.") except BotoCoreError as e: print(f"An error occurred accessing AWS S3 service: {e}") except Exception as e: print(f"An unexpected error occurred: {e}")
        copied
        2.2.1
  3. 3

    This runbook methodically assesses and verifies server-side encryption configurations, identifying buckets that do not adhere to AES-256 or AWS KMS encryption standards. It aims to ensure all S3 buckets within an AWS environment meet stringent SOC2 encryption requirements, enhancing data security and compliance.

    3
    1. 3.1

      This task involves retrieving and listing the names of all the S3 buckets that are currently associated with your AWS account. By fetching this list, you gain an overview of the existing S3 buckets under your account, which can aid in resource management, access control, and tracking. This information is valuable for maintaining an organized and well-structured AWS environment, ensuring efficient storage utilization, and facilitating easy navigation of your stored data.

      import json cmd = "aws s3api list-buckets" output = _exe(None, cmd,cred_label=cred_label) #Parse the JSON response response_data = json.loads(output) #Extract bucket names bucket_names = [bucket["Name"] for bucket in response_data["Buckets"]] #Print the extracted bucket names: for bucket_name in bucket_names: print(bucket_name)
      copied
      3.1
    2. 3.2

      This task assesses whether AWS S3 buckets have default server-side encryption activated or if their bucket policies explicitly deny any put-object requests that lack server-side encryption, specifically using AES-256 or AWS KMS. It designates S3 buckets as NON_COMPLIANT if they are not set to be encrypted by default.

      # Compliance Rule: s3-bucket-server-side-encryption-enabled # This rule checks each S3 bucket for two key criteria: # 1. Default Encryption: The bucket must have server-side encryption enabled by default, # using either AES-256 or AWS KMS. # 2. Policy Compliance: The bucket policy must explicitly deny put-object requests that # are not accompanied by server-side encryption using AES-256 or AWS KMS. # A bucket is considered NON_COMPLIANT if it does not have default encryption enabled. import boto3 from botocore.exceptions import ClientError, NoCredentialsError, BotoCoreError import json creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def is_encryption_enabled(bucket_name): """ Check if the specified S3 bucket has server-side encryption enabled. """ s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: # Attempt to retrieve the bucket encryption configuration encryption = s3.get_bucket_encryption(Bucket=bucket_name) #print(encryption) # for debugging # If this call is successful, encryption is enabled return True except ClientError as e: if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError': # Encryption is not enabled return False else: # Other errors raise def is_policy_compliant(bucket_name): """ Check if the bucket policy explicitly denies put-object requests without server-side encryption. """ s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: policy = s3.get_bucket_policy(Bucket=bucket_name) policy_document = json.loads(policy['Policy']) #print(f"Policy for bucket '{bucket_name}': {policy_document}") # Debug statement for statement in policy_document.get('Statement', []): if statement.get('Effect') == 'Deny': actions = statement.get('Action', []) if isinstance(actions, str): actions = [actions] if any(action.startswith('s3:Put') for action in actions): conditions = statement.get('Condition', {}).get('StringEquals', {}) encryption_condition = conditions.get('s3:x-amz-server-side-encryption', None) if encryption_condition in ['AES256', 'aws:kms']: #print(f"Bucket '{bucket_name}' has a compliant policy with encryption requirement.") # Debug statement return True #print(f"Bucket '{bucket_name}' does not have a compliant policy with encryption requirement.") # Debug statement return False except ClientError as e: if e.response['Error']['Code'] == 'NoSuchBucketPolicy': #print(f"No policy for bucket '{bucket_name}'.") # Debug statement return False else: raise def check_all_buckets_for_encryption(): """ Check all S3 buckets in the account for server-side encryption and compliance with bucket policy. """ try: s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) buckets = s3.list_buckets().get('Buckets', []) if not buckets: print("No S3 buckets found in the account.") return for bucket in buckets: bucket_name = bucket['Name'] encrypted = is_encryption_enabled(bucket_name) policy_compliant = is_policy_compliant(bucket_name) # A bucket is considered NON_COMPLIANT if it does not have default encryption enabled. if encrypted: if policy_compliant: print(f"Bucket '{bucket_name}' is COMPLIANT with server-side encryption and has a compliant policy.") else: print(f"Bucket '{bucket_name}' is COMPLIANT with server-side encryption but does not have a compliant policy.") else: print(f"Bucket '{bucket_name}' is NON_COMPLIANT with server-side encryption.") except NoCredentialsError: print("No AWS credentials found. Please configure your credentials.") except BotoCoreError as e: print(f"An error occurred accessing AWS S3 service: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") check_all_buckets_for_encryption() context.skip_sub_tasks=True
      copied
      3.2
      1. 3.2.1

        This task involves enabling AES-256 server-side encryption on S3 buckets and verifying its activation. This process ensures data security by encrypting contents within the buckets. By default all new buckets created are encrypted but this task beneficial for legacy buckets without encryption enabled.

        import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def enable_and_verify_bucket_encryption(bucket_name): """ Enable default AES-256 server-side encryption on the specified S3 bucket and verify the encryption status. """ s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) encryption_configuration = {'Rules': [{'ApplyServerSideEncryptionByDefault': {'SSEAlgorithm': 'AES256'}}]} try: s3.put_bucket_encryption(Bucket=bucket_name, ServerSideEncryptionConfiguration=encryption_configuration) response = s3.get_bucket_encryption(Bucket=bucket_name) if response['ResponseMetadata']['HTTPStatusCode'] == 200: print(f"Encryption successfully enabled on bucket '{bucket_name}'.") else: print(f"Failed to verify encryption on bucket '{bucket_name}'.") except ClientError as e: print(f"AWS ClientError: {e.response['Error']['Message']}") except BotoCoreError as e: print(f"BotoCoreError: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") #bucket_name = 'test-sse-encryption-bucket-123' enable_and_verify_bucket_encryption(bucket_name)
        copied
        3.2.1
  4. 4

    This runbook automates the assessment and activation of Server Access Logging for Amazon S3 buckets. It aligns with SOC2 compliance guidelines by ensuring that every S3 bucket has logging enabled, contributing to better security and traceability of actions performed on the buckets.

    4
    1. 4.1

      This task involves retrieving and listing the names of all the S3 buckets that are currently associated with your AWS account. By fetching this list, you gain an overview of the existing S3 buckets under your account, which can aid in resource management, access control, and tracking. This information is valuable for maintaining an organized and well-structured AWS environment, ensuring efficient storage utilization, and facilitating easy navigation of your stored data.

      import json cmd = "aws s3api list-buckets" output = _exe(None, cmd,cred_label=cred_label) #Parse the JSON response response_data = json.loads(output) #Extract bucket names bucket_names = [bucket["Name"] for bucket in response_data["Buckets"]] #Print the extracted bucket names: for bucket_name in bucket_names: print(bucket_name)
      copied
      4.1
    2. 4.2

      This task involves checking AWS S3 buckets to determine if Server Access Logging is enabled. It's crucial for monitoring and diagnosing security incidents, as it records requests made to the S3 bucket, enhancing transparency and aiding compliance with security protocols.

      # SOC2 Compliance Guideline: S3 Bucket Logging import boto3 from botocore.exceptions import ClientError, NoCredentialsError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def is_logging_enabled(bucket_name): """ Check if logging is enabled for the specified S3 bucket. """ s3 = boto3.client('s3', aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: # Attempt to retrieve the bucket logging configuration logging_config = s3.get_bucket_logging(Bucket=bucket_name) # Logging is enabled if 'LoggingEnabled' key is present in the response return 'LoggingEnabled' in logging_config except ClientError as e: print(f"Error checking logging for bucket '{bucket_name}': {e}") raise def check_all_buckets_for_logging(): """ Check all S3 buckets in the account to ensure logging is enabled. """ try: s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) buckets = s3.list_buckets().get('Buckets', []) if not buckets: print("No S3 buckets found in the account.") return for bucket in buckets: bucket_name = bucket['Name'] if is_logging_enabled(bucket_name): print(f"Bucket '{bucket_name}' is COMPLIANT with logging enabled.") else: print(f"Bucket '{bucket_name}' is NON_COMPLIANT with logging disabled.") except NoCredentialsError: print("No AWS credentials found. Please configure your credentials.") except BotoCoreError as e: print(f"An error occurred accessing AWS S3 service: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") check_all_buckets_for_logging() context.skip_sub_tasks=True
      copied
      4.2
      1. 4.2.1

        This task involves setting up and verifying Server Access Logging for AWS S3 buckets. It ensures that logging is active for a bucket, providing detailed records of access requests. This is crucial for security monitoring, compliance with data governance standards, and effective management of AWS resources.

        import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def enable_and_verify_logging(bucket_name, log_bucket, log_prefix): """ Enable logging for an S3 bucket and verify that it's been enabled, with additional checks. """ s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Check if required parameters are provided if not bucket_name or not log_bucket or not log_prefix: print("Error: Bucket name, logging bucket, or log prefix is missing.") return try: # Enable logging s3.put_bucket_logging( Bucket=bucket_name, BucketLoggingStatus={ 'LoggingEnabled': { 'TargetBucket': log_bucket, 'TargetPrefix': log_prefix } } ) print(f"Logging enabled for bucket '{bucket_name}'.") # Verify logging response = s3.get_bucket_logging(Bucket=bucket_name) if 'LoggingEnabled' in response: print("Logging Status: Enabled") print(f"HTTP Status Code: {response['ResponseMetadata']['HTTPStatusCode']}") print(f"Target Bucket: {response['LoggingEnabled']['TargetBucket']}") print(f"Target Prefix: {response['LoggingEnabled']['TargetPrefix']}") else: print("Logging is not enabled.") except ClientError as e: print(f"AWS ClientError: {e.response['Error']['Message']}") except BotoCoreError as e: print(f"BotoCoreError: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") #bucket_name = 'encryption-test-bucket-789' #log_bucket = 'encryption-test-bucket-789' # It can be the same as bucket_name but not recommended #log_prefix = 'log-prefix/whatever' enable_and_verify_logging(bucket_name, log_bucket, log_prefix)
        copied
        4.2.1
  5. 5

    This runbook involves auditing the AWS account to check if the root user has any active access keys. It's essential to ensure root access keys are not used, as they provide unrestricted access to all resources in the AWS account. The audit aims to enhance security by verifying that no root access keys exist, aligning with best practices for AWS account management.

    5
    1. 5.1

      This task involves verifying the presence of access keys for the AWS root user. It is critical for security to ensure that the root user, which has extensive privileges, does not have active access keys, thereby reducing the risk of unauthorized access and potential security breaches in the AWS environment.

      import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Create a boto3 client for IAM iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: # Retrieve the AWS account's details account_summary = iam_client.get_account_summary() # Check if any access keys exist for the root user root_access_keys = account_summary['SummaryMap'].get('AccountAccessKeysPresent', 0) if root_access_keys == 0: print("Compliant: No access keys found for the root user.") else: print("Non-compliant: Access keys found for the root user.") except ClientError as e: print(f"AWS client error occurred: {e}") except BotoCoreError as e: print(f"Boto core error occurred: {e}") except Exception as e: print(f"An unexpected error occurred: {e}")
      copied
      5.1
  6. 6

    This runbook reviews and ensures AWS IAM policies don't contain overly permissive statements granting full admin access, adhering to the principle of least privilege for enhanced security.

    6
    1. 6.1

      This lists all IAM users in an AWS account, providing key details like usernames, user IDs, and creation dates. Essential for managing permissions and auditing access, this function supports security and compliance protocols by offering a clear view of user entities and their access levels. It's instrumental in enforcing security policies and the principle of least privilege in AWS resource access management.

      import boto3 import botocore.exceptions creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: # Create a paginator for the list_users operation paginator = iam_client.get_paginator('list_users') # Use the paginator to paginate through the users table = context.newtable() table.title = "User list" table.num_cols = 3 table.num_rows = 1 table.has_header_row = True rownum = 0 table.setval(rownum, 0, "User name") table.setval(rownum, 1, "User ID") table.setval(rownum, 2, "Created on") for page in paginator.paginate(): users = page['Users'] table.num_rows += len(page['Users']) # Output user details if users: # print("List of IAM Users:") for user in users: rownum += 1 # print(f"Username: {user['UserName']}, User ID: {user['UserId']}, Created On: {user['CreateDate']}") table.setval(rownum, 0, user['UserName']) table.setval(rownum, 1, user['UserId']) table.setval(rownum, 2, user['CreateDate']) else: print("No IAM users found in this page.") # Handle specific exceptions except botocore.exceptions.NoCredentialsError: print("Credentials not available") except botocore.exceptions.PartialCredentialsError: print("Incomplete credentials provided") except botocore.exceptions.SSLError: print("SSL connection could not be established. Ensure your network allows SSL connections to AWS services") except botocore.exceptions.EndpointConnectionError: print("Unable to connect to the endpoint. Check your AWS configuration and network settings") except botocore.exceptions.ClientError as e: print(f"Unexpected error occurred accessing AWS: {e}") # Handle general exceptions except Exception as e: print(f"An unhandled error occurred: {str(e)}")
      copied
      6.1
    2. 6.2

      This task audits AWS IAM users to identify those with administrative access. It ensures adherence to security standards by limiting broad access rights, crucial for mitigating risks associated with unauthorized permissions in a cloud environment.

      import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def is_admin_policy(policy_document, exclude_permission_boundary): """ Check if the policy document contains admin access statements. """ for statement in policy_document.get('Statement', []): if statement.get('Effect') == 'Allow' and \ statement.get('Action') == '*' and \ statement.get('Resource') == '*': if not exclude_permission_boundary or \ (exclude_permission_boundary and 'Condition' not in statement): return True return False def evaluate_group_policies(user_name, iam, exclude_permission_boundary): """ Evaluate policies attached to groups for a given user. """ reasons = [] try: groups = iam.list_groups_for_user(UserName=user_name)['Groups'] for group in groups: attached_policies = iam.list_attached_group_policies(GroupName=group['GroupName'])['AttachedPolicies'] #print(attached_policies) # for debugging for policy in attached_policies: policy_details = iam.get_policy(PolicyArn=policy['PolicyArn']) if 'DefaultVersionId' in policy_details['Policy']: policy_version = iam.get_policy_version( PolicyArn=policy['PolicyArn'], VersionId=policy_details['Policy']['DefaultVersionId'] ) if is_admin_policy(policy_version['PolicyVersion']['Document'], exclude_permission_boundary): reasons.append(f"Group Attached Policy: {policy['PolicyArn']} ({group['GroupName']})") except ClientError as e: print(f"Error retrieving group policies for user {user_name}: {e}") return reasons def evaluate_attached_policies(user_name, iam, exclude_permission_boundary): """ Evaluate attached managed policies for a given user. """ reasons = [] try: attached_policies = iam.list_attached_user_policies(UserName=user_name)['AttachedPolicies'] #print(attached_policies) # for debugging for policy in attached_policies: policy_details = iam.get_policy(PolicyArn=policy['PolicyArn']) if 'DefaultVersionId' in policy_details['Policy']: policy_version = iam.get_policy_version( PolicyArn=policy['PolicyArn'], VersionId=policy_details['Policy']['DefaultVersionId'] ) if is_admin_policy(policy_version['PolicyVersion']['Document'], exclude_permission_boundary): reasons.append(f"Attached Policy: {policy['PolicyArn']}") except ClientError as e: print(f"Error retrieving attached policies for user {user_name}: {e}") return reasons def evaluate_inline_policies(user_name, iam, exclude_permission_boundary): """ Evaluate inline policies for a given user. """ reasons = [] try: inline_policies = iam.list_user_policies(UserName=user_name)['PolicyNames'] #print(inline_policies) # for debugging for policy_name in inline_policies: policy_document = iam.get_user_policy( UserName=user_name, PolicyName=policy_name )['PolicyDocument'] if is_admin_policy(policy_document, exclude_permission_boundary): reasons.append(f"Inline Policy: {policy_name}") except ClientError as e: print(f"Error retrieving inline policies for user {user_name}: {e}") return reasons def evaluate_iam_users_and_policies(exclude_permission_boundary=False): """ Evaluates IAM users for admin access in attached, inline, and group policies. """ iam = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) compliance_report = { 'compliant': [], 'non_compliant': {} } try: users = iam.list_users()['Users'] for user in users: user_name = user['UserName'] print(f"Evaluating user: {user_name}") reasons = evaluate_attached_policies(user_name, iam, exclude_permission_boundary) + \ evaluate_inline_policies(user_name, iam, exclude_permission_boundary) + \ evaluate_group_policies(user_name, iam, exclude_permission_boundary) if reasons: compliance_report['non_compliant'][user_name] = reasons else: compliance_report['compliant'].append(user_name) except ClientError as e: print(f"ClientError while listing IAM users: {e}") except BotoCoreError as e: print(f"BotoCoreError encountered: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") # Print Compliance Report if compliance_report['non_compliant']: print("\nNon-Compliant IAM Users (Admin Access Found):") for user, reasons in compliance_report['non_compliant'].items(): print(f"{user} - Reasons: {', '.join(reasons)}") else: print("\nNo Non-Compliant IAM Users Found.") if compliance_report['compliant']: print("\nCompliant IAM Users (No Admin Access):") for user in compliance_report['compliant']: print(user) else: print("\nAll IAM Users are Non-Compliant.") exclude_permission_boundary = False evaluate_iam_users_and_policies(exclude_permission_boundary) context.skip_sub_tasks=True
      copied
      6.2
      1. 6.2.1

        This task is used to detach managed IAM policies or delete inline policies from specific IAM users. This action is crucial for maintaining secure and appropriate access levels within AWS environments, ensuring compliance with best security practices.

        import boto3 from botocore.exceptions import ClientError, NoCredentialsError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def remove_or_modify_policy(iam_client, user_name, policy_arn=None, inline_policy_name=None): """ Detach a managed IAM policy or delete an inline IAM policy from a specified AWS IAM user. Args: iam_client: An initialized Boto3 IAM client. user_name: The name of the IAM user. policy_arn: The ARN of the managed IAM policy to be detached. inline_policy_name: The name of the inline IAM policy to be deleted. The function checks if the user exists and whether the specified policies are attached or exist, then proceeds with the appropriate action. """ try: # Check if the user exists iam_client.get_user(UserName=user_name) if policy_arn: # Detach managed policy if it is attached attached_policies = iam_client.list_attached_user_policies(UserName=user_name)['AttachedPolicies'] if any(policy['PolicyArn'] == policy_arn for policy in attached_policies): iam_client.detach_user_policy(UserName=user_name, PolicyArn=policy_arn) print(f"Detached policy {policy_arn} from {user_name}") else: print(f"Policy {policy_arn} is not attached to {user_name}") elif inline_policy_name: # Delete inline policy if it exists inline_policies = iam_client.list_user_policies(UserName=user_name)['PolicyNames'] if inline_policy_name in inline_policies: iam_client.delete_user_policy(UserName=user_name, PolicyName=inline_policy_name) print(f"Deleted inline policy {inline_policy_name} from {user_name}") else: print(f"Inline policy {inline_policy_name} does not exist for {user_name}") except ClientError as e: print(f"An AWS ClientError occurred: {e}") except NoCredentialsError: print("No AWS credentials available. Please configure them.") except BotoCoreError as e: print(f"A BotoCoreError occurred: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # user_name = 'test_user' # policy_arn_to_remove = 'arn:aws:iam::aws:policy/AdministratorAccess' # Example ARN # inline_policy_name = 'your-inline-policy-name' remove_or_modify_policy(iam_client, user_name, policy_arn=policy_arn_to_remove)
        copied
        6.2.1
  7. 7

    This runbook helps with a key task for SOC2 compliance, requiring Multi-Factor Authentication (MFA) for all cloud service users in AWS. This step strengthens security and access control, meeting SOC2's requirements for safeguarding data and maintaining robust account security in cloud environments.

    7
    1. 7.1

      This lists all IAM users in an AWS account, providing key details like usernames, user IDs, and creation dates. Essential for managing permissions and auditing access, this function supports security and compliance protocols by offering a clear view of user entities and their access levels. It's instrumental in enforcing security policies and the principle of least privilege in AWS resource access management.

      import boto3 import botocore.exceptions creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: # Create a paginator for the list_users operation paginator = iam_client.get_paginator('list_users') # Use the paginator to paginate through the users table = context.newtable() table.title = "User list" table.num_cols = 3 table.num_rows = 1 table.has_header_row = True rownum = 0 table.setval(rownum, 0, "User name") table.setval(rownum, 1, "User ID") table.setval(rownum, 2, "Created on") for page in paginator.paginate(): users = page['Users'] table.num_rows += len(page['Users']) # Output user details if users: # print("List of IAM Users:") for user in users: rownum += 1 # print(f"Username: {user['UserName']}, User ID: {user['UserId']}, Created On: {user['CreateDate']}") table.setval(rownum, 0, user['UserName']) table.setval(rownum, 1, user['UserId']) table.setval(rownum, 2, user['CreateDate']) else: print("No IAM users found in this page.") # Handle specific exceptions except botocore.exceptions.NoCredentialsError: print("Credentials not available") except botocore.exceptions.PartialCredentialsError: print("Incomplete credentials provided") except botocore.exceptions.SSLError: print("SSL connection could not be established. Ensure your network allows SSL connections to AWS services") except botocore.exceptions.EndpointConnectionError: print("Unable to connect to the endpoint. Check your AWS configuration and network settings") except botocore.exceptions.ClientError as e: print(f"Unexpected error occurred accessing AWS: {e}") # Handle general exceptions except Exception as e: print(f"An unhandled error occurred: {str(e)}")
      copied
      7.1
    2. 7.2

      This task aims at enhancing SOC2 compliance by identifying AWS IAM users without Multi-Factor Authentication (MFA). This process helps enforce strict access control measures, crucial for SOC2 guidelines, by pinpointing and addressing accounts lacking essential security features.

      import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_mfa_enabled_for_users(iam_client): """ Checks if MFA is enabled for all IAM users with console access. :param iam_client: An IAM client from boto3 :return: Tuple containing counts of compliant, non-compliant, and not-applicable users """ compliant_count = 0 non_compliant_count = 0 not_applicable_count = 0 try: # Paginator to handle the case if there are more users than what a single call can return paginator = iam_client.get_paginator('list_users') table = context.newtable() table.title = "MFA compliance summary" table.num_cols = 3 table.num_rows = 1 table.has_header_row = True rownum = 0 table.setval(rownum,0, "User") table.setval(rownum,1, "Compliant") table.setval(rownum,2, "Console Access") for page in paginator.paginate(): table.num_rows += len(page['Users']) for user in page['Users']: rownum += 1 username = user['UserName'] table.setval(rownum,0, username) # Check if the user has console access by looking for login profile try: iam_client.get_login_profile(UserName=username) console_access = True table.setval(rownum,2, "YES") except iam_client.exceptions.NoSuchEntityException: console_access = False table.setval(rownum,2, "NO") not_applicable_count += 1 # User without console access # Check if the user has MFA enabled if console_access: mfa_devices = iam_client.list_mfa_devices(UserName=username)['MFADevices'] mfa_enabled = len(mfa_devices) > 0 # Update compliance status if not mfa_enabled: #print(f"User: {username} is NON-COMPLIANT - Console Access: Yes, MFA Enabled: No") non_compliant_count += 1 table.setval(rownum,1, "NO") else: #print(f"User: {username} is COMPLIANT - Console Access: Yes, MFA Enabled: Yes") compliant_count += 1 table.setval(rownum,1, "YES") except ClientError as e: print(f"AWS client error occurred: {e}") except BotoCoreError as e: print(f"Boto core error occurred: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") return compliant_count, non_compliant_count, not_applicable_count # Create a boto3 client for IAM iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Check if MFA is enabled for all IAM users with console access compliant, non_compliant, not_applicable = check_mfa_enabled_for_users(iam_client) # Print compliance summary print("\nCompliance Summary:") print(f"Total Compliant Users (MFA Enabled): {compliant}") print(f"Total Non-Compliant Users (MFA Disabled): {non_compliant}") print(f"Total Users for Whom MFA Check is Not Applicable (No Console Access): {not_applicable}") # Print final verdict if non_compliant == 0: print("\nFinal Verdict: COMPLIANT - All users with console access have MFA enabled.") else: print("\nFinal Verdict: NON-COMPLIANT - There are users with console access who do not have MFA enabled.")
      copied
      7.2
  8. 8

    This runbook ensures that AWS IAM users don't have direct policies attached, adhering to SOC2 security guidelines. It mandates permissions be granted via group memberships or IAM roles, enhancing security and simplifying permission management. This audit is key in maintaining structured access control, crucial for SOC2 compliance in cloud environments.

    8
    1. 8.1

      This task involves identifying IAM users in an AWS environment who have individual policies attached to their accounts. This is key for security and compliance, ensuring permissions are managed through group memberships or role assumptions, rather than direct attachments, for better access control and security practices.

      import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_iam_users_for_attached_policies(): """ Check if any IAM users have policies attached directly to them. """ iam = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) non_compliant_users = 0 total_users = 0 try: # Initialize pagination paginator = iam.get_paginator('list_users') for page in paginator.paginate(): for user in page['Users']: total_users += 1 username = user['UserName'] # Check for attached user policies attached_policies = iam.list_attached_user_policies(UserName=username)['AttachedPolicies'] if attached_policies: print(f"Non-compliant: User '{username}' has direct policies attached.") non_compliant_users += 1 else: print(f"Compliant: User '{username}' has no direct policies attached.") print(f"\nTotal users checked: {total_users}") print(f"Non-compliant users: {non_compliant_users}") except ClientError as e: print(f"AWS ClientError: {e.response['Error']['Message']}") except BotoCoreError as e: print(f"BotoCoreError: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") # Example usage check_iam_users_for_attached_policies() context.skip_sub_tasks=True
      copied
      8.1
      1. 8.1.1

        This task manages user permissions in AWS by confirming the existence of both IAM users and groups, ensuring users aren't already in the target group, and then adding them if necessary. This process streamlines user access management and maintains organized, best-practice-based user-group associations in AWS IAM.

        import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def add_user_to_group(user_name, group_name): """ Adds an IAM user to an IAM group, after checking if both exist and if the user is not already in the group. :param user_name: The name of the IAM user. :param group_name: The name of the IAM group. """ # Create an IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: # Check if the user exists iam_client.get_user(UserName=user_name) except ClientError as error: if error.response['Error']['Code'] == 'NoSuchEntity': print(f"The user '{user_name}' does not exist.") return else: raise try: # Check if the group exists and if the user is already a member response = iam_client.get_group(GroupName=group_name) if any(user['UserName'] == user_name for user in response['Users']): print(f"User '{user_name}' is already a member of the group '{group_name}'.") return except ClientError as error: if error.response['Error']['Code'] == 'NoSuchEntity': print(f"The group '{group_name}' does not exist.") return else: raise try: # Add user to the group iam_client.add_user_to_group( GroupName=group_name, UserName=user_name ) print(f"User '{user_name}' has been successfully added to the group '{group_name}'.") except ClientError as error: # Handle other possible errors print(f"Unexpected error: {error}") except Exception as e: # Handle any other exception print(f"An error occurred: {e}") # Example usage #username = 'test_user' #groupname = 'your-group-name' add_user_to_group(username, groupname) context.proceed=False
        copied
        8.1.1
      2. 8.1.2

        This task assigns a policy to an IAM user, enabling them to assume a specified IAM role. This key security measure allows controlled, temporary access elevation in line with the principle of least privilege. It's essential for secure and efficient permission management in AWS. Note:- This will directly attach a policy to the AWS IAM User.

        import boto3 from botocore.exceptions import ClientError import json creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_aws_account_id(): """ Retrieves the AWS account ID using STS. """ sts_client = boto3.client('sts',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') try: account_id = sts_client.get_caller_identity()["Account"] return account_id except ClientError as error: print(f"Error retrieving AWS account ID: {error}") return None def get_policy_arn(iam_client, policy_name): """ Retrieves the ARN of a given policy. :param iam_client: The IAM client instance. :param policy_name: The name of the policy. :return: The ARN of the policy or None if not found. """ try: policy = iam_client.get_policy(PolicyArn=f"arn:aws:iam::{get_aws_account_id()}:policy/{policy_name}") return policy['Policy']['Arn'] except ClientError: return None def check_role_exists(iam_client, role_name): """ Checks if the specified IAM role exists. :param iam_client: The IAM client instance. :param role_name: The name of the IAM role to check. :return: True if the role exists, False otherwise. """ try: iam_client.get_role(RoleName=role_name) return True except ClientError as error: if error.response['Error']['Code'] == 'NoSuchEntity': return False else: raise def attach_role_to_user(user_name, role_name): """ Attaches a policy to a user that allows the user to assume a specified role. :param user_name: The name of the IAM user. :param role_name: The name of the IAM role. """ # Create an IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Get AWS account ID account_id = get_aws_account_id() if account_id is None: print("Failed to retrieve AWS account ID. Exiting function.") return policy_name = f"AllowAssumeRole-{role_name}" policy_arn = get_policy_arn(iam_client, policy_name) policy_document = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "sts:AssumeRole", "Resource": f"arn:aws:iam::{account_id}:role/{role_name}" } ] } # Check if the role exists if not check_role_exists(iam_client, role_name): print(f"The role '{role_name}' does not exist. Exiting function.") return # Create or update policy if policy_arn: print(f"Policy {policy_name} already exists. Updating policy.") try: iam_client.create_policy_version( PolicyArn=policy_arn, PolicyDocument=json.dumps(policy_document), SetAsDefault=True ) except ClientError as error: print(f"Failed to update policy: {error}") return else: try: policy_response = iam_client.create_policy( PolicyName=policy_name, PolicyDocument=json.dumps(policy_document) ) policy_arn = policy_response['Policy']['Arn'] except ClientError as error: print(f"Failed to create policy: {error}") return # Attach the policy to the user try: iam_client.attach_user_policy( UserName=user_name, PolicyArn=policy_arn ) print(f"Policy {policy_name} attached to user {user_name} allowing to assume role {role_name}.") except ClientError as error: print(f"Failed to attach policy to user: {error}") # Example usage #username = 'test_user' #rolename = 'AWSServiceRoleForECS' attach_role_to_user(username, rolename) context.proceed=False
        copied
        8.1.2
  9. 9

    This runbook refers to a security best practice where IAM access keys that have been active for an extended period of time are deactivated to prevent potential unauthorized use. This runbook involves monitoring the age of IAM access keys and automatically deactivating any that exceed a specified age threshold (e.g., 90 days). This proactive approach enhances security by reducing the likelihood of compromised keys being used for unauthorized or malicious activity over prolonged periods. Implementing this runbook typically involves using AWS SDK to interact with the IAM API, retrieving all user keys, evaluating their ages, and deactivating those that are deemed to be too old.

    threshold_age=90 # Hardcoded for a single execution result for the whole runbook
    copied
    9
    1. 9.1

      This lists all IAM users in an AWS account, providing key details like usernames, user IDs, and creation dates. Essential for managing permissions and auditing access, this function supports security and compliance protocols by offering a clear view of user entities and their access levels. It's instrumental in enforcing security policies and the principle of least privilege in AWS resource access management.

      import boto3 import botocore.exceptions creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: # Create a paginator for the list_users operation paginator = iam_client.get_paginator('list_users') # Use the paginator to paginate through the users table = context.newtable() table.title = "User list" table.num_cols = 3 table.num_rows = 1 table.has_header_row = True rownum = 0 table.setval(rownum, 0, "User name") table.setval(rownum, 1, "User ID") table.setval(rownum, 2, "Created on") for page in paginator.paginate(): users = page['Users'] table.num_rows += len(page['Users']) # Output user details if users: # print("List of IAM Users:") for user in users: rownum += 1 # print(f"Username: {user['UserName']}, User ID: {user['UserId']}, Created On: {user['CreateDate']}") table.setval(rownum, 0, user['UserName']) table.setval(rownum, 1, user['UserId']) table.setval(rownum, 2, user['CreateDate']) else: print("No IAM users found in this page.") # Handle specific exceptions except botocore.exceptions.NoCredentialsError: print("Credentials not available") except botocore.exceptions.PartialCredentialsError: print("Incomplete credentials provided") except botocore.exceptions.SSLError: print("SSL connection could not be established. Ensure your network allows SSL connections to AWS services") except botocore.exceptions.EndpointConnectionError: print("Unable to connect to the endpoint. Check your AWS configuration and network settings") except botocore.exceptions.ClientError as e: print(f"Unexpected error occurred accessing AWS: {e}") # Handle general exceptions except Exception as e: print(f"An unhandled error occurred: {str(e)}")
      copied
      9.1
    2. 9.2

      This task identifies and isolates AWS IAM (Identity and Access Management) access keys that have surpassed a predefined age threshold. AWS IAM keys are utilized to securely control access to AWS services and resources. As a best practice for secure access management, it is recommended to regularly rotate IAM access keys and retire those that are no longer needed or have become outdated. By filtering out old access keys, administrators can ensure that access credentials are not overly permissive or unnecessarily prolonged, thereby enhancing the security posture. This task involves analyzing the creation date of each IAM access key, comparing it against the current date, and identifying keys that exceed the acceptable age limit, which are then either flagged for review to uphold stringent access control and minimize potential security risks.

      import boto3 from datetime import datetime creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Define the threshold age in days #threshold_age = 55 # List to store old access key data old_keys_data = [] try: # Initialize a flag for old keys detection old_keys_found = False # Check access keys for each user for user in users: username = user['UserName'] access_keys = iam_client.list_access_keys(UserName=username)['AccessKeyMetadata'] # Check age of each access key for key in access_keys: #print(key) # for debugging key_age = (datetime.now(datetime.utcnow().astimezone().tzinfo) - key['CreateDate']).days if key_age > int(threshold_age): print(f"User: {username}, Access Key: {key['AccessKeyId']}, Age: {key_age} days, Status: {key['Status']}") old_keys_found = True # Add old key data to list old_keys_data.append({ 'username': username, 'access_key_id': key['AccessKeyId'] }) # If no old keys are found, print a message if not old_keys_found: print("No access keys older than the defined threshold were found.") else: print("Old key data: ", old_keys_data) # Pass `old_keys_data` to downstream task here except boto3.exceptions.botocore.exceptions.PartialCredentialsError as pce: print(f"Credentials error: {str(pce)}") except boto3.exceptions.botocore.exceptions.BotoCoreError as bce: print(f"BotoCore Error: {str(bce)}") except boto3.exceptions.botocore.exceptions.ClientError as ce: print(f"Client Error: {str(ce)}") except Exception as e: print(f"An unexpected error occurred: {str(e)}") context.skip_sub_tasks=True
      copied
      9.2
      1. 9.2.1

        This task involves deactivating IAM (Identity and Access Management) access keys in AWS that have surpassed a specified age or are no longer in use, as a measure to enhance security. Regularly auditing and deactivating stale or outdated access keys restrict unauthorized or inadvertent access to AWS resources and services. This task deactivates access keys that are identified as old, thereby ensuring they cannot be used to authenticate API requests. This practice is pivotal in a robust IAM policy to assure that only active and necessary access keys are in circulation, thereby safeguarding the AWS environment against potential malicious activities or inadvertent misconfigurations by reducing the attack surface and adhering to the principle of least privilege.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) ''' # Example input data old_keys_data = [ {'username': 'xyz_other_account', 'access_key_id': 'AJHBVFNONLHGBFHAS2CM'}, # ... received from parent task ] ''' try: # Check if old_keys_data is not empty if old_keys_data: # Loop through each key data in the input for key_data in old_keys_data: username = key_data['username'] access_key_id = key_data['access_key_id'] # Deactivate the access key #iam_client.update_access_key(UserName=username, AccessKeyId=access_key_id, Status='Inactive') print(f"Deactivated access key {access_key_id} for user {username}") else: print("No old keys provided for deactivation.") except boto3.exceptions.botocore.exceptions.PartialCredentialsError as pce: print(f"Credentials error: {str(pce)}") except boto3.exceptions.botocore.exceptions.BotoCoreError as bce: print(f"BotoCore Error: {str(bce)}") except boto3.exceptions.botocore.exceptions.ClientError as ce: print(f"Client Error: {str(ce)}") except Exception as e: print(f"An unexpected error occurred: {str(e)}") context.proceed=False
        copied
        9.2.1
      2. 9.2.2

        This task involves generating a new set of credentials – an access key ID and a secret access key – for an AWS Identity and Access Management (IAM) user. These credentials are vital for programmatic access to AWS services, enabling API calls to be authenticated and authorized. Within AWS, an IAM user can have a maximum of two active access keys, facilitating seamless key rotation. The procedure to create an access key includes the automatic creation of an access key ID and a secret key, which should be securely stored immediately upon creation, as AWS does not allow for the retrieval of the secret key at a later time. Implementing good practices, such as routinely rotating and responsibly managing access keys, is crucial to maintaining secure user access to AWS services.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) if old_keys_data: # Iterate over old keys data and create new keys for the respective users for old_key in old_keys_data: try: new_key = iam_client.create_access_key(UserName=old_key['username']) print(f"New key created for {old_key['username']}:") print(new_key) except iam_client.exceptions.LimitExceededException as lee: print(f"Limit Error creating key for {old_key['username']}: {str(lee)}") except iam_client.exceptions.NoSuchEntityException as nee: print(f"No Such Entity Error for {old_key['username']}: {str(nee)}") except iam_client.exceptions.ServiceFailureException as sfe: print(f"Service Failure for {old_key['username']}: {str(sfe)}") except Exception as e: print(f"An unexpected error occurred while creating key for {old_key['username']}: {str(e)}") else: print("No old keys data was passed to this task") context.proceed=False
        copied
        9.2.2
      3. 9.2.3

        This task pertains to managing and refreshing AWS Identity and Access Management (IAM) user credentials to uphold security best practices. IAM access keys, which consist of an access key ID and a secret access key, are used to authenticate AWS API requests. However, if these keys are compromised or simply aged, updating them becomes crucial to safeguard the account. Updating might involve changing the status of the keys (activating or deactivating them), in this case we are deactivating them. The practice of regularly updating access keys is crucial in minimizing the risk associated with long-term key usage or potential unauthorized access.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: # Check if old_keys_data is not empty if old_keys_data: # Loop through each key data in the input for key_data in old_keys_data: username = key_data['username'] access_key_id = key_data['access_key_id'] # Deactivate the access key iam_client.update_access_key(UserName=username, AccessKeyId=access_key_id, Status='Inactive') print(f"Deactivated access key {access_key_id} for user {username}") else: print("No old keys provided for deactivation.") except boto3.exceptions.botocore.exceptions.PartialCredentialsError as pce: print(f"Credentials error: {str(pce)}") except boto3.exceptions.botocore.exceptions.BotoCoreError as bce: print(f"BotoCore Error: {str(bce)}") except boto3.exceptions.botocore.exceptions.ClientError as ce: print(f"Client Error: {str(ce)}") except Exception as e: print(f"An unexpected error occurred: {str(e)}") context.proceed=False
        copied
        9.2.3
      4. 9.2.4

        This task refers to the removal of an AWS Identity and Access Management (IAM) user's access keys, ensuring they can no longer be used for authentication with AWS services and resources. IAM access keys comprise an access key ID and a secret access key, which are employed to sign programmatic requests that you make to AWS. Whether it is for security compliance, a response to a security incident, or part of a key rotation policy, deleting an IAM access key is a critical operation. After deletion, any applications or users utilizing the deleted access key will lose access to AWS resources, so it is crucial to update all instances where the key is used before deletion. Additionally, AWS recommends regular access key rotation as a best practice, which involves creating a new key, updating all applications to use the new key, and then safely deleting the old key to maintain secure and functional access control.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Check if there is data to process if old_keys_data: # Iterate over old keys data and try to delete each key for old_key in old_keys_data: try: iam_client.delete_access_key( UserName=old_key['username'], AccessKeyId=old_key['access_key_id'] ) print(f"Deleted access key {old_key['access_key_id']} for user {old_key['username']}.") except iam_client.exceptions.NoSuchEntityException as nee: print(f"No Such Entity Error for {old_key['username']} with key {old_key['access_key_id']}: {str(nee)}") except iam_client.exceptions.LimitExceededException as lee: print(f"Limit Error for {old_key['username']} with key {old_key['access_key_id']}: {str(lee)}") except iam_client.exceptions.ServiceFailureException as sfe: print(f"Service Failure for {old_key['username']} with key {old_key['access_key_id']}: {str(sfe)}") except Exception as e: print(f"An unexpected error occurred while deleting key for {old_key['username']} with key {old_key['access_key_id']}: {str(e)}") else: print("No old keys data was passed to this task")
        copied
        9.2.4
  10. 10

    This runbook involves monitoring the age of IAM user access keys and replacing them periodically. Access keys are used to authenticate programmatic requests to AWS services. Over time, the risk of these keys being compromised increases, either through unintentional exposure or malicious activities. By routinely checking the age of these keys, organizations can identify which ones are nearing or past their recommended lifespan. Rotating, or replacing, these old keys reduces potential security vulnerabilities. The process typically involves creating a new set of access keys, updating all applications or services to use the new keys, and then deactivating the old keys. This proactive approach ensures that AWS access remains secure and aligns with industry standards for credential management.

    threshold_age=90 #Hardcoded for a single execution result, can be changed as per the user
    copied
    10
    1. 10.1

      This lists all IAM users in an AWS account, providing key details like usernames, user IDs, and creation dates. Essential for managing permissions and auditing access, this function supports security and compliance protocols by offering a clear view of user entities and their access levels. It's instrumental in enforcing security policies and the principle of least privilege in AWS resource access management.

      import boto3 import botocore.exceptions creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: # Create a paginator for the list_users operation paginator = iam_client.get_paginator('list_users') # Use the paginator to paginate through the users table = context.newtable() table.title = "User list" table.num_cols = 3 table.num_rows = 1 table.has_header_row = True rownum = 0 table.setval(rownum, 0, "User name") table.setval(rownum, 1, "User ID") table.setval(rownum, 2, "Created on") for page in paginator.paginate(): users = page['Users'] table.num_rows += len(page['Users']) # Output user details if users: # print("List of IAM Users:") for user in users: rownum += 1 # print(f"Username: {user['UserName']}, User ID: {user['UserId']}, Created On: {user['CreateDate']}") table.setval(rownum, 0, user['UserName']) table.setval(rownum, 1, user['UserId']) table.setval(rownum, 2, user['CreateDate']) else: print("No IAM users found in this page.") # Handle specific exceptions except botocore.exceptions.NoCredentialsError: print("Credentials not available") except botocore.exceptions.PartialCredentialsError: print("Incomplete credentials provided") except botocore.exceptions.SSLError: print("SSL connection could not be established. Ensure your network allows SSL connections to AWS services") except botocore.exceptions.EndpointConnectionError: print("Unable to connect to the endpoint. Check your AWS configuration and network settings") except botocore.exceptions.ClientError as e: print(f"Unexpected error occurred accessing AWS: {e}") # Handle general exceptions except Exception as e: print(f"An unhandled error occurred: {str(e)}")
      copied
      10.1
    2. 10.2

      This task identifies and isolates AWS IAM (Identity and Access Management) access keys that have surpassed a predefined age threshold. AWS IAM keys are utilized to securely control access to AWS services and resources. As a best practice for secure access management, it is recommended to regularly rotate IAM access keys and retire those that are no longer needed or have become outdated. By filtering out old access keys, administrators can ensure that access credentials are not overly permissive or unnecessarily prolonged, thereby enhancing the security posture. This task involves analyzing the creation date of each IAM access key, comparing it against the current date, and identifying keys that exceed the acceptable age limit, which are then either flagged for review to uphold stringent access control and minimize potential security risks.

      import boto3 from datetime import datetime creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Define the threshold age in days #threshold_age = 55 # List to store old access key data old_keys_data = [] try: # Initialize a flag for old keys detection old_keys_found = False # Check access keys for each user for user in users: username = user['UserName'] access_keys = iam_client.list_access_keys(UserName=username)['AccessKeyMetadata'] # Check age of each access key for key in access_keys: #print(key) # for debugging key_age = (datetime.now(datetime.utcnow().astimezone().tzinfo) - key['CreateDate']).days if key_age > int(threshold_age): print(f"User: {username}, Access Key: {key['AccessKeyId']}, Age: {key_age} days, Status: {key['Status']}") old_keys_found = True # Add old key data to list old_keys_data.append({ 'username': username, 'access_key_id': key['AccessKeyId'] }) # If no old keys are found, print a message if not old_keys_found: print("No access keys older than the defined threshold were found.") else: print("Old key data: ", old_keys_data) # Pass `old_keys_data` to downstream task here except boto3.exceptions.botocore.exceptions.PartialCredentialsError as pce: print(f"Credentials error: {str(pce)}") except boto3.exceptions.botocore.exceptions.BotoCoreError as bce: print(f"BotoCore Error: {str(bce)}") except boto3.exceptions.botocore.exceptions.ClientError as ce: print(f"Client Error: {str(ce)}") except Exception as e: print(f"An unexpected error occurred: {str(e)}") context.skip_sub_tasks=True
      copied
      10.2
      1. 10.2.1

        This task involves deactivating IAM (Identity and Access Management) access keys in AWS that have surpassed a specified age or are no longer in use, as a measure to enhance security. Regularly auditing and deactivating stale or outdated access keys restrict unauthorized or inadvertent access to AWS resources and services. This task deactivates access keys that are identified as old, thereby ensuring they cannot be used to authenticate API requests. This practice is pivotal in a robust IAM policy to assure that only active and necessary access keys are in circulation, thereby safeguarding the AWS environment against potential malicious activities or inadvertent misconfigurations by reducing the attack surface and adhering to the principle of least privilege.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) ''' # Example input data old_keys_data = [ {'username': 'xyz_other_account', 'access_key_id': 'AJHBVFNONLHGBFHAS2CM'}, # ... received from parent task ] ''' try: # Check if old_keys_data is not empty if old_keys_data: # Loop through each key data in the input for key_data in old_keys_data: username = key_data['username'] access_key_id = key_data['access_key_id'] # Deactivate the access key #iam_client.update_access_key(UserName=username, AccessKeyId=access_key_id, Status='Inactive') print(f"Deactivated access key {access_key_id} for user {username}") else: print("No old keys provided for deactivation.") except boto3.exceptions.botocore.exceptions.PartialCredentialsError as pce: print(f"Credentials error: {str(pce)}") except boto3.exceptions.botocore.exceptions.BotoCoreError as bce: print(f"BotoCore Error: {str(bce)}") except boto3.exceptions.botocore.exceptions.ClientError as ce: print(f"Client Error: {str(ce)}") except Exception as e: print(f"An unexpected error occurred: {str(e)}") context.proceed=False
        copied
        10.2.1
      2. 10.2.2

        This task involves generating a new set of credentials – an access key ID and a secret access key – for an AWS Identity and Access Management (IAM) user. These credentials are vital for programmatic access to AWS services, enabling API calls to be authenticated and authorized. Within AWS, an IAM user can have a maximum of two active access keys, facilitating seamless key rotation. The procedure to create an access key includes the automatic creation of an access key ID and a secret key, which should be securely stored immediately upon creation, as AWS does not allow for the retrieval of the secret key at a later time. Implementing good practices, such as routinely rotating and responsibly managing access keys, is crucial to maintaining secure user access to AWS services.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) if old_keys_data: # Iterate over old keys data and create new keys for the respective users for old_key in old_keys_data: try: new_key = iam_client.create_access_key(UserName=old_key['username']) print(f"New key created for {old_key['username']}:") print(new_key) except iam_client.exceptions.LimitExceededException as lee: print(f"Limit Error creating key for {old_key['username']}: {str(lee)}") except iam_client.exceptions.NoSuchEntityException as nee: print(f"No Such Entity Error for {old_key['username']}: {str(nee)}") except iam_client.exceptions.ServiceFailureException as sfe: print(f"Service Failure for {old_key['username']}: {str(sfe)}") except Exception as e: print(f"An unexpected error occurred while creating key for {old_key['username']}: {str(e)}") else: print("No old keys data was passed to this task") context.proceed=False
        copied
        10.2.2
      3. 10.2.3

        This task pertains to managing and refreshing AWS Identity and Access Management (IAM) user credentials to uphold security best practices. IAM access keys, which consist of an access key ID and a secret access key, are used to authenticate AWS API requests. However, if these keys are compromised or simply aged, updating them becomes crucial to safeguard the account. Updating might involve changing the status of the keys (activating or deactivating them), in this case we are deactivating them. The practice of regularly updating access keys is crucial in minimizing the risk associated with long-term key usage or potential unauthorized access.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: # Check if old_keys_data is not empty if old_keys_data: # Loop through each key data in the input for key_data in old_keys_data: username = key_data['username'] access_key_id = key_data['access_key_id'] # Deactivate the access key iam_client.update_access_key(UserName=username, AccessKeyId=access_key_id, Status='Inactive') print(f"Deactivated access key {access_key_id} for user {username}") else: print("No old keys provided for deactivation.") except boto3.exceptions.botocore.exceptions.PartialCredentialsError as pce: print(f"Credentials error: {str(pce)}") except boto3.exceptions.botocore.exceptions.BotoCoreError as bce: print(f"BotoCore Error: {str(bce)}") except boto3.exceptions.botocore.exceptions.ClientError as ce: print(f"Client Error: {str(ce)}") except Exception as e: print(f"An unexpected error occurred: {str(e)}") context.proceed=False
        copied
        10.2.3
      4. 10.2.4

        This task refers to the removal of an AWS Identity and Access Management (IAM) user's access keys, ensuring they can no longer be used for authentication with AWS services and resources. IAM access keys comprise an access key ID and a secret access key, which are employed to sign programmatic requests that you make to AWS. Whether it is for security compliance, a response to a security incident, or part of a key rotation policy, deleting an IAM access key is a critical operation. After deletion, any applications or users utilizing the deleted access key will lose access to AWS resources, so it is crucial to update all instances where the key is used before deletion. Additionally, AWS recommends regular access key rotation as a best practice, which involves creating a new key, updating all applications to use the new key, and then safely deleting the old key to maintain secure and functional access control.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Check if there is data to process if old_keys_data: # Iterate over old keys data and try to delete each key for old_key in old_keys_data: try: iam_client.delete_access_key( UserName=old_key['username'], AccessKeyId=old_key['access_key_id'] ) print(f"Deleted access key {old_key['access_key_id']} for user {old_key['username']}.") except iam_client.exceptions.NoSuchEntityException as nee: print(f"No Such Entity Error for {old_key['username']} with key {old_key['access_key_id']}: {str(nee)}") except iam_client.exceptions.LimitExceededException as lee: print(f"Limit Error for {old_key['username']} with key {old_key['access_key_id']}: {str(lee)}") except iam_client.exceptions.ServiceFailureException as sfe: print(f"Service Failure for {old_key['username']} with key {old_key['access_key_id']}: {str(sfe)}") except Exception as e: print(f"An unexpected error occurred while deleting key for {old_key['username']} with key {old_key['access_key_id']}: {str(e)}") else: print("No old keys data was passed to this task")
        copied
        10.2.4
  11. 11

    This runbook involves listing all IAM users, identifying those who haven't accessed AWS services for a specified period, and then safely deleting these inactive users. This process enhances security by removing potential vulnerabilities and optimizes resource usage in the AWS environment. Always proceed with caution to avoid unintended deletions.

    days_inactive=90 #Hardcoded for a single execution result, user can modify as per requirements
    copied
    11
    1. 11.1

      This lists all IAM users in an AWS account, providing key details like usernames, user IDs, and creation dates. Essential for managing permissions and auditing access, this function supports security and compliance protocols by offering a clear view of user entities and their access levels. It's instrumental in enforcing security policies and the principle of least privilege in AWS resource access management.

      import boto3 import botocore.exceptions creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: # Create a paginator for the list_users operation paginator = iam_client.get_paginator('list_users') # Use the paginator to paginate through the users table = context.newtable() table.title = "User list" table.num_cols = 3 table.num_rows = 1 table.has_header_row = True rownum = 0 table.setval(rownum, 0, "User name") table.setval(rownum, 1, "User ID") table.setval(rownum, 2, "Created on") for page in paginator.paginate(): users = page['Users'] table.num_rows += len(page['Users']) # Output user details if users: # print("List of IAM Users:") for user in users: rownum += 1 # print(f"Username: {user['UserName']}, User ID: {user['UserId']}, Created On: {user['CreateDate']}") table.setval(rownum, 0, user['UserName']) table.setval(rownum, 1, user['UserId']) table.setval(rownum, 2, user['CreateDate']) else: print("No IAM users found in this page.") # Handle specific exceptions except botocore.exceptions.NoCredentialsError: print("Credentials not available") except botocore.exceptions.PartialCredentialsError: print("Incomplete credentials provided") except botocore.exceptions.SSLError: print("SSL connection could not be established. Ensure your network allows SSL connections to AWS services") except botocore.exceptions.EndpointConnectionError: print("Unable to connect to the endpoint. Check your AWS configuration and network settings") except botocore.exceptions.ClientError as e: print(f"Unexpected error occurred accessing AWS: {e}") # Handle general exceptions except Exception as e: print(f"An unhandled error occurred: {str(e)}")
      copied
      11.1
    2. 11.2

      This task identifies users who haven't accessed AWS services within a specified timeframe. This process helps to maintain a secure and well-organized IAM environment by focusing on active users and potentially deactivating or removing those who are no longer in use.

      import datetime from dateutil.tz import tzlocal # Filter out users who haven't accessed AWS services for a specified number of days current_time = datetime.datetime.now(tzlocal()) # Check if users list is empty or not passed from the upstream task if not users: print("No users provided from the upstream task.") else: #days_inactive = 90 # Adjust as needed inactive_users = [] for user in users: if 'PasswordLastUsed' not in user: continue last_used = user['PasswordLastUsed'] days_since_last_use = (current_time - last_used).days if days_since_last_use > int(days_inactive): inactive_users.append(user) # Check if there are any inactive users if not inactive_users: print("No inactive users found.") else: for user in inactive_users: days_since_last_use = (current_time - user['PasswordLastUsed']).days print(f"Inactive User: {user['UserName']}, Last Used: {user['PasswordLastUsed']}, Inactivity: {days_since_last_use} days") context.skip_sub_tasks=True
      copied
      11.2
      1. 11.2.1

        This task deletes an IAM user in AWS which is a critical step in managing access to AWS resources. This process ensures that the user no longer has permission to perform actions or access resources. It involves several key steps: detaching all associated policies, removing any login profiles or access keys, and finally, deleting the user itself. This action is irreversible, and once the user is deleted, they cannot access the AWS Management Console, AWS CLI, or API operations unless recreated. Properly removing users helps in maintaining a secure and tidy AWS environment, especially when individuals no longer require access or have changed roles.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM and STS clients iam = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) sts = boto3.client('sts',aws_access_key_id=access_key,aws_secret_access_key=secret_key) def delete_iam_user(username=None): """ Delete an IAM user and its associated resources. Parameters: - username (str, optional): The name of the IAM user to delete. """ # Step 0: Preliminary check if a username is provided if not username: print("Error: Username is required to delete an IAM user.") return # Step 1: Check if the user exists try: iam.get_user(UserName=username) except iam.exceptions.NoSuchEntityException: print(f"User {username} does not exist.") return except Exception as e: print(f"Error fetching details for IAM user {username}: {e}") return # Step 2: Delete access keys associated with the user try: # Fetching all the access keys associated with the user access_keys = iam.list_access_keys(UserName=username) # Iterate through each access key and delete them for key_metadata in access_keys['AccessKeyMetadata']: iam.delete_access_key(UserName=username, AccessKeyId=key_metadata['AccessKeyId']) print(f"Deleted access key {key_metadata['AccessKeyId']} for user {username}.") except Exception as e: print(f"Error deleting access keys for user {username}: {e}") # Step 3: Delete login profile for the user try: # Deleting the console access (login profile) of the user iam.delete_login_profile(UserName=username) print(f"Login profile for user {username} deleted successfully.") except iam.exceptions.NoSuchEntityException: print(f"No login profile found for user {username}.") except Exception as e: print(f"Error deleting login profile for user {username}: {e}") # Step 4: Detach all policies associated with the user # Using a paginator to handle users with a large number of attached policies paginator = iam.get_paginator('list_attached_user_policies') for page in paginator.paginate(UserName=username): for policy in page['AttachedPolicies']: try: # Detaching each policy from the user iam.detach_user_policy(UserName=username, PolicyArn=policy['PolicyArn']) print(f"Detached policy {policy['PolicyName']} from user {username}.") except Exception as e: print(f"Error detaching policy {policy['PolicyName']} from user {username}: {e}") # Step 5: Delete the IAM user try: # Deleting the user from AWS IAM iam.delete_user(UserName=username) print(f"IAM user {username} deleted successfully.") except Exception as e: print(f"Error deleting IAM user {username}: {e}") # Step 6: Post-deletion verification try: # Checking if the user still exists response = iam.get_user(UserName=username) print(f"User {username} still exists!") except iam.exceptions.NoSuchEntityException: print(f"Verified that user {username} has been deleted successfully.") # Fetching the identity of the caller for audit/tracking purposes caller_identity = sts.get_caller_identity() print(f"User {username} deleted by: {caller_identity['Arn']}") except Exception as e: print(f"Error verifying the deletion of IAM user {username}: {e}") ''' Specify the username of the IAM user you wish to delete user_to_delete initialized in input parameters ''' user_to_delete = locals().get('user_to_delete', '') or '' if not user_to_delete: print("Please provide a valid user name.") else: delete_iam_user(user_to_delete)
        copied
        11.2.1
  12. 12

    This runbook helps enforce SOC2 compliance in AWS environments. It identifies and remediates security groups allowing unrestricted SSH access in running EC2 instances, ensuring robust security and compliance with SOC2 standards.

    region_name=None #Hardcoded for single execution result, Use None when you want to run the script for all regions.
    copied
    12
    1. 12.1

      Amazon Elastic Compute Cloud (EC2) is a service offered by Amazon Web Services (AWS) that provides resizable compute capacity in the cloud. Through Boto3's EC2 client, the describe_instances() method provides detailed information about each instance, including its ID, type, launch time, and current state. This capability assists users in effectively monitoring and managing their cloud resources.

      import boto3 from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_all_regions(): ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name = 'us-east-1') return [region['RegionName'] for region in ec2.describe_regions()['Regions']] def list_ec2_instances(region=None): # If no region is provided, fetch instances from all regions regions = [region] if region else list_all_regions() # Create an empty list to store instance details instance_details = [] for region in regions: # Try initializing the Boto3 EC2 client for the specific region try: ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) except (NoCredentialsError, PartialCredentialsError): print(f"Failed for {region}: No AWS credentials found or incomplete credentials provided.") continue except BotoCoreError as e: print(f"Failed for {region}: Error initializing the EC2 client due to BotoCore Error: {e}") continue except Exception as e: print(f"Failed for {region}: Unexpected error initializing the EC2 client: {e}") continue #print(f"Fetching EC2 instance details for region: {region}...") # Try to paginate through the EC2 instance responses for the specific region try: paginator = ec2_client.get_paginator('describe_instances') for page in paginator.paginate(): for reservation in page['Reservations']: for instance in reservation['Instances']: # Extract the desired attributes instance_id = instance['InstanceId'] instance_type = instance['InstanceType'] launch_time = instance['LaunchTime'] state = instance['State']['Name'] # Append the details to the list instance_details.append({ 'InstanceId': instance_id, 'InstanceType': instance_type, 'LaunchTime': launch_time, 'State': state, 'Region': region }) #print(f"Fetched all instance details for region: {region} successfully!") except ClientError as e: print(f"Failed for {region}: AWS Client Error while fetching EC2 instance details: {e}") except Exception as e: print(f"Failed for {region}: Unexpected error while fetching EC2 instance details: {e}") return instance_details def display_instance_details(data): # Initialize table with the desired structure and headers table = context.newtable() table.title = "EC2 Instance Details" table.num_cols = 5 # Number of columns according to headers table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names based on the new structure headers = ["Instance ID", "Instance Type", "Launch Time", "State", "Region"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Sort the instance data by launch time for better organization data.sort(key=lambda x: x["LaunchTime"], reverse=True) # Populate the table with instance data for row_num, instance in enumerate(data, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each instance values = [ instance["InstanceId"], instance["InstanceType"], instance["LaunchTime"].strftime('%Y-%m-%d %H:%M:%S'), # Format the datetime instance["State"], instance["Region"] ] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) # You can replace None with a specific region string like 'us-east-1' to get instances from a specific region # Hardcoded region_name for One time Execution Result region_name=None instances_list = list_ec2_instances(region_name) if instances_list: ''' print("\nEC2 Instance Details:") for instance in instances_list: print("-" * 50) # Separator line for key, value in instance.items(): print(f"{key}: {value}")''' display_instance_details(instances_list) else: print("No instances found or an error occurred.")
      copied
      12.1
    2. 12.2

      This task is designed to audit AWS environments for SOC2 compliance. It systematically identifies security groups in running EC2 instances that permit unrestricted SSH access, flagging potential security vulnerabilities and aiding in maintaining SOC2 compliance standards.

      import boto3 from botocore.exceptions import NoCredentialsError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_ssh_access(security_group, ec2): try: sg = ec2.describe_security_groups(GroupIds=[security_group])['SecurityGroups'][0] for permission in sg.get('IpPermissions', []): if permission.get('FromPort') == 22 and permission.get('ToPort') == 22: for ip_range in permission.get('IpRanges', []): if ip_range.get('CidrIp') == '0.0.0.0/0': return True return False except Exception as e: print(f"Error checking security group {security_group}: {e}") return False def get_security_groups_in_use(ec2): sg_in_use = set() try: instances = ec2.describe_instances( Filters=[{'Name': 'instance-state-name', 'Values': ['running']}] ) for reservation in instances.get('Reservations', []): for instance in reservation.get('Instances', []): for sg in instance.get('SecurityGroups', []): sg_in_use.add(sg['GroupId']) return sg_in_use except Exception as e: print(f"Error retrieving instances: {e}") return sg_in_use def check_region_for_unrestricted_ssh(region, sgs_to_remediate): ec2 = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) print(f"Checking region: {region}") sg_in_use = get_security_groups_in_use(ec2) unrestricted_ssh = [sg_id for sg_id in sg_in_use if check_ssh_access(sg_id, ec2)] if unrestricted_ssh: print(f"Region {region}: Security Groups with unrestricted SSH access:") for sg_id in unrestricted_ssh: print(sg_id) sgs_to_remediate[region] = unrestricted_ssh else: print(f"Region {region}: No security groups with unrestricted SSH access found.") def check_all_regions(region_name=None): sgs_to_remediate = {} if region_name: check_region_for_unrestricted_ssh(region_name, sgs_to_remediate) else: ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']] for region in regions: check_region_for_unrestricted_ssh(region, sgs_to_remediate) return sgs_to_remediate try: region_name = None # Set to specific region or None for all regions sgs_to_remediate = check_all_regions(region_name) print("Security Groups to Remediate:", sgs_to_remediate) except NoCredentialsError: print("Error: AWS credentials not available. Please configure them.") except ClientError as e: print(f"AWS Client error: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") context.skip_sub_tasks=True
      copied
      12.2
      1. 12.2.1

        This task identifies and corrects security groups in AWS EC2, which allow unrestricted SSH access.

        import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def remove_unrestricted_ssh(security_group_id, region): """ This function attempts to remove unrestricted SSH access from the specified security group. :param security_group_id: The ID of the AWS security group. :param region: The AWS region where the security group is located. :return: Boolean indicating whether the unrestricted SSH access was successfully removed. """ # Initialize the boto3 client for EC2 in the specified region. ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) try: # Retrieve the details of the specified security group. sg = ec2.describe_security_groups(GroupIds=[security_group_id])['SecurityGroups'][0] # Iterate over the ingress permissions of the security group. for permission in sg.get('IpPermissions', []): # Check for SSH access (port 22) from anywhere (0.0.0.0/0). if permission.get('FromPort') == 22 and permission.get('ToPort') == 22: for ip_range in permission.get('IpRanges', []): if ip_range.get('CidrIp') == '0.0.0.0/0': # Revoke the ingress rule that allows unrestricted SSH access. ec2.revoke_security_group_ingress( GroupId=security_group_id, IpPermissions=[{ 'FromPort': 22, 'ToPort': 22, 'IpProtocol': 'tcp', 'IpRanges': [{'CidrIp': '0.0.0.0/0'}] }] ) print(f"Removed unrestricted SSH access from {security_group_id} in {region}") return True # If no unrestricted SSH access is found. print(f"No unrestricted SSH access found for {security_group_id} in {region}") return False except ClientError as e: # Handle client errors, such as incorrect permissions or non-existent resources. print(f"ClientError modifying security group {security_group_id} in {region}: {e}") return False except BotoCoreError as e: # Handle errors from the core Boto3 library. print(f"BotoCoreError encountered: {e}") return False except Exception as e: # Catch-all for any other unexpected exceptions. print(f"An unexpected error occurred: {e}") return False def remediate_unrestricted_ssh(sgs_to_remediate): """ :param sgs_to_remediate: A dictionary where keys are AWS region names and values are lists of security group IDs. """ for region, sg_ids in sgs_to_remediate.items(): for sg_id in sg_ids: # Attempt to remediate each security group. remove_unrestricted_ssh(sg_id, region) # Example usage #sgs_to_remediate = {'us-west-2': ['sg-4232c07a']} # from upstream task remediate_unrestricted_ssh(sgs_to_remediate)
        copied
        12.2.1
  13. 13

    This runbook checks all EC2 instances in an AWS environment to confirm they do not have public IP addresses. This audit is key to SOC2 compliance, aiming to protect against unauthorized access and minimize cyber threats. Its goal is to ensure that EC2 instances are secured within private networks, aligning with SOC2's focus on system security and integrity.

    region_name=None #Hardcoded for single execution result, Use None when you want to run the script for all regions.
    copied
    13
    1. 13.1

      Amazon Elastic Compute Cloud (EC2) is a service offered by Amazon Web Services (AWS) that provides resizable compute capacity in the cloud. Through Boto3's EC2 client, the describe_instances() method provides detailed information about each instance, including its ID, type, launch time, and current state. This capability assists users in effectively monitoring and managing their cloud resources.

      import boto3 from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_all_regions(): ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name = 'us-east-1') return [region['RegionName'] for region in ec2.describe_regions()['Regions']] def list_ec2_instances(region=None): # If no region is provided, fetch instances from all regions regions = [region] if region else list_all_regions() # Create an empty list to store instance details instance_details = [] for region in regions: # Try initializing the Boto3 EC2 client for the specific region try: ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) except (NoCredentialsError, PartialCredentialsError): print(f"Failed for {region}: No AWS credentials found or incomplete credentials provided.") continue except BotoCoreError as e: print(f"Failed for {region}: Error initializing the EC2 client due to BotoCore Error: {e}") continue except Exception as e: print(f"Failed for {region}: Unexpected error initializing the EC2 client: {e}") continue #print(f"Fetching EC2 instance details for region: {region}...") # Try to paginate through the EC2 instance responses for the specific region try: paginator = ec2_client.get_paginator('describe_instances') for page in paginator.paginate(): for reservation in page['Reservations']: for instance in reservation['Instances']: # Extract the desired attributes instance_id = instance['InstanceId'] instance_type = instance['InstanceType'] launch_time = instance['LaunchTime'] state = instance['State']['Name'] # Append the details to the list instance_details.append({ 'InstanceId': instance_id, 'InstanceType': instance_type, 'LaunchTime': launch_time, 'State': state, 'Region': region }) #print(f"Fetched all instance details for region: {region} successfully!") except ClientError as e: print(f"Failed for {region}: AWS Client Error while fetching EC2 instance details: {e}") except Exception as e: print(f"Failed for {region}: Unexpected error while fetching EC2 instance details: {e}") return instance_details def display_instance_details(data): # Initialize table with the desired structure and headers table = context.newtable() table.title = "EC2 Instance Details" table.num_cols = 5 # Number of columns according to headers table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names based on the new structure headers = ["Instance ID", "Instance Type", "Launch Time", "State", "Region"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Sort the instance data by launch time for better organization data.sort(key=lambda x: x["LaunchTime"], reverse=True) # Populate the table with instance data for row_num, instance in enumerate(data, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each instance values = [ instance["InstanceId"], instance["InstanceType"], instance["LaunchTime"].strftime('%Y-%m-%d %H:%M:%S'), # Format the datetime instance["State"], instance["Region"] ] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) # You can replace None with a specific region string like 'us-east-1' to get instances from a specific region # Hardcoded region_name for One time Execution Result region_name=None instances_list = list_ec2_instances(region_name) if instances_list: ''' print("\nEC2 Instance Details:") for instance in instances_list: print("-" * 50) # Separator line for key, value in instance.items(): print(f"{key}: {value}")''' display_instance_details(instances_list) else: print("No instances found or an error occurred.")
      copied
      13.1
    2. 13.2

      This task is focused on identifying EC2 instances in an AWS environment that are assigned public IP addresses. It plays a crucial role in maintaining SOC2 compliance by identifying potential security risks associated with public internet exposure.

      import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_public_ip(ec2_client, region): """ Check EC2 instances in a region for public IPs and categorize them as compliant or non-compliant. :param ec2_client: The boto3 EC2 client. :param region: The AWS region to check. :return: A compliance report dictionary. """ compliance_report = {'compliant': [], 'non_compliant': []} try: # Retrieve all instances in the specified region response = ec2_client.describe_instances() # Iterate over each instance to check for public IP for reservation in response.get('Reservations', []): for instance in reservation.get('Instances', []): instance_id = instance.get('InstanceId') public_ip = instance.get('PublicIpAddress') # Categorize based on public IP presence if public_ip: compliance_report['non_compliant'].append({'InstanceId': instance_id, 'Region': region, 'PublicIP': public_ip}) else: compliance_report['compliant'].append({'InstanceId': instance_id, 'Region': region}) except ClientError as e: print(f"ClientError checking instances in region {region}: {e}") except BotoCoreError as e: print(f"BotoCoreError occurred: {e}") except Exception as e: print(f"Unexpected error occurred in region {region}: {e}") return compliance_report def evaluate_ec2_instances(region_name=None): """ Evaluate EC2 instances for public IP compliance in a specific region or all regions. :param region_name: Specific region name or None for all regions. :return: A global compliance report dictionary. """ global_compliance_report = {'compliant': [], 'non_compliant': []} try: ec2_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name='us-east-1') # Determine regions to check regions = [region_name] if region_name else [region['RegionName'] for region in ec2_client.describe_regions()['Regions']] # Check each region for public IP compliance for region in regions: print(f"Checking instances in region: {region}") ec2_region_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) region_compliance_report = check_public_ip(ec2_region_client, region) # Aggregate results global_compliance_report['compliant'].extend(region_compliance_report['compliant']) global_compliance_report['non_compliant'].extend(region_compliance_report['non_compliant']) except ClientError as e: print(f"ClientError while evaluating EC2 instances: {e}") except BotoCoreError as e: print(f"BotoCoreError occurred: {e}") except Exception as e: print(f"Unexpected error occurred: {e}") return global_compliance_report # Example usage #region_name = None #'ap-south-1' # Specify a region or set to None for all regions compliance_report = evaluate_ec2_instances(region_name) # Display compliance summary print("\nCompliance Summary:") # Print details for compliant and non-compliant instances if compliance_report['compliant']: print("\nCompliant Instances:") for instance in compliance_report['compliant']: print(f"InstanceId: {instance['InstanceId']}, Region: {instance['Region']}") else: print("\nNo Compliant Instances Found.") if compliance_report['non_compliant']: print("\nNon-Compliant Instances (with Public IP):") for instance in compliance_report['non_compliant']: print(f"InstanceId: {instance['InstanceId']} \nRegion: {instance['Region']} \nPublic IP: {instance['PublicIP']}\n" + "-"*40) else: print("\nNo Non-Compliant Instances Found.") context.skip_sub_tasks=True
      copied
      13.2
  14. 14

    The AWS Restricted Common Ports Audit rule evaluates security groups to ensure they do not allow unrestricted incoming TCP traffic to specific critical ports for IPv4. It aims to prevent unauthorized access by marking configurations as COMPLIANT when traffic to these ports is appropriately restricted, thereby enhancing the security posture of AWS environments.

    region_name=None #Hardcoded for single execution result, Use None when you want to run the script for all regions.
    copied
    14
    1. 14.1

      This task identifies security groups allowing unrestricted TCP traffic to specified ports on IPv4, highlighting potential security risks. It ensures traffic to sensitive ports is limited to authorized sources, bolstering network security. The aim is to prevent unauthorized access and exposure of critical services.

      import boto3 from botocore.exceptions import BotoCoreError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Parameters for compliance check BLOCKED_PORTS = [20, 21, 3389, 3306, 4333] # Example ports that should be restricted def fetch_security_groups(ec2_client): """ Fetch all security groups from AWS EC2, with pagination support. """ print("Fetching security groups with pagination support...") security_groups = [] paginator = ec2_client.get_paginator('describe_security_groups') page_iterator = paginator.paginate() for page in page_iterator: security_groups.extend(page['SecurityGroups']) print(f"Fetched {len(page['SecurityGroups'])} security groups in this page.") return security_groups def check_compliance(security_groups, blocked_ports=[20, 21, 3389, 3306, 4333]): """ Check compliance of security groups against the AWS Config 'restricted-common-ports' rule. This includes handling for all traffic permissions and specific blocked TCP ports. """ compliant_groups, non_compliant_groups = [], [] for group in security_groups: # Flag to keep track of compliance status is_compliant = True for permission in group['IpPermissions']: # Check for all traffic permissions (-1 protocol) if permission['IpProtocol'] == '-1': for ip_range in permission.get('IpRanges', []): if ip_range.get('CidrIp') == '0.0.0.0/0': is_compliant = False break for ipv6_range in permission.get('Ipv6Ranges', []): if ipv6_range.get('CidrIpv6') == '::/0': is_compliant = False break # Check for specific blocked TCP ports elif permission['IpProtocol'] == 'tcp': from_port = permission.get('FromPort') to_port = permission.get('ToPort') if from_port is not None and to_port is not None: for blocked_port in blocked_ports: if from_port <= blocked_port <= to_port: # Check both IPv4 and IPv6 ranges for ip_range in permission.get('IpRanges', []): if ip_range.get('CidrIp') == '0.0.0.0/0': is_compliant = False break for ipv6_range in permission.get('Ipv6Ranges', []): if ipv6_range.get('CidrIpv6') == '::/0': is_compliant = False break if not is_compliant: # Mark as non-compliant and break the loop non_compliant_groups.append(group['GroupId']) break if is_compliant: compliant_groups.append(group['GroupId']) # Ensure each security group is only counted once for non-compliance non_compliant_groups = list(set(non_compliant_groups)) return compliant_groups, non_compliant_groups def get_all_regions(ec2_client): """ Fetch all AWS regions that support EC2 service. :param ec2_client: Initialized boto3 EC2 client :return: List of region names """ regions = [] try: regions_response = ec2_client.describe_regions() regions = [region['RegionName'] for region in regions_response['Regions']] except ClientError as e: print(f"An error occurred fetching regions: {e}") except Exception as e: print(f"Unexpected error: {e}") return regions def main1(region_name=None): """ Main function to perform compliance check on AWS security groups. :param region_name: AWS region name to check security groups. If None, checks all regions. """ print("Starting compliance check for restricted common ports...") regions_to_check = [region_name] if region_name else get_all_regions(boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name='us-east-1')) for region in regions_to_check: print(f"Checking region: {region}") ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) security_groups = fetch_security_groups(ec2_client) if not security_groups: print("No security groups found or unable to fetch security groups.") continue compliant_groups, non_compliant_groups = check_compliance(security_groups) print(f"Compliance check complete for {region}.") print(f"Compliant groups: {len(compliant_groups)}") print(f"Non-compliant groups: {len(non_compliant_groups)}") if non_compliant_groups: print("Non-compliant security group IDs:", ", ".join(non_compliant_groups)) print("-" * 60) main1(region_name)
      copied
      14.1
  15. 15

    This runbook conducts a thorough audit of default security groups in all AWS VPCs, ensuring they disallow any inbound or outbound traffic. It identifies and automatically remediates non-compliant groups to enforce stringent network security standards. The process enhances overall VPC security by adhering to a strict no-traffic policy in default security groups.

    region_name=None #Hardcoded for single execution result, Use None when you want to run the script for all regions.
    copied
    15
    1. 15.1

      This task enumerates all Virtual Private Clouds across every AWS region in an account. This task is essential for network management, security audits, and resource tracking, especially in large-scale environments. It provides details like VPC IDs, CIDR blocks, and associated resources for each VPC.

      import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_vpcs_in_region(region_name): vpcs_info = [] try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) ec2 = session.client('ec2') response = ec2.describe_vpcs() vpcs = response.get('Vpcs', []) if vpcs: print(f"In region '{region_name}', found the following VPCs:") for vpc in vpcs: vpc_id = vpc['VpcId'] vpcs_info.append({'Region': region_name, 'VPC_ID': vpc_id}) print(f" VPC ID: {vpc_id}") else: print(f"No VPCs found in region '{region_name}'.") except ClientError as e: print(f"An error occurred in region {region_name}: {e}") except Exception as e: print(f"An unexpected error occurred in region {region_name}: {e}") return vpcs_info def list_vpcs_all_regions(): all_vpcs_info = [] try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') ec2 = session.client('ec2') regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']] for region in regions: vpcs_info = list_vpcs_in_region(region) all_vpcs_info.extend(vpcs_info) except ClientError as e: print(f"An error occurred: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") return all_vpcs_info #region_name = None # Replace with a region name or leave as None for all regions if region_name: vpcs = list_vpcs_in_region(region_name) else: vpcs = list_vpcs_all_regions() #print(vpcs) # Summary of all VPCs across regions if vpcs: print("\nSummary of all VPCs across regions:") for vpc in vpcs: print(f"Region: {vpc['Region']}, VPC ID: {vpc['VPC_ID']}") else: print("No VPCs found in any of the regions.")
      copied
      15.1
    2. 15.2

      This task focuses on scrutinizing default security groups in AWS VPCs to identify and flag those allowing unauthorized traffic. It serves as a critical measure to pinpoint security groups that deviate from the no-traffic policy, ensuring adherence to stringent network security protocols in VPC environments.

      import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_default_security_groups(region_name=None): non_compliant_sgs = [] compliant_sgs = [] if region_name: print(f"Checking default security groups in specified region: {region_name}") regions_to_check = [region_name] else: print("Checking default security groups in all AWS regions...") regions_to_check = [region['RegionName'] for region in boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1').describe_regions()['Regions']] for region in regions_to_check: try: ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) vpcs = ec2_client.describe_vpcs()['Vpcs'] if not vpcs: print(f"No VPCs found in region {region}.") continue for vpc in vpcs: sg_response = ec2_client.describe_security_groups( Filters=[{'Name': 'vpc-id', 'Values': [vpc['VpcId']]}, {'Name': 'group-name', 'Values': ['default']}] ) for sg in sg_response['SecurityGroups']: inbound_rules = sg['IpPermissions'] outbound_rules = sg['IpPermissionsEgress'] if inbound_rules or outbound_rules: non_compliant_sgs.append({ 'Region': region, 'VpcId': vpc['VpcId'], 'SecurityGroupId': sg['GroupId'] }) else: compliant_sgs.append({ 'Region': region, 'VpcId': vpc['VpcId'], 'SecurityGroupId': sg['GroupId'] }) except ClientError as e: print(f"An AWS client error occurred in region {region}: {e}") return non_compliant_sgs, compliant_sgs # Example usage #region_name = None # Use None for all regions, or specify a region like 'us-west-2' non_compliant_security_groups, compliant_security_groups = check_default_security_groups(region_name) if non_compliant_security_groups: print("\nNon-compliant default security groups found:") for sg_info in non_compliant_security_groups: print(f"Region: {sg_info['Region']}, VPC ID: {sg_info['VpcId']}, Security Group ID: {sg_info['SecurityGroupId']} is NON_COMPLIANT") if compliant_security_groups: print("\nCompliant default security groups found:") for sg_info in compliant_security_groups: print(f"Region: {sg_info['Region']}, VPC ID: {sg_info['VpcId']}, Security Group ID: {sg_info['SecurityGroupId']} is COMPLIANT") if not non_compliant_security_groups and not compliant_security_groups: print("\nNo VPCs with default security groups were found.") context.skip_sub_tasks=True
      copied
      15.2
      1. 15.2.1

        This task involves configuring the default security groups within AWS VPCs to strictly enforce a no-traffic policy. It entails systematically updating the security group rules to block all inbound and outbound traffic, ensuring compliance with stringent network security protocols.

        import boto3 from botocore.exceptions import ClientError, BotoCoreError, NoCredentialsError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def remediate_default_sg_of_vpc(region_name, vpc_id): """ Removes all inbound and outbound rules from the default security group of a specified VPC. Parameters: region_name (str): AWS region of the VPC. vpc_id (str): ID of the VPC whose default security group needs to be remediated. Returns: None """ if not region_name or not vpc_id: print("Error: 'region_name' and 'vpc_id' must be provided.") return try: ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) sg_response = ec2_client.describe_security_groups( Filters=[{'Name': 'vpc-id', 'Values': [vpc_id]}, {'Name': 'group-name', 'Values': ['default']}] ) #print(sg_response) # for debugging if sg_response['SecurityGroups']: sg_id = sg_response['SecurityGroups'][0]['GroupId'] #print(sg_id) # for debugging # Remove all inbound and outbound rules try: # Retrieve existing rules current_sg = ec2_client.describe_security_groups(GroupIds=[sg_id])['SecurityGroups'][0] inbound_rules = current_sg.get('IpPermissions', []) outbound_rules = current_sg.get('IpPermissionsEgress', []) # Remove inbound rules if inbound_rules: ec2_client.revoke_security_group_ingress(GroupId=sg_id, IpPermissions=inbound_rules) print(f"Removed all inbound rules from default security group {sg_id} in VPC {vpc_id}.") # Remove outbound rules if outbound_rules: ec2_client.revoke_security_group_egress(GroupId=sg_id, IpPermissions=outbound_rules) print(f"Removed all outbound rules from default security group {sg_id} in VPC {vpc_id}.") # Verification step updated_sg = ec2_client.describe_security_groups(GroupIds=[sg_id])['SecurityGroups'][0] if not updated_sg.get('IpPermissions') and not updated_sg.get('IpPermissionsEgress'): print(f"Successfully removed all rules from security group {sg_id}.") else: print(f"Rules may not have been completely removed from security group {sg_id}.") except ClientError as e: print(f"Error modifying security group {sg_id}: {e}") else: print(f"No default security group found for VPC {vpc_id}.") except NoCredentialsError: print("Error: No AWS credentials found. Please configure your credentials.") except BotoCoreError as e: print(f"BotoCore Error: {e}") except ClientError as e: print(f"Client Error: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") # Example usage #region_name = 'us-west-2' # Specify the AWS region #vpc_id = 'vpc-0e42a95f21ed25d5c' # Replace with your VPC ID remediate_default_sg_of_vpc(region_name, vpc_id)
        copied
        15.2.1
  16. 16

    This runbook involves turning on a feature for capturing information about IP traffic going to and from networks interfaces in a Virtual Private Cloud (VPC). This data is vital for network monitoring, security analysis, and troubleshooting. The logs can be stored in Amazon CloudWatch Logs or Amazon S3 for detailed analysis and archival purposes, aiding in compliance and operational auditing.

    region_name=None #Hardcoded for single execution result, Use None when you want to run the script for all regions.
    copied
    16
    1. 16.1

      This task enumerates all Virtual Private Clouds across every AWS region in an account. This task is essential for network management, security audits, and resource tracking, especially in large-scale environments. It provides details like VPC IDs, CIDR blocks, and associated resources for each VPC.

      import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_vpcs_in_region(region_name): vpcs_info = [] try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) ec2 = session.client('ec2') response = ec2.describe_vpcs() vpcs = response.get('Vpcs', []) if vpcs: print(f"In region '{region_name}', found the following VPCs:") for vpc in vpcs: vpc_id = vpc['VpcId'] vpcs_info.append({'Region': region_name, 'VPC_ID': vpc_id}) print(f" VPC ID: {vpc_id}") else: print(f"No VPCs found in region '{region_name}'.") except ClientError as e: print(f"An error occurred in region {region_name}: {e}") except Exception as e: print(f"An unexpected error occurred in region {region_name}: {e}") return vpcs_info def list_vpcs_all_regions(): all_vpcs_info = [] try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') ec2 = session.client('ec2') regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']] for region in regions: vpcs_info = list_vpcs_in_region(region) all_vpcs_info.extend(vpcs_info) except ClientError as e: print(f"An error occurred: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") return all_vpcs_info #region_name = None # Replace with a region name or leave as None for all regions if region_name: vpcs = list_vpcs_in_region(region_name) else: vpcs = list_vpcs_all_regions() #print(vpcs) # Summary of all VPCs across regions if vpcs: print("\nSummary of all VPCs across regions:") for vpc in vpcs: print(f"Region: {vpc['Region']}, VPC ID: {vpc['VPC_ID']}") else: print("No VPCs found in any of the regions.")
      copied
      16.1
    2. 16.2

      This task identifies Virtual Private Clouds (VPCs) in an AWS environment that lack active Flow Logs. This task is essential for security and compliance, ensuring that network traffic is monitored and logged. It involves checking each VPC's Flow Logs status and isolating those without the feature, helping to prioritize security enhancements and network monitoring strategies.

      import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_vpc_flow_logs(vpc_id, region): try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) ec2 = session.client('ec2') # Check for flow logs response = ec2.describe_flow_logs(Filters=[{'Name': 'resource-id', 'Values': [vpc_id]}]) if response['FlowLogs']: print(f"Flow Logs are enabled for VPC {vpc_id} in region {region}.") return True else: print(f"Flow Logs are not enabled for VPC {vpc_id} in region {region}.") return False except ClientError as e: print(f"An error occurred in region {region} for VPC {vpc_id}: {e}") return False # Example VPCs list #vpcs = [{'Region': 'ap-south-1', 'VPC_ID': 'vpc-0c433ca0ab76e67ae'}, {'Region': 'eu-north-1', 'VPC_ID': 'vpc-0db5fbfe0a4263ef5'}] # Checking flow logs for each VPC and collecting VPCs without flow logs vpcs_without_flow_logs = [] for vpc in vpcs: if not check_vpc_flow_logs(vpc['VPC_ID'], vpc['Region']): vpcs_without_flow_logs.append(vpc) #print(vpcs_without_flow_logs) #for debugging context.skip_sub_tasks=True # Example vpcs_without_flow_logs passed to the downstream task #vpcs_without_flow_logs = [{'Region': 'ap-south-1', 'VPC_ID': 'vpc-0c433ca0ab76e67ae'}, {'Region': 'eu-north-1', 'VPC_ID': 'vpc-0db5fbfe0a4263ef5'}]
      copied
      16.2
      1. 16.2.1

        This task involves setting up a unique data storage bucket in Amazon S3 for storing, managing, and retrieving data, with options for access control, versioning, and lifecycle management. S3 buckets provide a scalable and secure cloud storage solution.

        import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # AWS S3 client initialization s3_client = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Bucket name to create #bucket_name = 'my-logging-bucket-name' # Replace with your desired bucket name # Create S3 bucket try: s3_client.create_bucket(Bucket=bucket_name) print(f"Bucket {bucket_name} created successfully.") except ClientError as e: print(f"Error creating S3 bucket {bucket_name}: {e}")
        copied
        16.2.1
      2. 16.2.2

        import boto3 import json from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def create_bucket_policy(bucket_name, account_number, regions): """ Create a bucket policy for the specified bucket, account number, and regions. """ policy_statements = [ { "Sid": "AWSLogDeliveryAclCheck", "Effect": "Allow", "Principal": {"Service": "delivery.logs.amazonaws.com"}, "Action": "s3:GetBucketAcl", "Resource": f"arn:aws:s3:::{bucket_name}", "Condition": {"StringEquals": {"aws:SourceAccount": account_number}} } ] resource = f"arn:aws:s3:::{bucket_name}/AWSLogs/{account_number}/*" for region in regions: source_arn = f"arn:aws:logs:{region}:{account_number}:*" policy_statements.append( { "Sid": f"AWSLogDeliveryWrite_{region}", "Effect": "Allow", "Principal": {"Service": "delivery.logs.amazonaws.com"}, "Action": "s3:PutObject", "Resource": resource, "Condition": { "StringEquals": { "aws:SourceAccount": account_number, "s3:x-amz-acl": "bucket-owner-full-control" }, "ArnLike": {"aws:SourceArn": source_arn} } } ) policy = { "Version": "2012-10-17", "Id": "AWSLogDeliveryWrite20150319", "Statement": policy_statements } return policy def update_s3_bucket_policy(s3_client, bucket_name, policy): """ Update the S3 bucket policy. """ try: s3_client.put_bucket_policy( Bucket=bucket_name, Policy=json.dumps(policy) ) print(f"Bucket policy updated for {bucket_name}.") except ClientError as e: print(f"Error updating bucket policy: {e}") account_number = boto3.client('sts',aws_access_key_id=access_key,aws_secret_access_key=secret_key).get_caller_identity()['Account'] #bucket_name = 'your-bucket-name' # Replace with your S3 bucket name #regions_for_bucket_policy = ['us-east-1', 'ap-south-1'] # List of regions # This part will be used if the user has the same logging bucket for multiple regions for VPC Flow Logs # Create S3 client s3_client = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Create and update the bucket policy policy = create_bucket_policy(bucket_name, account_number, regions_for_bucket_policy) update_s3_bucket_policy(s3_client, bucket_name, policy) s3_bucket_arn = f"arn:aws:s3:::{bucket_name}" #passed to downstream task
        copied
        16.2.2
      3. 16.2.3

        This task activates a logging feature for Virtual Private Clouds (VPCs) in AWS. This feature records and stores information about the network traffic flowing through the VPC, aiding in security monitoring, traffic analysis, and troubleshooting. The collected data can be sent to Amazon CloudWatch Logs or Amazon S3 for retention and analysis.

        import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def enable_vpc_flow_logs(vpc_id, region, s3_bucket_arn): """ Enable VPC Flow Logs for the specified VPC, directing them to an S3 bucket. """ try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) ec2 = session.client('ec2') # Create the flow log response = ec2.create_flow_logs( ResourceIds=[vpc_id], ResourceType='VPC', TrafficType='ALL', LogDestinationType='s3', LogDestination=s3_bucket_arn ) print(response) if response['Unsuccessful']: print(f"Failed to enable Flow Logs for VPC {vpc_id} in region {region}.") else: print(f"Successfully enabled Flow Logs for VPC {vpc_id} in region {region}.") except ClientError as e: print(f"An error occurred in region {region} for VPC {vpc_id}: {e}") # List of VPCs without flow logs #vpcs_without_flow_logs = [{'Region': 'ap-south-1', 'VPC_ID': 'vpc-0c433ca0ab76e67ae'}] # S3 bucket ARN for storing flow logs #s3_bucket_arn = 'arn:aws:s3:::your-bucket-name' # Replace with your S3 bucket ARN # Enabling flow logs for each VPC for vpc in vpcs_without_flow_logs: enable_vpc_flow_logs(vpc['VPC_ID'], vpc['Region'], s3_bucket_arn)
        copied
        16.2.3
  17. 17

    This runbook provides a detailed guide for verifying and/or setting up end-to-end encryption in AWS CloudTrail for SOC2 compliance. It covers configuring CloudTrail with AWS KMS Customer Master Keys (CMKs) for Server-Side Encryption (SSE), including steps for creating or selecting KMS CMKs and ensuring secure encryption of CloudTrail trails.

    region_name=None #Hardcoded for single execution result, Use None when you want to run the script for all regions.
    copied
    17
    1. 17.1

      This task verifies if AWS CloudTrail is configured with Server-Side Encryption (SSE) using AWS Key Management Service (KMS) Customer Master Keys (CMKs). It ensures that each CloudTrail trail has a KmsKeyId defined, confirming encryption according to SOC2 standards. This process enhances security and meets regulatory requirements for encrypted AWS activity logging.

      import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_trail_encryption(client, region_name): """ Checks CloudTrail trails in a specific region for KMS encryption and whether they are global or regional trails. :param client: Boto3 CloudTrail client :param region_name: Name of the AWS region :return: Tuple of total trails and compliant trails count """ try: trails = client.describe_trails(includeShadowTrails=False)['trailList'] if not trails: print(f"[{region_name}] No CloudTrail trails found.") return 0, 0 compliant_trails = 0 for trail in trails: trail_name = trail['Name'] trail_type = "Global" if trail.get('IsMultiRegionTrail', False) else "Regional" if 'KmsKeyId' in trail: print(f"[{region_name}] {trail_type} Trail '{trail_name}' is compliant with KMS CMK encryption.") compliant_trails += 1 else: print(f"[{region_name}] {trail_type} Trail '{trail_name}' is NOT compliant. KmsKeyId not defined.") print(f"[{region_name}] Summary: {compliant_trails} out of {len(trails)} {trail_type.lower()} trails are compliant with KMS CMK encryption.") return len(trails), compliant_trails except ClientError as e: print(f"AWS client error occurred in {region_name}: {e}") return 0, 0 except Exception as e: print(f"An unexpected error occurred in {region_name}: {e}") return 0, 0 def run_check(selected_region=None): """ Run the CloudTrail encryption check. :param selected_region: Specific region to check. If None, checks all regions. """ if selected_region: regions = [selected_region] else: # Use a default region only for fetching the list of regions default_region_for_fetching_regions = 'us-east-1' ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=default_region_for_fetching_regions) regions = [region['RegionName'] for region in ec2_client.describe_regions()['Regions']] total_compliant = 0 total_trails = 0 for region in regions: print(f"Checking CloudTrail trails in {region}...") cloudtrail_client = boto3.client('cloudtrail', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) regional_trails, regional_compliant = check_trail_encryption(cloudtrail_client, region) total_trails += regional_trails total_compliant += regional_compliant print(f"Overall Summary: {total_compliant} out of {total_trails} total trails across all checked regions are compliant with KMS CMK encryption.") if region_name: # Example usage run_check(region_name) # Check all regions if region_name is None otherwise checks for a specific region passed in the input parameter # run_check('us-west-2') else: run_check() # Script running for all regions context.skip_sub_tasks=True # Remove this line if you want to choose or create a new KMS key to update the trail with
      copied
      17.1
      1. 17.1.1

        This task selects an existing AWS KMS Customer Master Key (CMK) or creates a new one if none exists. It checks for a CMK with a specific alias, creating a new key for encryption purposes as needed. This ensures enhanced security and compliance in AWS environments.

        import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def create_or_choose_kms_key(alias_name, region_name): """ Creates a new AWS KMS Customer Master Key (CMK) or returns an existing one based on the alias in the specified region. :param alias_name: Alias name for the KMS key. :param region_name: AWS region where the KMS key is to be created or found. :return: ARN of the KMS key. """ kms_client = boto3.client('kms', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) try: # Check if an alias exists for the given name aliases = kms_client.list_aliases() for alias in aliases['Aliases']: if alias['AliasName'] == 'alias/' + alias_name: print(f"Existing KMS key found for alias {alias_name} in {region_name}") return alias['TargetKeyId'] # If alias does not exist, create a new KMS CMK print(f"Creating a new KMS CMK for alias {alias_name} in {region_name}") key = kms_client.create_key(Description=f'KMS CMK for CloudTrail in {region_name}') kms_client.create_alias(AliasName='alias/' + alias_name, TargetKeyId=key['KeyMetadata']['KeyId']) return key['KeyMetadata']['Arn'] except ClientError as e: print(f"Error occurred while creating or retrieving KMS key in {region_name}: {e}") return None # Example usage #alias_name = 'my-cloudtrail-key-2' #region_name = 'us-east-1' # Replace with your desired AWS region kms_key_arn = create_or_choose_kms_key(alias_name, region_name) if kms_key_arn: print(f"KMS Key ARN in {region_name}: {kms_key_arn}") # Extracting the KMS Key ID from the ARN kms_key_id = kms_key_arn.split(':')[-1].split('/')[-1] # print(kms_key_id) # for debugging # Example Structure # kms_key_arn = "arn:aws:kms:us-east-1:355237452254:key/7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7" # kms_key = "7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7"
        copied
        17.1.1
      2. 17.1.2

        This task updates the AWS KMS key policy to authorize AWS CloudTrail to encrypt log files using the specified KMS key. The objective is to secure CloudTrail logs with KMS encryption, ensuring enhanced security and compliance. The process involves modifying the KMS key policy to include permissions for CloudTrail operations.

        import boto3 import json from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_aws_account_id(): try: sts_client = boto3.client('sts',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') account_id = sts_client.get_caller_identity()["Account"] return account_id except ClientError as e: print(f"An AWS client error occurred: {e}") return None except Exception as e: print(f"An unexpected error occurred: {e}") return None def update_kms_policy(kms_key_id): """ Updates the KMS key policy to allow CloudTrail to use the key. :param kms_key_id: The ID or ARN of the KMS key. """ account_id = get_aws_account_id() if not account_id: print("Unable to retrieve AWS account ID.") return kms_client = boto3.client('kms',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) try: # Retrieve the current key policy policy = kms_client.get_key_policy(KeyId=kms_key_id, PolicyName='default')['Policy'] policy_dict = json.loads(policy) # Append the new statement for CloudTrail cloudtrail_statement = { "Sid": "Allow CloudTrail to use the key", "Effect": "Allow", "Principal": { "Service": "cloudtrail.amazonaws.com" }, "Action": [ "kms:GenerateDataKey*", "kms:DescribeKey" ], "Resource": "*", "Condition": { "StringLike": { "kms:EncryptionContext:aws:cloudtrail:arn": f"arn:aws:cloudtrail:*:{account_id}:trail/*" } } } policy_dict['Statement'].append(cloudtrail_statement) # Update the key policy kms_client.put_key_policy( KeyId=kms_key_id, PolicyName='default', Policy=json.dumps(policy_dict) ) print(f"KMS key policy updated successfully for key: {kms_key_id}") except ClientError as e: print(f"Error updating KMS key policy: {e}") # Example usage #kms_key_id = '7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7' # Replace with your KMS key ID or ARN update_kms_policy(kms_key_id) context.proceed = False
        copied
        17.1.2
      3. 17.1.3

        This task updates an AWS CloudTrail trail to use an AWS Key Management Service (KMS) Customer Master Key (CMK) for server-side encryption. It ensures that the trail's logs are encrypted with a specified KMS key, enhancing the security and confidentiality of audit log files. This update is vital for maintaining compliance and robust data protection standards in AWS.

        import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] trail_name = alias_name # Received from upstream tasks def update_trail_encryption(trail_name, kms_key_id, region_name): """ Updates a CloudTrail trail to use KMS encryption. :param trail_name: Name of the CloudTrail trail :param kms_key_id: The KMS key ARN or ID :param region_name: AWS region where the trail is located """ try: cloudtrail_client = boto3.client('cloudtrail', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) cloudtrail_client.update_trail( Name=trail_name, KmsKeyId=kms_key_id ) print(f"Trail '{trail_name}' in {region_name} updated to use KMS CMK: {kms_key_id}") except ClientError as e: print(f"Error updating trail in {region_name}: {e}") # Example usage #trail_name = 'test-trail-1-east-1' # Replace with your trail name #kms_key_id = '28f9f7ce-41db-42fd-bfcf-be554ed408d3' # Replace with your KMS CMK ID or ARN #kms_key_id received from upstream task #region_name = 'us-east-1' # Replace with the region of your CloudTrail trail update_trail_encryption(trail_name, kms_key_id, region_name)
        copied
        17.1.3
  18. 18

    This runbook ensures that CloudTrail, AWS's service for logging API activity, has log file validation enabled. This is crucial for SOC2 compliance, which demands secure and private handling of customer data. Enabling log file validation helps verify the integrity and authenticity of CloudTrail logs, demonstrating a commitment to robust information security practices.

    target_region = None # Hardcoded for single execution result, user needs to set up the target_region when setting up log file validation
    copied
    18
    1. 18.1

      This task involves enumerating and retrieving detailed information about every AWS CloudTrail trail that exists across all AWS regions within an AWS account. Each trail captures specific API activity and events, and having a comprehensive list helps in providing visibility into what actions are being logged, where the logs are stored, and how they are configured. This listing process is foundational for subsequent tasks like auditing, analysis, and optimization of AWS CloudTrail, aiding in efficient resource management and security compliance.

      import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Set region to None for all regions, or specify a valid AWS region string for a specific region #target_region = None target_region = target_region if target_region else None try: # List all available AWS regions ec2_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') all_regions = [region['RegionName'] for region in ec2_client.describe_regions()['Regions']] except Exception as e: print(f"ERROR: An error occurred while listing AWS regions: {e}") all_regions = [] # Get trails for all regions or a specific region regions_to_check = all_regions if target_region is None else [target_region] all_trails = [] for region in regions_to_check: try: # List all trails in AWS CloudTrail for each region cloudtrail_client = boto3.client('cloudtrail', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) response = cloudtrail_client.describe_trails(includeShadowTrails=False) trails_in_region = response['trailList'] if not trails_in_region: print(f"INFO: No trails found in region {region}.") else: all_trails.extend(trails_in_region) except Exception as e: # Handle exceptions thrown while listing trails for a region print(f"ERROR: An error occurred while listing trails in region {region}: {e}") # Print all trails if not all_trails: print("INFO: No trails found in all specified regions.") else: try: #print(all_trails) # for downstream task for trail in all_trails: print(f"Trail Name: {trail['Name']}, Trail ARN: {trail['TrailARN']}, Home Region: {trail['HomeRegion']}") except KeyError as ke: print(f"ERROR: Missing key {ke} in trail information: {trail}") except Exception as e: print(f"ERROR: An error occurred while printing trail information: {e}") print(f"SUMMARY: Processed {len(regions_to_check)} regions and found a total of {len(all_trails)} trails.")
      copied
      18.1
    2. 18.2

      This task audits AWS CloudTrail Trails for SOC2 Compliance by checking Log File Validation across various regions. It evaluates each trail for enabled log file validation and the presence of a valid 'LatestDigestDeliveryTime'. Trails are marked as compliant or non-compliant based on these criteria, with specific reasons for non-compliance provided.

      import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_trail_status(region): try: cloudtrail_client = boto3.client('cloudtrail', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) trails = cloudtrail_client.describe_trails(includeShadowTrails=True)['trailList'] if not trails: print(f"No CloudTrail trails found in region '{region}'.") return {} trail_statuses = {} for trail in trails: trail_name = trail['Name'] home_region = trail.get('HomeRegion') if home_region == region: try: trail_desc = cloudtrail_client.describe_trails(trailNameList=[trail_name]) trail_status = cloudtrail_client.get_trail_status(Name=trail_name) #print(trail_desc) # for debugging #print(trail_status) # for debugging log_file_validation_enabled = any(tr.get('LogFileValidationEnabled', False) for tr in trail_desc['trailList']) has_valid_digest_time = trail_status.get('LatestDigestDeliveryTime') is not None trail_data = { 'IsLogging': trail_status.get('IsLogging'), 'LatestDeliveryTime': trail_status.get('LatestDeliveryTime'), 'LatestDigestDeliveryTime': trail_status.get('LatestDigestDeliveryTime'), 'LogFileValidationEnabled': log_file_validation_enabled, 'HasValidDigestTime': has_valid_digest_time, 'ComplianceReason': determine_compliance_reason(log_file_validation_enabled, has_valid_digest_time) } trail_statuses[trail_name] = trail_data except ClientError as error: print(f"Error checking status for trail '{trail_name}' in region '{region}': {error}") return trail_statuses except (ClientError, BotoCoreError) as error: print(f"Error checking trails in region '{region}': {error}") return {} def determine_compliance_reason(log_file_validation_enabled, has_valid_digest_time): if not log_file_validation_enabled: return "Log file validation not enabled" if not has_valid_digest_time: return "No valid LatestDigestDeliveryTime" return "Compliant(Both Log File Validation and LatestDigestDeliveryTime are valid)" # Get All Available Regions regions = [region['RegionName'] for region in boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1').describe_regions()['Regions']] compliance_summary = {"Compliant": [], "Non-Compliant": []} for region in regions: print(f"\nChecking trails in region: {region}") region_status = check_trail_status(region) for trail, status in region_status.items(): compliance = "Compliant" if status['LogFileValidationEnabled'] and status['HasValidDigestTime'] else "Non-Compliant" reason = status['ComplianceReason'] print(f"Region: {region}, Trail: {trail}, Compliance: {compliance}, Reason: {reason}") compliance_summary[compliance].append(f"{region}:{trail}") # Output compliance summary print("\nCompliance Summary:") for status, trails in compliance_summary.items(): print(f"{status} Trails: {len(trails)}") for trail_info in trails: print(f" - {trail_info}") context.proceed=False # As downstream runbooks can't be included in the single execution result because of complex user based input parameters context.skip_sub_tasks=True
      copied
      18.2
      1. 18.2.1

        This task automates the enforcement and verification of log file validation for AWS CloudTrail trails. It checks if log file validation is enabled for a specified trail and activates it if necessary. Post-activation, it confirms the validation status, ensuring compliance with security best practices.

        import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def is_log_file_validation_enabled(trail_name, cloudtrail_client): """Check if log file validation is already enabled for the specified trail.""" try: response = cloudtrail_client.describe_trails(trailNameList=[trail_name]) for trail in response['trailList']: if trail['Name'] == trail_name: return trail.get('LogFileValidationEnabled', False) except ClientError as error: print(f"Error checking log file validation status for trail '{trail_name}': {error}") return False def enable_log_file_validation(trail_name, region): regions = [region['RegionName'] for region in boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1').describe_regions()['Regions']] if region not in regions: print(f"Invalid region: {region}") return try: cloudtrail_client = boto3.client('cloudtrail',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) if is_log_file_validation_enabled(trail_name, cloudtrail_client): print(f"Log file validation is already enabled for trail '{trail_name}'.") return cloudtrail_client.update_trail( Name=trail_name, EnableLogFileValidation=True ) print(f"Log file validation enabled for trail '{trail_name}' in region '{region}'.") except ClientError as error: print(f"Error enabling log file validation for trail '{trail_name}': {error}") def verify_log_file_validation(trail_name, region): regions = [region['RegionName'] for region in boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1').describe_regions()['Regions']] if region not in regions: print(f"Invalid region: {region}") return try: cloudtrail_client = boto3.client('cloudtrail',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) response = cloudtrail_client.describe_trails(trailNameList=[trail_name]) if not response['trailList']: print(f"Trail '{trail_name}' not found in region '{region}'.") return for trail in response['trailList']: if trail.get('Name') == trail_name: print(f"Trail Name: {trail.get('Name')}") print(f"S3 Bucket Name: {trail.get('S3BucketName')}") print(f"Is Multi-Region Trail: {trail.get('IsMultiRegionTrail')}") print(f"Home Region: {trail.get('HomeRegion')}") print(f"Trail ARN: {trail.get('TrailARN')}") print(f"Log File Validation Enabled: {trail.get('LogFileValidationEnabled')}") return print(f"Trail '{trail_name}' not found in region '{region}'.") except ClientError as error: print(f"Error verifying log file validation for trail '{trail_name}': {error}") #trail_name = 'test-delete' # Replace with your trail name #region = 'us-east-1' # Replace with the region of your trail # Enable log file validation enable_log_file_validation(trail_name, region) # Verify log file validation verify_log_file_validation(trail_name, region)
        copied
        18.2.1
  19. 19

    This runbook is focused on ensuring that AWS CloudTrail configurations across multiple regions comply with SOC2 standards. It involves comprehensive checks on CloudTrail trail configurations, including logging status, S3 bucket integrations, and CloudWatch Logs, ensuring global event capture and multi-region setup. It's essential for maintaining SOC2 compliance, emphasizing data security and integrity in cloud environments, and helps organizations manage their compliance posture efficiently.

    19
    1. 19.1

      This task involves enumerating and retrieving detailed information about every AWS CloudTrail trail that exists across all AWS regions within an AWS account. Each trail captures specific API activity and events, and having a comprehensive list helps in providing visibility into what actions are being logged, where the logs are stored, and how they are configured. This listing process is foundational for subsequent tasks like auditing, analysis, and optimization of AWS CloudTrail, aiding in efficient resource management and security compliance.

      import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Set region to None for all regions, or specify a valid AWS region string for a specific region #target_region = None target_region = target_region if target_region else None try: # List all available AWS regions ec2_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') all_regions = [region['RegionName'] for region in ec2_client.describe_regions()['Regions']] except Exception as e: print(f"ERROR: An error occurred while listing AWS regions: {e}") all_regions = [] # Get trails for all regions or a specific region regions_to_check = all_regions if target_region is None else [target_region] all_trails = [] for region in regions_to_check: try: # List all trails in AWS CloudTrail for each region cloudtrail_client = boto3.client('cloudtrail', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) response = cloudtrail_client.describe_trails(includeShadowTrails=False) trails_in_region = response['trailList'] if not trails_in_region: print(f"INFO: No trails found in region {region}.") else: all_trails.extend(trails_in_region) except Exception as e: # Handle exceptions thrown while listing trails for a region print(f"ERROR: An error occurred while listing trails in region {region}: {e}") # Print all trails if not all_trails: print("INFO: No trails found in all specified regions.") else: try: #print(all_trails) # for downstream task for trail in all_trails: print(f"Trail Name: {trail['Name']}, Trail ARN: {trail['TrailARN']}, Home Region: {trail['HomeRegion']}") except KeyError as ke: print(f"ERROR: Missing key {ke} in trail information: {trail}") except Exception as e: print(f"ERROR: An error occurred while printing trail information: {e}") print(f"SUMMARY: Processed {len(regions_to_check)} regions and found a total of {len(all_trails)} trails.")
      copied
      19.1
    2. 19.2

      This task verifies the existence and configuration of a Multi-Region AWS CloudTrail in compliance with SOC2 guidelines. It focuses on ensuring essential settings like logging, S3 and CloudWatch integrations, and global event coverage. This is crucial for upholding data security and integrity standards across an organization's AWS infrastructure.

      # Multi-Region CloudTrail Compliance Verification: SOC2 Guideline import boto3 from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_available_regions(service_name): """List all available regions for a given AWS service.""" ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']] return regions def check_trails_in_region(region_name, s3_bucket_name, sns_topic_arn, cloudwatch_log_group_arn, include_management_events, read_write_type): """Check CloudTrail trails in a specific region and return details of compliant and non-compliant trails.""" non_compliant_trails = [] compliant_trail_details = None try: cloudtrail_client = boto3.client('cloudtrail',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region_name) trails = cloudtrail_client.describe_trails(includeShadowTrails=True)['trailList'] for trail in trails: if trail['IsMultiRegionTrail']: try: trail_config_response = cloudtrail_client.get_trail(Name=trail['TrailARN']) trail_config = trail_config_response.get('Trail', {}) trail_status_response = cloudtrail_client.get_trail_status(Name=trail['TrailARN']) is_logging = trail_status_response.get('IsLogging', False) except ClientError as e: print(f"Error in {region_name}: {e}") continue settings_match = ( is_logging and trail_config.get('S3BucketName') == s3_bucket_name and trail_config.get('SnsTopicARN') == sns_topic_arn and ('CloudWatchLogsLogGroupArn' not in trail_config or trail_config.get('CloudWatchLogsLogGroupArn') == cloudwatch_log_group_arn) and trail_config.get('IncludeGlobalServiceEvents') == include_management_events and trail_config.get('IsMultiRegionTrail', False) is True ) if settings_match: compliant_trail_details = { 'Region': region_name, 'Name': trail_config.get('Name'), 'S3BucketName': trail_config.get('S3BucketName'), 'SnsTopicARN': trail_config.get('SnsTopicARN'), 'CloudWatchLogsLogGroupArn': trail_config.get('CloudWatchLogsLogGroupArn'), 'IncludeManagementEvents': trail_config.get('IncludeGlobalServiceEvents'), 'IsMultiRegionTrail': trail_config.get('IsMultiRegionTrail') } return True, compliant_trail_details, non_compliant_trails else: non_compliant_trails.append(trail['Name']) return False, compliant_trail_details, non_compliant_trails except ClientError as e: print(f"AWS client error in region {region_name}: {e}") return False, compliant_trail_details, non_compliant_trails except Exception as e: print(f"An unexpected error occurred in region {region_name}: {e}") return False, compliant_trail_details, non_compliant_trails def check_cloudtrail_compliance(s3_bucket_name, sns_topic_arn, cloudwatch_log_group_arn, include_management_events, read_write_type): try: regions = list_available_regions('cloudtrail') compliant_in_any_region = False all_non_compliant_trails = {} compliant_trail_details = None for region in regions: compliant, details, non_compliant_trails = check_trails_in_region(region, s3_bucket_name, sns_topic_arn, cloudwatch_log_group_arn, include_management_events, read_write_type) all_non_compliant_trails[region] = non_compliant_trails if compliant: compliant_trail_details = details compliant_in_any_region = True break if compliant_in_any_region: print("Compliant Trail Found:") for key, value in compliant_trail_details.items(): print(f" {key}: {value}") else: print("Summary of Non-Compliant Trails by Region:") for region, trails in all_non_compliant_trails.items(): if trails: print(f" Region: {region}, Non-Compliant Trails: {', '.join(trails)}") else: print(f" Region: {region} has no non-compliant multi-region trails.") return compliant_in_any_region except NoCredentialsError: print("No AWS credentials found. Please configure your credentials.") return False except PartialCredentialsError: print("Incomplete AWS credentials. Please check your configuration.") return False except Exception as e: print(f"An unexpected error occurred: {e}") return False #s3_bucket_name='aws-cloudtrail-logs-355237452254-d5db7269' #sns_topic_arn='arn:aws:sns:ap-south-1:355237452254:aws-cloudtrail-logs-355237452254-0ac1f096' #cloudwatch_log_group_arn='arn:aws:logs:ap-south-1:355237452254:log-group:aws-cloudtrail-logs-355237452254-fc0d6f36:*' #include_management_events=True #read_write_type='ALL' #Type of events to record. Valid values are ReadOnly, WriteOnly and ALL. compliant = check_cloudtrail_compliance( s3_bucket_name, sns_topic_arn, cloudwatch_log_group_arn, include_management_events, read_write_type ) if compliant: print("\nAt least one compliant multi-region CloudTrail exists.") else: print("\nNo compliant multi-region CloudTrails found matching the specified criteria.") context.proceed = False
      copied
      19.2
  20. 20

    This runbook involves configuring an AWS CloudTrail Trail to log and monitor user activities, crucial for meeting SOC2 guidelines. By capturing detailed records of API calls and user actions within AWS, CloudTrail aids in continuous auditing and real-time security analysis.

    20
    1. 20.1

      This task involves retrieving and displaying a comprehensive list of all Amazon S3 buckets within an AWS account. This step is crucial as it provides a clear overview of all the storage resources available, serving as a starting point for various management and security tasks, such as enforcing encryption or implementing access policies. By generating a list of all S3 buckets, users can easily identify and manage their storage resources, ensuring effective organization and security compliance within their AWS environment.

      import boto3 from botocore.exceptions import BotoCoreError, NoCredentialsError, PartialCredentialsError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_all_s3_buckets(): try: # Creating a Boto3 S3 client s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Sending a request to list S3 buckets response = s3.list_buckets() # Extracting bucket names from the response all_buckets = [bucket['Name'] for bucket in response['Buckets']] return all_buckets except NoCredentialsError: # Handle the exception when credentials are not found print("Error: AWS credentials not found") return None except PartialCredentialsError: # Handle the exception when provided credentials are incomplete print("Error: Incomplete AWS credentials") return None except BotoCoreError as e: # Handle other Boto3 core exceptions print(f"Error: AWS SDK for Python (Boto3) core error occurred - {e}") return None except Exception as e: # Handle any other general exceptions print(f"Unexpected error: {e}") return None # Main block buckets = list_all_s3_buckets() if buckets is not None: if buckets: print("Found the following S3 buckets:") for bucket in buckets: print(bucket) else: print("No S3 buckets found.") else: print("Error occurred while trying to list S3 buckets.") # Use Create S3 bucket task if you need to create a S3 bucket for CloudTrail logging context.skip_sub_tasks = True
      copied
      20.1
      1. 20.1.1

        This task involves setting up a unique data storage bucket in Amazon S3 for storing, managing, and retrieving data, with options for access control, versioning, and lifecycle management. S3 buckets provide a scalable and secure cloud storage solution.

        import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # AWS S3 client initialization s3_client = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Bucket name to create #bucket_name = 'my-logging-bucket-name' # Replace with your desired bucket name # Create S3 bucket try: s3_client.create_bucket(Bucket=bucket_name) print(f"Bucket {bucket_name} created successfully.") except ClientError as e: print(f"Error creating S3 bucket {bucket_name}: {e}")
        copied
        20.1.1
      2. 20.1.2

        This task involves modifying access controls and permissions of a S3 bucket to manage and secure data access, ensuring compliance with security standards and organizational requirements. This is essential for controlling and safeguarding sensitive information stored in S3. In this case the policy update is regarding write permissions for CloudTrail trail to write to S3 bucket.

        import boto3 from botocore.exceptions import ClientError import json creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] account_id = boto3.client('sts',aws_access_key_id=access_key,aws_secret_access_key=secret_key).get_caller_identity()['Account'] def update_s3_bucket_policy(bucket_name, policy): """ Update the policy of the specified S3 bucket. :param bucket_name: Name of the S3 bucket :param policy: Policy document as a JSON string """ try: s3_client = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Convert policy string to a JSON object and back to a string # This ensures the policy is properly formatted as a JSON string policy_json = json.loads(policy) formatted_policy = json.dumps(policy_json) # Updating the bucket policy s3_client.put_bucket_policy(Bucket=bucket_name, Policy=formatted_policy) print(f"Bucket policy updated successfully for {bucket_name}") except ClientError as e: print(f"Error updating policy for bucket {bucket_name}: {e}") except Exception as e: print(f"A general error occurred: {e}") # Replace with your bucket name #bucket_name = 'your-logging-bucket-name' # Define your new bucket policy here (ensure it's a valid JSON string) new_policy=''' { "Version": "2012-10-17", "Statement": [ { "Sid": "AWSCloudTrailAclCheck20150319", "Effect": "Allow", "Principal": { "Service": "cloudtrail.amazonaws.com" }, "Action": "s3:GetBucketAcl", "Resource": "arn:aws:s3:::{bucket_name}", "Condition": { "StringEquals": { "AWS:SourceArn": "arn:aws:cloudtrail:{region_name}:{account_id}:trail/{trail_name}" } } }, { "Sid": "AWSCloudTrailWrite20150319", "Effect": "Allow", "Principal": { "Service": "cloudtrail.amazonaws.com" }, "Action": "s3:PutObject", "Resource": "arn:aws:s3:::{bucket_name}/AWSLogs/{account_id}/*", "Condition": { "StringEquals": { "AWS:SourceArn": "arn:aws:cloudtrail:{region_name}:{account_id}:trail/{trail_name}", "s3:x-amz-acl": "bucket-owner-full-control" } } } ] } '''.format(bucket_name=bucket_name, region_name=region_name, trail_name=trail_name, account_id=account_id) update_s3_bucket_policy(bucket_name, new_policy) context.proceed = False
        copied
        20.1.2
      3. 20.1.3

        This task involves establishing a CloudTrail trail to monitor and record AWS account activities, and directing the log files to a specified S3 bucket for secure and centralized storage. This setup enables efficient auditing and analysis of AWS service usage and user activities.

        import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # AWS region configuration #region_name = 'us-east-1' # Replace with your desired AWS region # AWS CloudTrail client initialization with region ct_client = boto3.client('cloudtrail',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region_name) # Trail and S3 bucket configuration #trail_name = 'my-cloudtrail-trail' # Replace with your desired trail name #bucket_name = 'my-logging-bucket' # Replace with your S3 bucket name try: # Check if the trail already exists trails = ct_client.list_trails() if any(trail['Name'] == trail_name for trail in trails['Trails']): print(f"Trail {trail_name} already exists.") else: # Create the trail ct_client.create_trail(Name=trail_name, S3BucketName=bucket_name) # Start logging ct_client.start_logging(Name=trail_name) print(f"CloudTrail trail {trail_name} created and logging started to {s3_bucket_name}.") except ClientError as e: print(f"Error creating CloudTrail trail: {e}") except Exception as e: print(f"A general error occurred: {e}")
        copied
        20.1.3
  21. 21

    This runbook involves setting stringent password rules and enforcing them for all IAM users. Key measures include complex password requirements, regular password changes, and preventing password reuse. This effort aligns with SOC2 standards for robust data security and access management in cloud environments, enhancing the overall security posture and integrity of the system.

    21
    1. 21.1

      This task reviews the existing AWS IAM password policy to ensure it meets specified security standards. It involves assessing criteria like password complexity, expiration, and rotation rules for compliance with organizational or regulatory requirements.

      import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_password_policy(client): """ Retrieves the current account password policy. :param client: Boto3 IAM client :return: Current password policy if exists, None otherwise """ try: return client.get_account_password_policy()['PasswordPolicy'] except client.exceptions.NoSuchEntityException: # No password policy is set for the account print("No password policy is set for the account.") return None except ClientError as e: print(f"Error retrieving password policy: {e}") return None def check_password_policy_compliance(current_policy, desired_policy): """ Checks if the current AWS IAM account password policy is compliant with the desired policy. Also, returns the non-compliant fields. :param current_policy: The current password policy :param desired_policy: The desired password policy attributes :return: Tuple (True if compliant, False otherwise, non_compliant_fields) """ non_compliant_fields = {} for key, value in desired_policy.items(): if key not in current_policy or current_policy[key] != value: non_compliant_fields[key] = { 'current_value': current_policy.get(key), 'desired_value': value } return len(non_compliant_fields) == 0, non_compliant_fields # Desired password policy parameters desired_policy = { 'MinimumPasswordLength': int(MinimumPasswordLength), 'RequireSymbols': RequireSymbols, 'RequireNumbers': RequireNumbers, 'RequireUppercaseCharacters': RequireUppercaseCharacters, 'RequireLowercaseCharacters': RequireLowercaseCharacters, 'MaxPasswordAge': int(MaxPasswordAge), # Days 'PasswordReusePrevention': int(PasswordReusePrevention), 'HardExpiry': HardExpiry } ''' # The 'HardExpiry' field in the password policy determines whether IAM users are allowed to change their own passwords. # - If 'HardExpiry' is set to True, it means IAM users cannot change their own passwords. In this case, only an administrator can reset the passwords. This setting is typically used in highly secure environments where password management needs to be strictly controlled by administrators. # - If 'HardExpiry' is set to False, IAM users are permitted to change their own passwords. This setting is more user-friendly and allows users to manage their own password changes, including regular updates or resets if needed. ''' # Create a boto3 client for IAM iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Get the current account password policy current_policy = get_password_policy(iam_client) print("Current Policy:") for key, value in current_policy.items(): print(f" - {key}: {value}") if current_policy: # Debugging: Before checking compliance #print("Checking policy compliance...") is_compliant, non_compliant_fields = check_password_policy_compliance(current_policy, desired_policy) # Debugging: After checking compliance #print("Completed policy compliance check.") if is_compliant: print("The account password policy is compliant.") else: print("The account password policy is not compliant.") print("Non-compliant fields:") for field, values in non_compliant_fields.items(): print(f" - {field}: Current Value - {values['current_value']}, Desired Value - {values['desired_value']}") else: print("No password policy found for the account.") context.skip_sub_tasks=True #create_new_policy
      copied
      21.1
      1. 21.1.1

        This task configures rules for user passwords in your AWS account. This process includes defining requirements for password complexity, expiration, and rotation to enhance account security and manage access controls effectively.

        import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def set_password_policy(client): """ Updates the account password policy with specified settings. :param client: Boto3 IAM client """ try: # Update the account password policy with the desired settings client.update_account_password_policy( MinimumPasswordLength=MinimumPasswordLength, RequireSymbols=RequireSymbols, RequireNumbers=RequireNumbers, RequireUppercaseCharacters=RequireUppercaseCharacters, RequireLowercaseCharacters=RequireLowercaseCharacters, MaxPasswordAge=MaxPasswordAge, PasswordReusePrevention=PasswordReusePrevention, HardExpiry=HardExpiry ) print("Password policy updated successfully.") # Handle client errors from AWS except ClientError as e: print(f"AWS client error occurred: {e}") # Handle BotoCore errors except BotoCoreError as e: print(f"Boto core error occurred: {e}") # Handle other unexpected errors except Exception as e: print(f"An unexpected error occurred: {e}") # Create a boto3 client for IAM iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Set the password policy set_password_policy(iam_client)
        copied
        21.1.1
      2. 21.1.2

        This task makes all users to update their passwords by updating their login profiles, typically following the implementation of a new password policy. This ensures that all user passwords comply with the updated security standards, enhancing overall account security.

        import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def enforce_password_change_for_all_users(client): """ Enforce a password change for all IAM users. :param client: Boto3 IAM client :return: None """ try: paginator = client.get_paginator('list_users') for page in paginator.paginate(): for user in page['Users']: try: client.update_login_profile( UserName=user['UserName'], PasswordResetRequired=True ) print(f"Password change enforced for user: {user['UserName']}") except ClientError as e: if e.response['Error']['Code'] == 'NoSuchEntity': print(f"User {user['UserName']} does not have a password to change.") else: print(f"Failed to enforce password change for user {user['UserName']}: {e}") except Exception as e: print(f"Unexpected error for user {user['UserName']}: {e}") except ClientError as e: print(f"Error retrieving IAM users: {e}") except BotoCoreError as e: print(f"Boto core error: {e}") except Exception as e: print(f"Unexpected error: {e}") # Create a boto3 client for IAM iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Enforce password change for all users enforce_password_change_for_all_users(iam_client)
        copied
        21.1.2