Sign in

Update the AWS KMS Key Policy to Allow CloudTrail to use the key

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

This task updates the AWS KMS key policy to authorize AWS CloudTrail to encrypt log files using the specified KMS key. The objective is to secure CloudTrail logs with KMS encryption, ensuring enhanced security and compliance. The process involves modifying the KMS key policy to include permissions for CloudTrail operations.

import boto3 import json from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_aws_account_id(): try: sts_client = boto3.client('sts',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') account_id = sts_client.get_caller_identity()["Account"] return account_id except ClientError as e: print(f"An AWS client error occurred: {e}") return None except Exception as e: print(f"An unexpected error occurred: {e}") return None def update_kms_policy(kms_key_id): """ Updates the KMS key policy to allow CloudTrail to use the key. :param kms_key_id: The ID or ARN of the KMS key. """ account_id = get_aws_account_id() if not account_id: print("Unable to retrieve AWS account ID.") return kms_client = boto3.client('kms',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) try: # Retrieve the current key policy policy = kms_client.get_key_policy(KeyId=kms_key_id, PolicyName='default')['Policy'] policy_dict = json.loads(policy) # Append the new statement for CloudTrail cloudtrail_statement = { "Sid": "Allow CloudTrail to use the key", "Effect": "Allow", "Principal": { "Service": "cloudtrail.amazonaws.com" }, "Action": [ "kms:GenerateDataKey*", "kms:DescribeKey" ], "Resource": "*", "Condition": { "StringLike": { "kms:EncryptionContext:aws:cloudtrail:arn": f"arn:aws:cloudtrail:*:{account_id}:trail/*" } } } policy_dict['Statement'].append(cloudtrail_statement) # Update the key policy kms_client.put_key_policy( KeyId=kms_key_id, PolicyName='default', Policy=json.dumps(policy_dict) ) print(f"KMS key policy updated successfully for key: {kms_key_id}") except ClientError as e: print(f"Error updating KMS key policy: {e}") # Example usage #kms_key_id = '7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7' # Replace with your KMS key ID or ARN update_kms_policy(kms_key_id) context.proceed = False
copied