Verify Whether AWS CloudTrail Is Configured to Use SSE with AWS KMS
This task verifies whether AWS CloudTrail is configured with server-side encryption (SSE) using AWS Key Management Service (KMS) customer master keys (CMKs). It checks that each CloudTrail trail has a KmsKeyId defined, which indicates that the trail's log files are encrypted with a KMS key, as expected under SOC 2. The tasks below configure that encryption: choose or create a CMK, allow CloudTrail to use it in the key policy, and update the trail to use the key.
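The KmsKeyId check itself is not shown as code in this runbook. A minimal sketch of it, assuming the trail descriptions come from CloudTrail's `describe_trails` call, might look like the following. The names `find_unencrypted_trails` and `fetch_trails` are illustrative, the sample values are made up, and `fetch_trails` relies on boto3's default credential chain rather than the `_get_creds` helper used by the tasks in this runbook.

```python
def find_unencrypted_trails(trails):
    """Return the names of trails that have no KmsKeyId set.

    `trails` is a list of dicts shaped like the entries of the
    `trailList` field returned by CloudTrail's describe_trails call.
    """
    return [t.get("Name", "<unnamed>") for t in trails if not t.get("KmsKeyId")]


def fetch_trails(region_name):
    """Fetch trail descriptions for one region (assumes default boto3 credentials)."""
    import boto3  # imported lazily so the pure check above runs without boto3 installed

    client = boto3.client("cloudtrail", region_name=region_name)
    return client.describe_trails(includeShadowTrails=False)["trailList"]


# Sample data in the shape of describe_trails output (values are made up)
sample_trails = [
    {"Name": "trail-a", "KmsKeyId": "arn:aws:kms:us-east-1:111122223333:key/example-key-id"},
    {"Name": "trail-b"},
]
print(find_unencrypted_trails(sample_trails))
```

Against a real account, the same check would be `find_unencrypted_trails(fetch_trails('us-east-1'))`, repeated for each region of interest.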
1. Choose or Create an AWS KMS CMK
This task looks up an AWS KMS customer master key (CMK) by alias in the specified region. If the alias already exists, the task reuses the key it points to; otherwise it creates a new CMK and attaches the alias to it. The resulting key ID is passed to the downstream tasks.
import boto3
from botocore.exceptions import ClientError

# cred_label, alias_name and region_name are task inputs;
# _get_creds is assumed to be provided by the runbook platform.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

def create_or_choose_kms_key(alias_name, region_name):
    """
    Creates a new AWS KMS Customer Master Key (CMK) or returns an existing one
    based on the alias in the specified region.

    :param alias_name: Alias name for the KMS key.
    :param region_name: AWS region where the KMS key is to be created or found.
    :return: ARN of the KMS key, or None on error.
    """
    kms_client = boto3.client(
        'kms',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region_name
    )
    full_alias = 'alias/' + alias_name

    try:
        # Check if an alias exists for the given name (list_aliases is paginated)
        paginator = kms_client.get_paginator('list_aliases')
        for page in paginator.paginate():
            for alias in page['Aliases']:
                if alias['AliasName'] == full_alias and 'TargetKeyId' in alias:
                    print(f"Existing KMS key found for alias {alias_name} in {region_name}")
                    # Resolve the key ID to its ARN so that both branches return an ARN
                    key = kms_client.describe_key(KeyId=alias['TargetKeyId'])
                    return key['KeyMetadata']['Arn']

        # If alias does not exist, create a new KMS CMK
        print(f"Creating a new KMS CMK for alias {alias_name} in {region_name}")
        key = kms_client.create_key(Description=f'KMS CMK for CloudTrail in {region_name}')
        kms_client.create_alias(AliasName=full_alias, TargetKeyId=key['KeyMetadata']['KeyId'])
        return key['KeyMetadata']['Arn']
    except ClientError as e:
        print(f"Error occurred while creating or retrieving KMS key in {region_name}: {e}")
        return None

# Example usage
#alias_name = 'my-cloudtrail-key-2'
#region_name = 'us-east-1' # Replace with your desired AWS region

kms_key_arn = create_or_choose_kms_key(alias_name, region_name)
kms_key_id = None  # Stays None if no key could be found or created
if kms_key_arn:
    print(f"KMS Key ARN in {region_name}: {kms_key_arn}")
    # Extracting the KMS Key ID from the ARN
    kms_key_id = kms_key_arn.split(':')[-1].split('/')[-1]
    # print(kms_key_id) # for debugging

# Example Structure
# kms_key_arn = "arn:aws:kms:us-east-1:355237452254:key/7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7"
# kms_key = "7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7"
2. Update the AWS KMS Key Policy to Allow CloudTrail to use the key
This task updates the AWS KMS key policy so that AWS CloudTrail can encrypt log files with the specified key. It retrieves the key's current default policy, adds permissions for the cloudtrail.amazonaws.com service principal to call kms:GenerateDataKey* and kms:DescribeKey, and writes the policy back. Data key generation is limited, through the kms:EncryptionContext:aws:cloudtrail:arn condition, to trails in the current account.
import boto3
import json
from botocore.exceptions import ClientError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

def get_aws_account_id():
    try:
        sts_client = boto3.client(
            'sts',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name='us-east-1'
        )
        account_id = sts_client.get_caller_identity()["Account"]
        return account_id
    except ClientError as e:
        print(f"An AWS client error occurred: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None

def update_kms_policy(kms_key_id):
    """
    Updates the KMS key policy to allow CloudTrail to use the key.

    :param kms_key_id: The ID or ARN of the KMS key.
    """
    account_id = get_aws_account_id()
    if not account_id:
        print("Unable to retrieve AWS account ID.")
        return

    # region_name is received from upstream tasks
    kms_client = boto3.client(
        'kms',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region_name
    )

    try:
        # Retrieve the current key policy
        policy = kms_client.get_key_policy(KeyId=kms_key_id, PolicyName='default')['Policy']
        policy_dict = json.loads(policy)

        # Statements for CloudTrail. kms:DescribeKey gets its own statement because
        # DescribeKey requests carry no encryption context, so the condition below
        # would never match for that action.
        cloudtrail_statements = [
            {
                "Sid": "Allow CloudTrail to use the key",
                "Effect": "Allow",
                "Principal": {"Service": "cloudtrail.amazonaws.com"},
                "Action": "kms:GenerateDataKey*",
                "Resource": "*",
                "Condition": {
                    "StringLike": {
                        "kms:EncryptionContext:aws:cloudtrail:arn": f"arn:aws:cloudtrail:*:{account_id}:trail/*"
                    }
                }
            },
            {
                "Sid": "Allow CloudTrail to describe the key",
                "Effect": "Allow",
                "Principal": {"Service": "cloudtrail.amazonaws.com"},
                "Action": "kms:DescribeKey",
                "Resource": "*"
            }
        ]

        # Append only the statements that are not already present, so reruns do not duplicate them
        existing_sids = {s.get('Sid') for s in policy_dict['Statement']}
        for statement in cloudtrail_statements:
            if statement['Sid'] not in existing_sids:
                policy_dict['Statement'].append(statement)

        # Update the key policy
        kms_client.put_key_policy(
            KeyId=kms_key_id,
            PolicyName='default',
            Policy=json.dumps(policy_dict)
        )
        print(f"KMS key policy updated successfully for key: {kms_key_id}")
    except ClientError as e:
        print(f"Error updating KMS key policy: {e}")

# Example usage
#kms_key_id = '7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7' # Replace with your KMS key ID or ARN
update_kms_policy(kms_key_id)

context.proceed = False
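After the policy update, it can be useful to confirm what the key policy actually grants. The sketch below is an illustrative offline check: it takes a policy JSON string (for example, the `Policy` value returned by `get_key_policy`) and collects the actions that Allow statements grant to cloudtrail.amazonaws.com. `cloudtrail_actions_in_policy` is a hypothetical helper, not part of boto3; the sample policy and account ID are made up, and the helper ignores Condition blocks and does not expand wildcards.

```python
import json


def cloudtrail_actions_in_policy(policy_json):
    """Return the set of actions that Allow statements grant to cloudtrail.amazonaws.com."""
    policy = json.loads(policy_json)
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):  # a single statement may appear as an object
        statements = [statements]

    granted = set()
    for stmt in statements:
        if stmt.get("Effect") != "Allow":
            continue
        principal = stmt.get("Principal", {})
        services = principal.get("Service", []) if isinstance(principal, dict) else []
        if isinstance(services, str):
            services = [services]
        if "cloudtrail.amazonaws.com" not in services:
            continue
        actions = stmt.get("Action", [])
        if isinstance(actions, str):
            actions = [actions]
        granted.update(actions)
    return granted


# Made-up policy in the general shape of a KMS key policy
sample_policy = json.dumps({
    "Version": "2012-10-17",
    "Statement": [
        {"Sid": "Enable IAM User Permissions", "Effect": "Allow",
         "Principal": {"AWS": "arn:aws:iam::111122223333:root"},
         "Action": "kms:*", "Resource": "*"},
        {"Sid": "Allow CloudTrail to use the key", "Effect": "Allow",
         "Principal": {"Service": "cloudtrail.amazonaws.com"},
         "Action": ["kms:GenerateDataKey*", "kms:DescribeKey"], "Resource": "*"},
    ],
})
print(sorted(cloudtrail_actions_in_policy(sample_policy)))
```

If the returned set lacks `kms:GenerateDataKey*`, the policy update above has probably not been applied to that key.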
3. Update AWS CloudTrail Trail with AWS KMS CMK
This task updates an AWS CloudTrail trail to use an AWS Key Management Service (KMS) customer master key (CMK) for server-side encryption. It calls update_trail with the trail name and the KMS key ID or ARN as KmsKeyId, so that the trail's log files are encrypted with the specified key. Encrypting the audit logs this way protects their confidentiality and supports compliance requirements for AWS activity logging.
import boto3
from botocore.exceptions import ClientError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

trail_name = alias_name # Received from upstream tasks

def update_trail_encryption(trail_name, kms_key_id, region_name):
    """
    Updates a CloudTrail trail to use KMS encryption.

    :param trail_name: Name of the CloudTrail trail
    :param kms_key_id: The KMS key ARN or ID
    :param region_name: AWS region where the trail is located
    """
    try:
        cloudtrail_client = boto3.client(
            'cloudtrail',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region_name
        )

        cloudtrail_client.update_trail(
            Name=trail_name,
            KmsKeyId=kms_key_id
        )
        print(f"Trail '{trail_name}' in {region_name} updated to use KMS CMK: {kms_key_id}")
    except ClientError as e:
        print(f"Error updating trail in {region_name}: {e}")

# Example usage
#trail_name = 'test-trail-1-east-1' # Replace with your trail name
#kms_key_id = '28f9f7ce-41db-42fd-bfcf-be554ed408d3' # Replace with your KMS CMK ID or ARN
#kms_key_id received from upstream task
#region_name = 'us-east-1' # Replace with the region of your CloudTrail trail

update_trail_encryption(trail_name, kms_key_id, region_name)
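To confirm that the update took effect, the trail can be read back and its `KmsKeyId` compared with the key that was passed in. The sketch below shows only the comparison. `trail_uses_key` is a hypothetical helper, the sample values are made up, and it assumes the trail description has the shape returned by `describe_trails`, where CloudTrail reports `KmsKeyId` as a full key ARN.

```python
def trail_uses_key(trail, expected_key):
    """Check whether a trail description references the expected KMS key.

    `trail` is a dict like an entry of describe_trails()['trailList'].
    `expected_key` may be a key ID or a key ARN; alias names are not resolved here.
    """
    configured = trail.get("KmsKeyId")
    if not configured:
        return False

    def key_id(value):
        # Keep only the part after the last '/', so ARN and bare-ID forms compare equal
        return value.split("/")[-1]

    return key_id(configured) == key_id(expected_key)


# Made-up trail description for illustration
sample_trail = {
    "Name": "example-trail",
    "KmsKeyId": "arn:aws:kms:us-east-1:111122223333:key/7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7",
}
print(trail_uses_key(sample_trail, "7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7"))
```

Comparing on the trailing key ID is a simplification: it treats the ARN and bare-ID forms of the same key as equal but does not check the account or region in the ARN.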