Verify Whether AWS CloudTrail is configured to use SSE AWS KMS

This task verifies if AWS CloudTrail is configured with Server-Side Encryption (SSE) using AWS Key Management Service (KMS) Customer Master Keys (CMKs). It ensures that each CloudTrail trail has a KmsKeyId defined, confirming encryption according to SOC2 standards. This process enhances security and meets regulatory requirements for encrypted AWS activity logging.

import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_trail_encryption(client, region_name): """ Checks CloudTrail trails in a specific region for KMS encryption and whether they are global or regional trails. :param client: Boto3 CloudTrail client :param region_name: Name of the AWS region :return: Tuple of total trails and compliant trails count """ try: trails = client.describe_trails(includeShadowTrails=False)['trailList'] if not trails: print(f"[{region_name}] No CloudTrail trails found.") return 0, 0 compliant_trails = 0 for trail in trails: trail_name = trail['Name'] trail_type = "Global" if trail.get('IsMultiRegionTrail', False) else "Regional" if 'KmsKeyId' in trail: print(f"[{region_name}] {trail_type} Trail '{trail_name}' is compliant with KMS CMK encryption.") compliant_trails += 1 else: print(f"[{region_name}] {trail_type} Trail '{trail_name}' is NOT compliant. KmsKeyId not defined.") print(f"[{region_name}] Summary: {compliant_trails} out of {len(trails)} {trail_type.lower()} trails are compliant with KMS CMK encryption.") return len(trails), compliant_trails except ClientError as e: print(f"AWS client error occurred in {region_name}: {e}") return 0, 0 except Exception as e: print(f"An unexpected error occurred in {region_name}: {e}") return 0, 0 def run_check(selected_region=None): """ Run the CloudTrail encryption check. :param selected_region: Specific region to check. If None, checks all regions. """ if selected_region: regions = [selected_region] else: # Use a default region only for fetching the list of regions default_region_for_fetching_regions = 'us-east-1' ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=default_region_for_fetching_regions) regions = [region['RegionName'] for region in ec2_client.describe_regions()['Regions']] total_compliant = 0 total_trails = 0 for region in regions: print(f"Checking CloudTrail trails in {region}...") cloudtrail_client = boto3.client('cloudtrail', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) regional_trails, regional_compliant = check_trail_encryption(cloudtrail_client, region) total_trails += regional_trails total_compliant += regional_compliant print(f"Overall Summary: {total_compliant} out of {total_trails} total trails across all checked regions are compliant with KMS CMK encryption.") if region_name: # Example usage run_check(region_name) # Check all regions if region_name is None otherwise checks for a specific region passed in the input parameter # run_check('us-west-2') else: run_check() # Script running for all regions context.skip_sub_tasks=True # Remove this line if you want to choose or create a new KMS key to update the trail with
copied
  1. 1

    This task selects an existing AWS KMS Customer Master Key (CMK) or creates a new one if none exists. It checks for a CMK with a specific alias, creating a new key for encryption purposes as needed. This ensures enhanced security and compliance in AWS environments.

    import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def create_or_choose_kms_key(alias_name, region_name): """ Creates a new AWS KMS Customer Master Key (CMK) or returns an existing one based on the alias in the specified region. :param alias_name: Alias name for the KMS key. :param region_name: AWS region where the KMS key is to be created or found. :return: ARN of the KMS key. """ kms_client = boto3.client('kms', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) try: # Check if an alias exists for the given name aliases = kms_client.list_aliases() for alias in aliases['Aliases']: if alias['AliasName'] == 'alias/' + alias_name: print(f"Existing KMS key found for alias {alias_name} in {region_name}") return alias['TargetKeyId'] # If alias does not exist, create a new KMS CMK print(f"Creating a new KMS CMK for alias {alias_name} in {region_name}") key = kms_client.create_key(Description=f'KMS CMK for CloudTrail in {region_name}') kms_client.create_alias(AliasName='alias/' + alias_name, TargetKeyId=key['KeyMetadata']['KeyId']) return key['KeyMetadata']['Arn'] except ClientError as e: print(f"Error occurred while creating or retrieving KMS key in {region_name}: {e}") return None # Example usage #alias_name = 'my-cloudtrail-key-2' #region_name = 'us-east-1' # Replace with your desired AWS region kms_key_arn = create_or_choose_kms_key(alias_name, region_name) if kms_key_arn: print(f"KMS Key ARN in {region_name}: {kms_key_arn}") # Extracting the KMS Key ID from the ARN kms_key_id = kms_key_arn.split(':')[-1].split('/')[-1] # print(kms_key_id) # for debugging # Example Structure # kms_key_arn = "arn:aws:kms:us-east-1:355237452254:key/7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7" # kms_key = "7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7"
    copied
    1
  2. 2

    This task updates the AWS KMS key policy to authorize AWS CloudTrail to encrypt log files using the specified KMS key. The objective is to secure CloudTrail logs with KMS encryption, ensuring enhanced security and compliance. The process involves modifying the KMS key policy to include permissions for CloudTrail operations.

    import boto3 import json from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_aws_account_id(): try: sts_client = boto3.client('sts',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') account_id = sts_client.get_caller_identity()["Account"] return account_id except ClientError as e: print(f"An AWS client error occurred: {e}") return None except Exception as e: print(f"An unexpected error occurred: {e}") return None def update_kms_policy(kms_key_id): """ Updates the KMS key policy to allow CloudTrail to use the key. :param kms_key_id: The ID or ARN of the KMS key. """ account_id = get_aws_account_id() if not account_id: print("Unable to retrieve AWS account ID.") return kms_client = boto3.client('kms',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) try: # Retrieve the current key policy policy = kms_client.get_key_policy(KeyId=kms_key_id, PolicyName='default')['Policy'] policy_dict = json.loads(policy) # Append the new statement for CloudTrail cloudtrail_statement = { "Sid": "Allow CloudTrail to use the key", "Effect": "Allow", "Principal": { "Service": "cloudtrail.amazonaws.com" }, "Action": [ "kms:GenerateDataKey*", "kms:DescribeKey" ], "Resource": "*", "Condition": { "StringLike": { "kms:EncryptionContext:aws:cloudtrail:arn": f"arn:aws:cloudtrail:*:{account_id}:trail/*" } } } policy_dict['Statement'].append(cloudtrail_statement) # Update the key policy kms_client.put_key_policy( KeyId=kms_key_id, PolicyName='default', Policy=json.dumps(policy_dict) ) print(f"KMS key policy updated successfully for key: {kms_key_id}") except ClientError as e: print(f"Error updating KMS key policy: {e}") # Example usage #kms_key_id = '7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7' # Replace with your KMS key ID or ARN update_kms_policy(kms_key_id) context.proceed = False
    copied
    2
  3. 3

    This task updates an AWS CloudTrail trail to use an AWS Key Management Service (KMS) Customer Master Key (CMK) for server-side encryption. It ensures that the trail's logs are encrypted with a specified KMS key, enhancing the security and confidentiality of audit log files. This update is vital for maintaining compliance and robust data protection standards in AWS.

    import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] trail_name = alias_name # Received from upstream tasks def update_trail_encryption(trail_name, kms_key_id, region_name): """ Updates a CloudTrail trail to use KMS encryption. :param trail_name: Name of the CloudTrail trail :param kms_key_id: The KMS key ARN or ID :param region_name: AWS region where the trail is located """ try: cloudtrail_client = boto3.client('cloudtrail', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) cloudtrail_client.update_trail( Name=trail_name, KmsKeyId=kms_key_id ) print(f"Trail '{trail_name}' in {region_name} updated to use KMS CMK: {kms_key_id}") except ClientError as e: print(f"Error updating trail in {region_name}: {e}") # Example usage #trail_name = 'test-trail-1-east-1' # Replace with your trail name #kms_key_id = '28f9f7ce-41db-42fd-bfcf-be554ed408d3' # Replace with your KMS CMK ID or ARN #kms_key_id received from upstream task #region_name = 'us-east-1' # Replace with the region of your CloudTrail trail update_trail_encryption(trail_name, kms_key_id, region_name)
    copied
    3