K5SnkJHW2Er7prXMCpoo: Update the AWS KMS Key Policy to Allow CloudTrail to use the key
Update the AWS KMS Key Policy to Allow CloudTrail to use the key
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
This task updates the AWS KMS key policy to authorize AWS CloudTrail to encrypt log files using the specified KMS key. The objective is to secure CloudTrail logs with KMS encryption, ensuring enhanced security and compliance. The process involves modifying the KMS key policy to include permissions for CloudTrail operations.
inputs
outputs
import boto3
import json
from botocore.exceptions import ClientError
# Fetch the credential record for cred_label and unpack it into the access-key
# pair that the boto3 clients in this script authenticate with.
# NOTE(review): _get_creds and cred_label are not defined in this file --
# presumably they are supplied by the hosting runtime; confirm.
creds = _get_creds(cred_label)['creds']
access_key, secret_key = creds['username'], creds['password']
def get_aws_account_id():
    """Return the AWS account ID that the module-level credentials belong to.

    The ID is read from the "Account" field of STS ``get_caller_identity``.
    Any failure is printed and reported to the caller as ``None`` rather
    than raised.
    """
    try:
        identity = boto3.client(
            'sts',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name='us-east-1',
        ).get_caller_identity()
        # Keep the lookup inside the try so a malformed response is also
        # reported through the generic handler below.
        return identity["Account"]
    except ClientError as err:
        print(f"An AWS client error occurred: {err}")
    except Exception as err:
        print(f"An unexpected error occurred: {err}")
    return None
def _build_cloudtrail_statement(account_id):
    """Return the key-policy statement granting CloudTrail use of the key for trails in ``account_id``."""
    # NOTE(review): AWS's published CloudTrail key policy appears to keep
    # kms:DescribeKey in its own statement without the encryption-context
    # condition -- confirm that the combined statement below grants
    # DescribeKey as intended.
    return {
        "Sid": "Allow CloudTrail to use the key",
        "Effect": "Allow",
        "Principal": {
            "Service": "cloudtrail.amazonaws.com"
        },
        "Action": [
            "kms:GenerateDataKey*",
            "kms:DescribeKey"
        ],
        "Resource": "*",
        "Condition": {
            "StringLike": {
                "kms:EncryptionContext:aws:cloudtrail:arn": f"arn:aws:cloudtrail:*:{account_id}:trail/*"
            }
        }
    }


def _upsert_statement(policy_dict, statement):
    """Put ``statement`` into ``policy_dict`` exactly once; return True if the policy changed."""
    current = policy_dict.get('Statement', [])
    # Policy grammar allows "Statement" to be a single object instead of a list.
    if isinstance(current, dict):
        current = [current]
    # Drop every earlier statement carrying the same Sid so that re-running
    # the task replaces the CloudTrail grant instead of stacking duplicates.
    merged = [
        entry for entry in current
        if not (isinstance(entry, dict) and entry.get('Sid') == statement['Sid'])
    ]
    merged.append(statement)
    if merged == current:
        return False
    policy_dict['Statement'] = merged
    return True


def update_kms_policy(kms_key_id):
    """
    Updates the KMS key policy to allow CloudTrail to use the key.

    The key's 'default' policy is fetched, the CloudTrail statement is added
    (replacing any existing statement with the same Sid), and the policy is
    written back. If the policy already ends with exactly that statement,
    nothing is written, so the task can be re-run safely.

    Reads ``access_key``, ``secret_key`` and ``region_name`` from module scope.
    NOTE(review): ``region_name`` is not assigned in the visible file --
    presumably it is supplied by the hosting runtime; confirm.

    :param kms_key_id: The ID or ARN of the KMS key.
    :return: None. Progress and AWS client errors are reported via print().
    """
    account_id = get_aws_account_id()
    if not account_id:
        print("Unable to retrieve AWS account ID.")
        return

    kms_client = boto3.client(
        'kms',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region_name,
    )

    try:
        # Retrieve the current key policy
        policy = kms_client.get_key_policy(KeyId=kms_key_id, PolicyName='default')['Policy']
        policy_dict = json.loads(policy)

        # Add (or refresh) the CloudTrail statement; skip the write when the
        # policy already contains it.
        if not _upsert_statement(policy_dict, _build_cloudtrail_statement(account_id)):
            print(f"KMS key policy already allows CloudTrail for key: {kms_key_id}")
            return

        # Update the key policy
        kms_client.put_key_policy(
            KeyId=kms_key_id,
            PolicyName='default',
            Policy=json.dumps(policy_dict)
        )
        print(f"KMS key policy updated successfully for key: {kms_key_id}")
    except ClientError as e:
        print(f"Error updating KMS key policy: {e}")
# Entry point: apply the CloudTrail policy update to the target key.
# NOTE(review): kms_key_id is not assigned anywhere in this file -- presumably
# it is supplied by the hosting runtime as a task input; confirm.
# Example value (KMS key ID or ARN): '7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7'
update_kms_policy(kms_key_id)
# NOTE(review): `context` is not defined in this file; confirm what setting
# proceed to False signals to the hosting runtime.
context.proceed = False
copied