AWS CloudTrail Log Validation Enabled Audit: SOC2 Compliance

This runbook ensures that CloudTrail, AWS's service for logging API activity, has log file validation enabled. This is crucial for SOC2 compliance, which demands secure and private handling of customer data. Enabling log file validation helps verify the integrity and authenticity of CloudTrail logs, demonstrating a commitment to robust information security practices.

# Region scope for the trail-listing task below: None (or any falsy value) makes it
# scan every region returned by EC2 DescribeRegions; a region name string limits the
# scan to that single region.
target_region = None # Hardcoded for single execution result, user needs to set up the target_region when setting up log file validation
copied
  1. 1

    This task enumerates every AWS CloudTrail trail in an AWS account and retrieves detailed information about each one, printing its name, ARN, and home region. By default it scans all AWS regions; set target_region to a region name to restrict the scan to that region. Each trail captures specific API activity and events, and a comprehensive list provides visibility into what actions are being logged, where the logs are stored, and how they are configured. This listing is the foundation for subsequent tasks such as auditing, analysis, and optimization of AWS CloudTrail, supporting efficient resource management and security compliance.

    import boto3

    # List CloudTrail trails in every region (or only in `target_region`) and print
    # one line per trail followed by a summary.
    #
    # `_get_creds`, `cred_label` and `target_region` are supplied by the runbook
    # platform / an earlier cell; they are not defined in this script.
    creds = _get_creds(cred_label)['creds']
    access_key = creds['username']
    secret_key = creds['password']

    # Normalise falsy values (e.g. an empty string) to None, which means "all regions".
    target_region = target_region or None

    try:
        # DescribeRegions is called against us-east-1 only to obtain the region list.
        ec2_client = boto3.client(
            'ec2',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name='us-east-1',
        )
        all_regions = [entry['RegionName'] for entry in ec2_client.describe_regions()['Regions']]
    except Exception as e:
        # Best effort: report the failure and continue with an empty region list.
        print(f"ERROR: An error occurred while listing AWS regions: {e}")
        all_regions = []

    # Scan every region, or just the one requested.
    regions_to_check = all_regions if target_region is None else [target_region]

    # `all_trails` is kept as a top-level name so downstream tasks can reuse it.
    all_trails = []
    for region in regions_to_check:
        try:
            cloudtrail_client = boto3.client(
                'cloudtrail',
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                region_name=region,
            )
            # includeShadowTrails=False avoids reporting a replicated trail once
            # per region it is replicated into.
            response = cloudtrail_client.describe_trails(includeShadowTrails=False)
            trails_in_region = response['trailList']
            if not trails_in_region:
                print(f"INFO: No trails found in region {region}.")
            else:
                all_trails.extend(trails_in_region)
        except Exception as e:
            # A failure in one region must not stop the scan of the others.
            print(f"ERROR: An error occurred while listing trails in region {region}: {e}")

    if not all_trails:
        print("INFO: No trails found in all specified regions.")
    else:
        for trail in all_trails:
            # Errors are handled per trail so that one malformed record does not
            # prevent the remaining trails from being printed.
            try:
                print(f"Trail Name: {trail['Name']}, Trail ARN: {trail['TrailARN']}, Home Region: {trail['HomeRegion']}")
            except KeyError as ke:
                print(f"ERROR: Missing key {ke} in trail information: {trail}")
            except Exception as e:
                print(f"ERROR: An error occurred while printing trail information: {e}")

    print(f"SUMMARY: Processed {len(regions_to_check)} regions and found a total of {len(all_trails)} trails.")
    copied
    1
  2. 2

    This task audits AWS CloudTrail Trails for SOC2 Compliance by checking Log File Validation across various regions. It evaluates each trail for enabled log file validation and the presence of a valid 'LatestDigestDeliveryTime'. Trails are marked as compliant or non-compliant based on these criteria, with specific reasons for non-compliance provided.

    import boto3
    from botocore.exceptions import ClientError, BotoCoreError

    # Audit every CloudTrail trail for log file validation and print a per-trail
    # verdict plus a compliance summary.
    #
    # `_get_creds`, `cred_label` and `context` are supplied by the runbook platform;
    # they are not defined in this script.
    creds = _get_creds(cred_label)['creds']
    access_key = creds['username']
    secret_key = creds['password']


    def determine_compliance_reason(log_file_validation_enabled, has_valid_digest_time):
        """Return the human-readable reason for a trail's compliance verdict.

        Log file validation is checked first, so a trail failing both criteria is
        reported as "Log file validation not enabled".
        """
        if not log_file_validation_enabled:
            return "Log file validation not enabled"
        if not has_valid_digest_time:
            return "No valid LatestDigestDeliveryTime"
        return "Compliant(Both Log File Validation and LatestDigestDeliveryTime are valid)"


    def check_trail_status(region):
        """Collect validation status for every trail whose home region is `region`.

        Returns a dict mapping trail name to a status dict with the keys
        'IsLogging', 'LatestDeliveryTime', 'LatestDigestDeliveryTime',
        'LogFileValidationEnabled', 'HasValidDigestTime' and 'ComplianceReason'.
        Returns an empty dict when the region has no trails or cannot be queried.
        """
        try:
            cloudtrail_client = boto3.client(
                'cloudtrail',
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                region_name=region,
            )
            trails = cloudtrail_client.describe_trails(includeShadowTrails=True)['trailList']
        except (ClientError, BotoCoreError) as error:
            print(f"Error checking trails in region '{region}': {error}")
            return {}

        if not trails:
            print(f"No CloudTrail trails found in region '{region}'.")
            return {}

        trail_statuses = {}
        for trail in trails:
            trail_name = trail['Name']
            # Shadow trails are listed in every region they replicate to; evaluate
            # each trail only once, in its home region.
            if trail.get('HomeRegion') != region:
                continue

            try:
                trail_status = cloudtrail_client.get_trail_status(Name=trail_name)
            except (ClientError, BotoCoreError) as error:
                # Skip only this trail; results already gathered for the region
                # are kept.
                print(f"Error checking status for trail '{trail_name}' in region '{region}': {error}")
                continue

            # The describe_trails listing above already carries this flag, so no
            # second per-trail describe_trails call is needed.
            log_file_validation_enabled = bool(trail.get('LogFileValidationEnabled', False))
            latest_digest_time = trail_status.get('LatestDigestDeliveryTime')
            has_valid_digest_time = latest_digest_time is not None

            trail_statuses[trail_name] = {
                'IsLogging': trail_status.get('IsLogging'),
                'LatestDeliveryTime': trail_status.get('LatestDeliveryTime'),
                'LatestDigestDeliveryTime': latest_digest_time,
                'LogFileValidationEnabled': log_file_validation_enabled,
                'HasValidDigestTime': has_valid_digest_time,
                'ComplianceReason': determine_compliance_reason(log_file_validation_enabled, has_valid_digest_time),
            }
        return trail_statuses


    # Get all available regions. A failure here is reported instead of crashing
    # the task; the summary below then shows zero trails.
    try:
        ec2_client = boto3.client(
            'ec2',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name='us-east-1',
        )
        regions = [entry['RegionName'] for entry in ec2_client.describe_regions()['Regions']]
    except (ClientError, BotoCoreError) as error:
        print(f"Error listing AWS regions: {error}")
        regions = []

    compliance_summary = {"Compliant": [], "Non-Compliant": []}

    for region in regions:
        print(f"\nChecking trails in region: {region}")
        region_status = check_trail_status(region)
        for trail, status in region_status.items():
            is_compliant = status['LogFileValidationEnabled'] and status['HasValidDigestTime']
            compliance = "Compliant" if is_compliant else "Non-Compliant"
            reason = status['ComplianceReason']
            print(f"Region: {region}, Trail: {trail}, Compliance: {compliance}, Reason: {reason}")
            compliance_summary[compliance].append(f"{region}:{trail}")

    # Output compliance summary
    print("\nCompliance Summary:")
    for status, trails in compliance_summary.items():
        print(f"{status} Trails: {len(trails)}")
        for trail_info in trails:
            print(f" - {trail_info}")

    # Downstream runbooks can't be included in the single execution result
    # because of complex user based input parameters.
    context.proceed = False
    context.skip_sub_tasks = True
    copied
    2
    1. 2.1

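      This task automates the enforcement and verification of log file validation for AWS CloudTrail trails. It requires two inputs: trail_name (the trail to update) and region (the AWS region in which that trail is managed). It checks whether log file validation is enabled for the specified trail and enables it if necessary. After enabling it, the task prints the trail's configuration, including its log file validation status, so that compliance with security best practices can be confirmed.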

      import functools

      import boto3
      from botocore.exceptions import BotoCoreError, ClientError

      # Enable log file validation on one CloudTrail trail and print its resulting
      # configuration.
      #
      # `_get_creds`, `cred_label`, `trail_name` and `region` are supplied by the
      # runbook platform / task inputs; they are not defined in this script.
      creds = _get_creds(cred_label)['creds']
      access_key = creds['username']
      secret_key = creds['password']


      @functools.lru_cache(maxsize=1)
      def _available_regions():
          """Return the region names reported by EC2 DescribeRegions (fetched once)."""
          ec2_client = boto3.client(
              'ec2',
              aws_access_key_id=access_key,
              aws_secret_access_key=secret_key,
              region_name='us-east-1',
          )
          return frozenset(entry['RegionName'] for entry in ec2_client.describe_regions()['Regions'])


      def _is_valid_region(region):
          """Return True if `region` is a known region; print the reason otherwise."""
          try:
              known_regions = _available_regions()
          except (ClientError, BotoCoreError) as error:
              print(f"Error listing AWS regions: {error}")
              return False
          if region not in known_regions:
              print(f"Invalid region: {region}")
              return False
          return True


      def _cloudtrail_client(region):
          """Build a CloudTrail client for `region` using the task credentials."""
          return boto3.client(
              'cloudtrail',
              aws_access_key_id=access_key,
              aws_secret_access_key=secret_key,
              region_name=region,
          )


      def is_log_file_validation_enabled(trail_name, cloudtrail_client):
          """Check if log file validation is already enabled for the specified trail.

          Always returns a bool: False when the trail is not found or the lookup fails.
          """
          try:
              response = cloudtrail_client.describe_trails(trailNameList=[trail_name])
          except (ClientError, BotoCoreError) as error:
              print(f"Error checking log file validation status for trail '{trail_name}': {error}")
              return False
          for trail in response['trailList']:
              if trail.get('Name') == trail_name:
                  return bool(trail.get('LogFileValidationEnabled', False))
          return False


      def enable_log_file_validation(trail_name, region):
          """Enable log file validation for `trail_name` in `region` if it is not already on.

          Prints the outcome; returns None in every case.
          """
          if not _is_valid_region(region):
              return
          try:
              cloudtrail_client = _cloudtrail_client(region)
              if is_log_file_validation_enabled(trail_name, cloudtrail_client):
                  print(f"Log file validation is already enabled for trail '{trail_name}'.")
                  return
              cloudtrail_client.update_trail(
                  Name=trail_name,
                  EnableLogFileValidation=True,
              )
              print(f"Log file validation enabled for trail '{trail_name}' in region '{region}'.")
          except (ClientError, BotoCoreError) as error:
              print(f"Error enabling log file validation for trail '{trail_name}': {error}")


      def verify_log_file_validation(trail_name, region):
          """Print the configuration of `trail_name`, including its validation flag.

          Prints a "not found" message when the trail does not exist in `region`;
          returns None in every case.
          """
          if not _is_valid_region(region):
              return
          try:
              cloudtrail_client = _cloudtrail_client(region)
              response = cloudtrail_client.describe_trails(trailNameList=[trail_name])
              for trail in response['trailList']:
                  if trail.get('Name') == trail_name:
                      print(f"Trail Name: {trail.get('Name')}")
                      print(f"S3 Bucket Name: {trail.get('S3BucketName')}")
                      print(f"Is Multi-Region Trail: {trail.get('IsMultiRegionTrail')}")
                      print(f"Home Region: {trail.get('HomeRegion')}")
                      print(f"Trail ARN: {trail.get('TrailARN')}")
                      print(f"Log File Validation Enabled: {trail.get('LogFileValidationEnabled')}")
                      return
              # Covers both an empty trail list and a list without a matching name.
              print(f"Trail '{trail_name}' not found in region '{region}'.")
          except (ClientError, BotoCoreError) as error:
              print(f"Error verifying log file validation for trail '{trail_name}': {error}")


      # Inputs expected from the task parameters, for example:
      #   trail_name = 'test-delete'  # name of your trail
      #   region = 'us-east-1'        # region of your trail

      # Enable log file validation
      enable_log_file_validation(trail_name, region)

      # Verify log file validation
      verify_log_file_validation(trail_name, region)
      copied
      2.1