Check which AWS CloudTrail Trails have Log File Validation enabled: SOC2 Complaince

This task audits AWS CloudTrail Trails for SOC2 Compliance by checking Log File Validation across various regions. It evaluates each trail for enabled log file validation and the presence of a valid 'LatestDigestDeliveryTime'. Trails are marked as compliant or non-compliant based on these criteria, with specific reasons for non-compliance provided.

import boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_trail_status(region): try: cloudtrail_client = boto3.client('cloudtrail', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) trails = cloudtrail_client.describe_trails(includeShadowTrails=True)['trailList'] if not trails: print(f"No CloudTrail trails found in region '{region}'.") return {} trail_statuses = {} for trail in trails: trail_name = trail['Name'] home_region = trail.get('HomeRegion') if home_region == region: try: trail_desc = cloudtrail_client.describe_trails(trailNameList=[trail_name]) trail_status = cloudtrail_client.get_trail_status(Name=trail_name) #print(trail_desc) # for debugging #print(trail_status) # for debugging log_file_validation_enabled = any(tr.get('LogFileValidationEnabled', False) for tr in trail_desc['trailList']) has_valid_digest_time = trail_status.get('LatestDigestDeliveryTime') is not None trail_data = { 'IsLogging': trail_status.get('IsLogging'), 'LatestDeliveryTime': trail_status.get('LatestDeliveryTime'), 'LatestDigestDeliveryTime': trail_status.get('LatestDigestDeliveryTime'), 'LogFileValidationEnabled': log_file_validation_enabled, 'HasValidDigestTime': has_valid_digest_time, 'ComplianceReason': determine_compliance_reason(log_file_validation_enabled, has_valid_digest_time) } trail_statuses[trail_name] = trail_data except ClientError as error: print(f"Error checking status for trail '{trail_name}' in region '{region}': {error}") return trail_statuses except (ClientError, BotoCoreError) as error: print(f"Error checking trails in region '{region}': {error}") return {} def determine_compliance_reason(log_file_validation_enabled, has_valid_digest_time): if not log_file_validation_enabled: return "Log file validation not enabled" if not has_valid_digest_time: return "No valid LatestDigestDeliveryTime" return "Compliant(Both Log File Validation and LatestDigestDeliveryTime are valid)" # Get All Available Regions regions = [region['RegionName'] for region in boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1').describe_regions()['Regions']] compliance_summary = {"Compliant": [], "Non-Compliant": []} for region in regions: print(f"\nChecking trails in region: {region}") region_status = check_trail_status(region) for trail, status in region_status.items(): compliance = "Compliant" if status['LogFileValidationEnabled'] and status['HasValidDigestTime'] else "Non-Compliant" reason = status['ComplianceReason'] print(f"Region: {region}, Trail: {trail}, Compliance: {compliance}, Reason: {reason}") compliance_summary[compliance].append(f"{region}:{trail}") # Output compliance summary print("\nCompliance Summary:") for status, trails in compliance_summary.items(): print(f"{status} Trails: {len(trails)}") for trail_info in trails: print(f" - {trail_info}") context.proceed=False # As downstream runbooks can't be included in the single execution result because of complex user based input parameters context.skip_sub_tasks=True
copied
  1. 1

    This task automates the enforcement and verification of log file validation for AWS CloudTrail trails. It checks if log file validation is enabled for a specified trail and activates it if necessary. Post-activation, it confirms the validation status, ensuring compliance with security best practices.

    import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def is_log_file_validation_enabled(trail_name, cloudtrail_client): """Check if log file validation is already enabled for the specified trail.""" try: response = cloudtrail_client.describe_trails(trailNameList=[trail_name]) for trail in response['trailList']: if trail['Name'] == trail_name: return trail.get('LogFileValidationEnabled', False) except ClientError as error: print(f"Error checking log file validation status for trail '{trail_name}': {error}") return False def enable_log_file_validation(trail_name, region): regions = [region['RegionName'] for region in boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1').describe_regions()['Regions']] if region not in regions: print(f"Invalid region: {region}") return try: cloudtrail_client = boto3.client('cloudtrail',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) if is_log_file_validation_enabled(trail_name, cloudtrail_client): print(f"Log file validation is already enabled for trail '{trail_name}'.") return cloudtrail_client.update_trail( Name=trail_name, EnableLogFileValidation=True ) print(f"Log file validation enabled for trail '{trail_name}' in region '{region}'.") except ClientError as error: print(f"Error enabling log file validation for trail '{trail_name}': {error}") def verify_log_file_validation(trail_name, region): regions = [region['RegionName'] for region in boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1').describe_regions()['Regions']] if region not in regions: print(f"Invalid region: {region}") return try: cloudtrail_client = boto3.client('cloudtrail',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) response = cloudtrail_client.describe_trails(trailNameList=[trail_name]) if not response['trailList']: print(f"Trail '{trail_name}' not found in region '{region}'.") return for trail in response['trailList']: if trail.get('Name') == trail_name: print(f"Trail Name: {trail.get('Name')}") print(f"S3 Bucket Name: {trail.get('S3BucketName')}") print(f"Is Multi-Region Trail: {trail.get('IsMultiRegionTrail')}") print(f"Home Region: {trail.get('HomeRegion')}") print(f"Trail ARN: {trail.get('TrailARN')}") print(f"Log File Validation Enabled: {trail.get('LogFileValidationEnabled')}") return print(f"Trail '{trail_name}' not found in region '{region}'.") except ClientError as error: print(f"Error verifying log file validation for trail '{trail_name}': {error}") #trail_name = 'test-delete' # Replace with your trail name #region = 'us-east-1' # Replace with the region of your trail # Enable log file validation enable_log_file_validation(trail_name, region) # Verify log file validation verify_log_file_validation(trail_name, region)
    copied
    1