CloudWatch and Logging
- 1Dawb5IdhwlUvff4cbZZ8Delete unused AWS CloudWatch Log Streams
1
Delete unused AWS CloudWatch Log Streams
This runbook automates the process of identifying and deleting unused Amazon CloudWatch Log Streams. By scanning specified log groups across designated AWS regions, it efficiently detects log streams that have been inactive for a predetermined period. Once identified, these log streams are safely removed, helping organizations maintain a clutter-free logging environment and potentially reducing associated storage costs.
inputsoutputs1- 1.1h2ozXNRKuV8VsHandh8wList all AWS CloudWatch Log Streams
1.1
List all AWS CloudWatch Log Streams
This task is designed to systematically retrieve and enumerate all CloudWatch log streams present in specified AWS regions. It offers a detailed snapshot of the existing log streams, enabling users to understand their logging landscape across various AWS services and applications.
import boto3

# _get_creds, cred_label and context are not defined in this script; they are
# expected to be provided by the runbook runtime that executes the task.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def fetch_log_streams(client, region):
    """Return every log stream reachable through ``client``.

    ``describe_log_groups`` and ``describe_log_streams`` return their results
    one page at a time, so paginators are used to walk all pages instead of
    stopping after the first one.

    Args:
        client: boto3 CloudWatch Logs client for ``region``.
        region: Region name stored on each returned entry.

    Returns:
        list: One dict per log stream with the keys ``region``, ``log_group``
        and ``log_stream``. Empty when the region has no log groups.
    """
    log_streams_info = []
    stream_paginator = client.get_paginator('describe_log_streams')

    for group_page in client.get_paginator('describe_log_groups').paginate():
        for log_group in group_page.get('logGroups', []):
            log_group_name = log_group['logGroupName']
            for stream_page in stream_paginator.paginate(logGroupName=log_group_name):
                for stream in stream_page.get('logStreams', []):
                    log_streams_info.append({
                        'region': region,
                        'log_group': log_group_name,
                        'log_stream': stream['logStreamName']
                    })

    return log_streams_info


def list_all_log_streams(region=None):
    """List log streams for one region, or for every region when none is given.

    Args:
        region: Region name to scan. When falsy, the region list is fetched
            with ``describe_regions`` and every region is scanned.

    Returns:
        list: Combined output of ``fetch_log_streams`` for each scanned region.
        Regions that raise an error are reported with ``print`` and skipped so
        that the remaining regions are still scanned.
    """
    log_streams_info = []  # To store log streams information for all regions

    try:
        if region:
            regions = [region]
        else:
            # The region list is only needed when no specific region was requested
            ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1')
            regions = [entry['RegionName'] for entry in ec2_client.describe_regions()['Regions']]
    except boto3.exceptions.Boto3Error as e:
        print(f"An error occurred while accessing AWS: {e}")
        return log_streams_info
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return log_streams_info

    for specific_region in regions:
        # Errors are handled per region so one failing region does not abort the scan
        try:
            client = boto3.client('logs', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=specific_region)
            log_streams_info.extend(fetch_log_streams(client, specific_region))
        except boto3.exceptions.Boto3Error as e:
            print(f"An error occurred while accessing AWS: {e}")
        except Exception as e:
            print(f"An unexpected error occurred: {e}")

    return log_streams_info


def display_log_streams(data):
    """Render the log stream entries as a three-column table.

    Args:
        data: List of dicts with ``region``, ``log_group`` and ``log_stream``
            keys. The list is sorted in place by region.
    """
    # Initialize table with the desired structure and headers
    table = context.newtable()
    table.title = "AWS Log Streams Data"
    table.num_cols = 3  # Region, Log Group, and Log Stream
    table.num_rows = 1  # Starts with one row for headers
    table.has_header_row = True

    # Set headers in the first row
    headers = ["Region", "Log Group", "Log Stream"]
    for col_num, header in enumerate(headers):
        table.setval(0, col_num, header)

    # Sort the log stream data by region for better readability
    data.sort(key=lambda x: x["region"])

    # Populate the table, starting from the second row
    for row_num, entry in enumerate(data, start=1):
        table.num_rows += 1  # Add a row for each log stream entry
        values = [entry["region"], entry["log_group"], entry["log_stream"]]
        for col_num, value in enumerate(values):
            table.setval(row_num, col_num, value)


# Main block
# Pass the name of a region as a string to search only that region;
# None runs the scan for all regions.
target_region = None
log_streams_data = list_all_log_streams(target_region)
display_log_streams(log_streams_data)
1.2
Filter Unused AWS CloudWatch Log Streams
This task examines CloudWatch log streams to identify those that have been inactive for a specified duration. By pinpointing these dormant streams, the task aids in maintaining a cleaner, more efficient logging environment and can subsequently assist in reducing unnecessary storage costs associated with retaining outdated logs on AWS CloudWatch.
import boto3
from datetime import datetime, timedelta, timezone

# _get_creds, cred_label and context are not defined in this script; they are
# expected to be provided by the runbook runtime that executes the task.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def filter_unused_log_streams(all_log_streams, unused_days=30):
    """Return the log streams whose last event is older than ``unused_days``.

    Args:
        all_log_streams: List of dicts with ``region``, ``log_group`` and
            ``log_stream`` keys.
        unused_days: Age in days after which a stream counts as unused.

    Returns:
        list: The entries of ``all_log_streams`` whose ``lastEventTimestamp``
        is older than the cutoff. Streams without a ``lastEventTimestamp`` are
        not reported. Errors for a single stream are printed and that stream
        is skipped.
    """
    cutoff = datetime.now(timezone.utc) - timedelta(days=unused_days)
    clients_by_region = {}  # reuse one client per region instead of one per stream
    unused_log_streams = []

    for log_info in all_log_streams:
        try:
            region = log_info['region']
            if region not in clients_by_region:
                clients_by_region[region] = boto3.client('logs', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region)

            response = clients_by_region[region].describe_log_streams(
                logGroupName=log_info['log_group'],
                logStreamNamePrefix=log_info['log_stream']
            )
            # A prefix query can return several streams, so select the exact name
            log_stream = next(
                (s for s in response.get('logStreams', []) if s.get('logStreamName') == log_info['log_stream']),
                None
            )
            if log_stream is None:
                print(f"Log stream {log_info['log_stream']} was not found in log group {log_info['log_group']} of region {log_info['region']}; skipping.")
                continue

            # Check if the log stream has a 'lastEventTimestamp' (milliseconds since epoch)
            if 'lastEventTimestamp' in log_stream:
                last_event_date = datetime.fromtimestamp(log_stream['lastEventTimestamp'] / 1000, tz=timezone.utc)
                if last_event_date < cutoff:
                    unused_log_streams.append(log_info)
        except boto3.exceptions.Boto3Error as e:
            print(f"Error accessing log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")

    return unused_log_streams


def display_log_streams(data):
    """Render the unused log stream entries as a three-column table.

    Args:
        data: List of dicts with ``region``, ``log_group`` and ``log_stream``
            keys. The list is sorted in place by region.
    """
    # Initialize table with the desired structure and headers
    table = context.newtable()
    table.title = "Unused Log Streams Data"
    table.num_cols = 3  # Region, Log Group, and Log Stream
    table.num_rows = 1  # Starts with one row for headers
    table.has_header_row = True

    # Set headers in the first row
    headers = ["Region", "Log Group", "Log Stream"]
    for col_num, header in enumerate(headers):
        table.setval(0, col_num, header)

    # Sort the log stream data by region for better readability
    data.sort(key=lambda x: x["region"])

    # Populate the table, starting from the second row
    for row_num, entry in enumerate(data, start=1):
        table.num_rows += 1  # Add a row for each log stream entry
        values = [entry["region"], entry["log_group"], entry["log_stream"]]
        for col_num, value in enumerate(values):
            table.setval(row_num, col_num, value)


# Main block
# UNUSED_DAYS is not assigned in this script (e.g. UNUSED_DAYS = 90);
# presumably it is supplied as a task input - verify before running standalone.
# all_log_streams is passed down from the parent task. Example structure:
# [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}]
all_log_streams = log_streams_data
unused_logs = filter_unused_log_streams(all_log_streams, UNUSED_DAYS)

if unused_logs:
    display_log_streams(unused_logs)
else:
    print("No Unused Logs")
    # Nothing to delete, so the deletion sub-task is skipped
    context.skip_sub_tasks=True
1.2.1
Delete AWS CloudWatch Log Streams
This task allows users to remove specified log streams from AWS CloudWatch. By executing this task, organizations can effectively manage and declutter their logging space, ensuring that only relevant and necessary logs are retained. This not only optimizes the logging environment but also helps in potentially reducing storage-associated costs on AWS.
import boto3

# _get_creds and cred_label are not defined in this script; they are expected
# to be provided by the runbook runtime that executes the task.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def delete_log_streams(unused_logs):
    """
    Deletes the specified CloudWatch log streams.

    Args:
        unused_logs (list): List of dictionaries containing region, log group, and unused log stream information.

    Returns:
        list: One dictionary per input entry with the keys ``status``
        ('success' or 'error'), ``region``, ``log_group``, ``log_stream``
        and ``message``.
    """
    deletion_results = []
    clients_by_region = {}  # reuse one client per region instead of one per stream

    for log_info in unused_logs:
        # Start from the success record; it is overwritten if the deletion fails
        result = {
            'status': 'success',
            'region': log_info['region'],
            'log_group': log_info['log_group'],
            'log_stream': log_info['log_stream'],
            'message': f"Successfully deleted log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}."
        }

        try:
            region = log_info['region']
            if region not in clients_by_region:
                clients_by_region[region] = boto3.client('logs', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region)

            # Delete the log stream
            clients_by_region[region].delete_log_stream(
                logGroupName=log_info['log_group'],
                logStreamName=log_info['log_stream']
            )
        except boto3.exceptions.Boto3Error as e:
            result['status'] = 'error'
            result['message'] = f"Error deleting log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}"
        except Exception as e:
            result['status'] = 'error'
            result['message'] = f"Unexpected error: {e}"

        deletion_results.append(result)

    return deletion_results


# Main Block
# unused_logs is passed down from the parent task. Example structure:
# [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}]
results = delete_log_streams(unused_logs)

if not results:
    print("No log streams were deleted.")
else:
    for result in results:
        print(result['message'])
- 2m0OvpINJJN1MymwoEgFKEnable S3 Logging to log session activity of SSM sessions
2
Enable S3 Logging to log session activity of SSM sessions
This runbook facilitates the capture and storage of SSM session activity of instances directly into an S3 bucket. By integrating S3 logging, every command executed and its respective output during SSM sessions are systematically logged. This not only enhances security and auditability but also provides a comprehensive record of operations performed in SSM sessions, ensuring transparency and traceability in system management tasks.
inputsoutputs2- 2.1ToTFXHMd3aUIgqYq8JD4Prerequisites of using a SSM session.
2.1
Prerequisites of using a SSM session.
Before utilizing AWS Systems Manager (SSM) sessions, certain prerequisites must be met. These include ensuring the target EC2 instances have the SSM Agent installed and are associated with an appropriate IAM role granting session permissions. Additionally, it's crucial to verify that the necessary AWS services, such as Amazon EC2 and Systems Manager, are accessible and adequately configured. Meeting these prerequisites ensures seamless initiation and management of sessions via SSM, enabling secure and efficient remote instance management.
inputsoutputs2.1- 2.1.1tQpRJ0KNuxSdiAL0vPXuVerify the target EC2 instances have the SSM agent installed and running
2.1.1
Verify the target EC2 instances have the SSM agent installed and running
The SSM Agent is a software component that enables instances to interact with AWS Systems Manager services. Ensure it is installed on the target EC2 instances by checking whether the amazon-ssm-agent service is running.
# Check whether the SSM Agent service is running on the instance.
# The snap unit name is tried first; if that unit is not present, fall back to
# the unit name used by package-based installs of the agent.
sudo systemctl status snap.amazon-ssm-agent.amazon-ssm-agent.service \
    || sudo systemctl status amazon-ssm-agent
2.1.2
Verify the IAM role has the correct permissions
For an EC2 instance to interact with AWS Systems Manager and establish an SSM session, it must be associated with an IAM role that has the correct Systems Manager permissions. These include the AmazonSSMManagedInstanceCore policy and any other policy required by the services in use, such as the AmazonS3FullAccess policy to enable S3 logging. By ensuring the IAM role is correctly configured with the appropriate permissions, you enable seamless integration between the EC2 instance and Systems Manager, facilitating tasks such as patch management, state management, and remote session operations.
inputsoutputs2.1.2
- 2.2iASI6LDMzG8eLR4ZFqnXEnable S3 logging by updating SSM file
2.2
Enable S3 logging by updating SSM file
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.inputsoutputs2.2- 2.2.1VzwJMy5EBU0bOhfJlApqStore the content of SSM-SessionManagerRunShell.json file
2.2.1
Store the content of SSM-SessionManagerRunShell.json file
# Write the Session Manager preferences document to SessionManagerRunShell.json.
# Replace <bucket_name> with the name of the S3 bucket that should receive the
# session logs. The value must stay inside double quotes so the file is valid JSON.
cat > SessionManagerRunShell.json <<'EOF'
{
  "schemaVersion": "1.0",
  "description": "Document to hold regional settings for Session Manager",
  "sessionType": "Standard_Stream",
  "inputs": {
    "s3BucketName": "<bucket_name>",
    "s3KeyPrefix": "",
    "s3EncryptionEnabled": true,
    "cloudWatchLogGroupName": "",
    "cloudWatchEncryptionEnabled": true,
    "cloudWatchStreamingEnabled": false,
    "kmsKeyId": "",
    "runAsEnabled": false,
    "runAsDefaultUser": "",
    "idleSessionTimeout": "",
    "maxSessionDuration": "",
    "shellProfile": {
      "windows": "date",
      "linux": "pwd;ls"
    }
  }
}
EOF
2.2.2
Update the SSM document to enable S3 logging
# Read the preferences file written in the previous step and push it as the
# new content of the SSM-SessionManagerRunShell document.
file_content="$(cat SessionManagerRunShell.json)"

# $LATEST is single-quoted so the shell passes it through literally.
aws ssm update-document \
    --name "SSM-SessionManagerRunShell" \
    --content "$file_content" \
    --document-version '$LATEST'
- 3c12BXOgedOVODPuwNb04Analysing AWS CloudTrail Trails
3
Analysing AWS CloudTrail Trails
This runbook involves a two-step process aimed at optimizing AWS CloudTrail management. Initially, every trail within AWS CloudTrail is meticulously enumerated and listed, offering visibility into trail configurations across all available regions. The process then shifts focus to scrutinize each trail, identifying and highlighting any redundant trails that may exist. Redundant trails often result in unnecessary costs and complexities, and identifying them is crucial for efficient AWS resource management. The analysis is comprehensive, covering global trails, organizational trails, and multiple trails within the same region, ensuring that AWS CloudTrail is streamlined and cost-effective.
inputsoutputs3- 3.1jtE72IY5WRDEnevpEUHqList all AWS CloudTrail Trails
3.1
List all AWS CloudTrail Trails
This task involves enumerating and retrieving detailed information about every AWS CloudTrail trail that exists across all AWS regions within an AWS account. Each trail captures specific API activity and events, and having a comprehensive list helps in providing visibility into what actions are being logged, where the logs are stored, and how they are configured. This listing process is foundational for subsequent tasks like auditing, analysis, and optimization of AWS CloudTrail, aiding in efficient resource management and security compliance.
import boto3

# _get_creds and cred_label are not defined in this script; they are expected
# to be provided by the runbook runtime that executes the task.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

# target_region is expected as a task input: a falsy value means "all regions",
# otherwise it must be a valid AWS region string.
target_region = target_region if target_region else None

all_regions = []
if target_region is None:
    # The region list is only needed when no specific region was requested
    try:
        ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1')
        all_regions = [region['RegionName'] for region in ec2_client.describe_regions()['Regions']]
    except Exception as e:
        print(f"ERROR: An error occurred while listing AWS regions: {e}")
        all_regions = []

# Get trails for all regions or a specific region
regions_to_check = all_regions if target_region is None else [target_region]

# all_trails is read by the downstream filtering tasks
all_trails = []
for region in regions_to_check:
    try:
        # List the trails of this region; shadow trails are excluded by the request
        cloudtrail_client = boto3.client('cloudtrail', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region)
        response = cloudtrail_client.describe_trails(includeShadowTrails=False)
        trails_in_region = response['trailList']
        if not trails_in_region:
            print(f"INFO: No trails found in region {region}.")
        else:
            all_trails.extend(trails_in_region)
    except Exception as e:
        # Handle exceptions thrown while listing trails for a region
        print(f"ERROR: An error occurred while listing trails in region {region}: {e}")

# Print all trails
if not all_trails:
    print("INFO: No trails found in all specified regions.")
else:
    for trail in all_trails:
        # Errors are handled per trail so one malformed entry does not hide the rest
        try:
            print(f"Trail Name: {trail['Name']}, Trail ARN: {trail['TrailARN']}, Home Region: {trail['HomeRegion']}")
        except KeyError as ke:
            print(f"ERROR: Missing key {ke} in trail information: {trail}")
        except Exception as e:
            print(f"ERROR: An error occurred while printing trail information: {e}")

print(f"SUMMARY: Processed {len(regions_to_check)} regions and found a total of {len(all_trails)} trails.")
3.2zzhPQeNCMz07Hq3nAQh9Filter out redundant AWS CloudTrail Trails
3.2
Filter out redundant AWS CloudTrail Trails
AWS CloudTrail trails are designed to log and monitor AWS account activity, but having multiple trails with overlapping configurations, or trails that record the same events in the same region, can lead to confusion, inefficiency, and increased costs. This task systematically reviews each trail, checks for redundancy based on specific criteria like region, event types logged, and destination S3 bucket, and then flags the redundant trails for review or deletion. Streamlining your trails through this method enhances manageability, reduces costs, and improves the clarity of your audit logs.
inputsoutputs3.2- 3.2.1NVnfrVOzW6Q13Y7hUa0cFilter out redundant global AWS CloudTrail Trails
3.2.1
Filter out redundant global AWS CloudTrail Trails
This task aims to identify and filter out redundant global trails within AWS CloudTrail. A global trail is a trail that applies to all regions in an AWS account. Redundant global trails can capture duplicate events, leading to unnecessary data storage and processing costs. The script inspects all global trails in each AWS region and identifies redundancies, providing a clear report of any trails that are unnecessary or duplicative. This allows for streamlined management and potential cost savings by helping administrators easily spot and remove any redundant global trails.
import boto3

# all_trails is expected from the upstream "List all AWS CloudTrail Trails" task:
# a list of trail dicts as returned by describe_trails, e.g.
# [{'Name': 'ctrail_123', 'IsMultiRegionTrail': True, 'HomeRegion': 'us-east-1',
#   'TrailARN': 'arn:aws:cloudtrail:us-east-1:355237452254:trail/ctrail_123', ...}]

if all_trails:
    try:
        # Keep every multi-region ("global") trail exactly once, keyed by its ARN,
        # so a trail listed more than once is not counted as redundant to itself.
        unique_global_trails = {}
        for trail in all_trails:
            if trail.get('IsMultiRegionTrail'):
                unique_global_trails.setdefault(trail['TrailARN'], trail)

        # Grouping global trails per account (the account id is the 5th ARN field)
        account_trail_map = {}
        for trail in unique_global_trails.values():
            account_id = trail['TrailARN'].split(':')[4]
            account_trail_map.setdefault(account_id, []).append(trail)

        # Identifying and printing redundant global trails
        redundant_trails_found = False
        for account_id, trails in account_trail_map.items():
            if len(trails) <= 1:
                continue
            redundant_trails_found = True
            print(f"Alarm: Account {account_id} has {len(trails)} global trails which is redundant.")
            for i, trail in enumerate(trails):
                # Every other global trail of the same account makes this one redundant
                redundant_to = ', '.join(t['Name'] for idx, t in enumerate(trails) if idx != i)
                print(f" - Resource: {trail['TrailARN']}, Reason: {trail['Name']} is redundant to {redundant_to}, Region: {trail['HomeRegion']}")

        if not redundant_trails_found:
            print("No redundant global trails found.")

    except Exception as e:
        # Log any general exception that occurs
        print(f"An unexpected error occurred: {e}")
else:
    print("No trails were provided.")
3.2.2
Filter out redundant regional AWS CloudTrail Trails
This task detects redundant regional trails within AWS CloudTrail. Occasionally, users might inadvertently create multiple trails in the same region, which not only results in redundant data collection but also incurs additional costs. These unnecessary trails can capture identical events and consume extra storage, leading to inefficiency and clutter. This task scans through all the regional trails in each AWS region and pinpoints the redundant ones. By identifying redundant trails, the script aids administrators in optimizing their AWS CloudTrail configuration, thereby promoting efficient resource utilization and cost-effectiveness.
import boto3

# all_trails is expected from the upstream "List all AWS CloudTrail Trails" task:
# a list of trail dicts as returned by describe_trails, e.g.
# [{'Name': 'ctrail_oregon', 'IsMultiRegionTrail': False, 'IsOrganizationTrail': False,
#   'HomeRegion': 'us-west-2',
#   'TrailARN': 'arn:aws:cloudtrail:us-west-2:355237452254:trail/ctrail_oregon', ...}]

if all_trails:
    # Identifying global and organization trails
    global_trails = [trail for trail in all_trails if trail.get('IsMultiRegionTrail')]
    org_trails = [trail for trail in all_trails if trail.get('IsOrganizationTrail', False)]
    print(f"INFO: Identified {len(global_trails)} global trails and {len(org_trails)} organization trails")

    # Regional trails are those that are neither multi-region nor organization trails
    regional_trails = [
        trail for trail in all_trails
        if not trail.get('IsMultiRegionTrail') and not trail.get('IsOrganizationTrail', False)
    ]

    # Counting regional trails per region
    regional_trails_count = {}
    for trail in regional_trails:
        home_region = trail.get('HomeRegion')
        regional_trails_count[home_region] = regional_trails_count.get(home_region, 0) + 1
    print(f"INFO: Count of regional trails per region: {regional_trails_count}")

    # Identifying and printing redundant regional trails
    redundant_trails_found = False
    for trail in regional_trails:
        try:
            same_region_count = regional_trails_count[trail['HomeRegion']]

            # Collect every reason why this regional trail duplicates another trail
            overlaps = []
            if global_trails:
                overlaps.append(f"Global Trails: {', '.join([gt['Name'] for gt in global_trails])}")
            if org_trails:
                overlaps.append(f"Organization Trails: {', '.join([ot['Name'] for ot in org_trails])}")
            if same_region_count > 1:
                overlaps.append(f"other {same_region_count-1} regional trails in {trail['HomeRegion']}.")

            if overlaps:
                status = 'alarm'
                redundant_trails_found = True
                reason = f"{trail['Name']} is redundant to: " + ' '.join(overlaps)
            else:
                # Do not describe a trail as redundant when nothing overlaps with it
                status = 'ok'
                reason = f"{trail['Name']} is not redundant."

            print(f"Resource: {trail['TrailARN']}, Status: {status}, Reason: {reason}, Region: {trail['HomeRegion']}, Account ID: {trail['TrailARN'].split(':')[4]}")
        except Exception as e:
            # .get keeps the error report itself from failing when 'Name' is missing
            print(f"ERROR: An error occurred while processing trail {trail.get('Name', '<unknown>')}: {e}")

    if not redundant_trails_found:
        print("INFO: No redundant trails found")
else:
    print("No trails were provided.")
- 4uC6cfnr9xHPADaTfvAs1Enable AWS CloudTrail Logging for Logging and Monitoring User Activity
4
Enable AWS CloudTrail Logging for Logging and Monitoring User Activity
This runbook involves configuring an AWS CloudTrail trail to log and monitor user activities, which is crucial for meeting SOC2 guidelines. By capturing detailed records of API calls and user actions within AWS, CloudTrail aids in continuous auditing and real-time security analysis.
inputsoutputs4- 4.1I4Jg58AgFTnrLoNniBs9Create an AWS S3 bucket
4.1
This task involves setting up a unique data storage bucket in Amazon S3 for storing, managing, and retrieving data, with options for access control, versioning, and lifecycle management. S3 buckets provide a scalable and secure cloud storage solution.
import boto3
from botocore.exceptions import ClientError

# _get_creds and cred_label are not defined in this script; they are expected
# to be provided by the runbook runtime that executes the task.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

# AWS S3 client initialization
s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

# bucket_name is expected as a task input, e.g. 'my-logging-bucket-name'
create_bucket_args = {'Bucket': bucket_name}

# Outside us-east-1 the bucket region must be passed as a LocationConstraint,
# so add it whenever the client resolved to a different region.
client_region = s3_client.meta.region_name
if client_region and client_region not in ('us-east-1', 'aws-global'):
    create_bucket_args['CreateBucketConfiguration'] = {'LocationConstraint': client_region}

# Create S3 bucket
try:
    s3_client.create_bucket(**create_bucket_args)
    print(f"Bucket {bucket_name} created successfully.")
except ClientError as e:
    print(f"Error creating S3 bucket {bucket_name}: {e}")
4.2
Update the bucket policy of an AWS S3 bucket
This task involves modifying the access controls and permissions of an S3 bucket to manage and secure data access, ensuring compliance with security standards and organizational requirements. This is essential for controlling and safeguarding sensitive information stored in S3. In this case, the policy update grants the CloudTrail trail permission to write to the S3 bucket.
import boto3
from botocore.exceptions import ClientError
import json

# _get_creds, cred_label and context are not defined in this script; they are
# expected to be provided by the runbook runtime that executes the task.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

account_id = boto3.client('sts', aws_access_key_id=access_key, aws_secret_access_key=secret_key).get_caller_identity()['Account']


def update_s3_bucket_policy(bucket_name, policy):
    """
    Update the policy of the specified S3 bucket.

    Errors are reported with ``print`` rather than raised.

    :param bucket_name: Name of the S3 bucket
    :param policy: Policy document as a JSON string
    """
    try:
        s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

        # Round-trip through json so a malformed policy is rejected before the API call
        formatted_policy = json.dumps(json.loads(policy))

        # Updating the bucket policy
        s3_client.put_bucket_policy(Bucket=bucket_name, Policy=formatted_policy)
        print(f"Bucket policy updated successfully for {bucket_name}")
    except ClientError as e:
        print(f"Error updating policy for bucket {bucket_name}: {e}")
    except Exception as e:
        print(f"A general error occurred: {e}")


# bucket_name, region_name and trail_name are expected as task inputs.
# The policy is built as a dict and serialized with json.dumps: calling
# str.format on JSON text fails because the literal braces of the JSON are
# parsed as replacement fields.
trail_arn = f"arn:aws:cloudtrail:{region_name}:{account_id}:trail/{trail_name}"

new_policy = json.dumps({
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AWSCloudTrailAclCheck20150319",
            "Effect": "Allow",
            "Principal": {"Service": "cloudtrail.amazonaws.com"},
            "Action": "s3:GetBucketAcl",
            "Resource": f"arn:aws:s3:::{bucket_name}",
            "Condition": {
                "StringEquals": {
                    "AWS:SourceArn": trail_arn
                }
            }
        },
        {
            "Sid": "AWSCloudTrailWrite20150319",
            "Effect": "Allow",
            "Principal": {"Service": "cloudtrail.amazonaws.com"},
            "Action": "s3:PutObject",
            "Resource": f"arn:aws:s3:::{bucket_name}/AWSLogs/{account_id}/*",
            "Condition": {
                "StringEquals": {
                    "AWS:SourceArn": trail_arn,
                    "s3:x-amz-acl": "bucket-owner-full-control"
                }
            }
        }
    ]
})

update_s3_bucket_policy(bucket_name, new_policy)

context.proceed = False
4.3
Create an AWS CloudTrail trail and configure it to log to an S3 bucket
This task involves establishing a CloudTrail trail to monitor and record AWS account activities, and directing the log files to a specified S3 bucket for secure and centralized storage. This setup enables efficient auditing and analysis of AWS service usage and user activities.
import boto3
from botocore.exceptions import ClientError

# _get_creds and cred_label are not defined in this script; they are expected
# to be provided by the runbook runtime that executes the task.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

# region_name, trail_name and bucket_name are expected as task inputs, e.g.
# region_name = 'us-east-1', trail_name = 'my-cloudtrail-trail',
# bucket_name = 'my-logging-bucket'

# AWS CloudTrail client initialization with region
ct_client = boto3.client('cloudtrail', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region_name)

try:
    # Check if the trail already exists
    trails = ct_client.list_trails()
    if any(trail['Name'] == trail_name for trail in trails['Trails']):
        print(f"Trail {trail_name} already exists.")
    else:
        # Create the trail
        ct_client.create_trail(Name=trail_name, S3BucketName=bucket_name)

        # Start logging
        ct_client.start_logging(Name=trail_name)
        # Report the bucket through bucket_name, the same variable passed to create_trail
        print(f"CloudTrail trail {trail_name} created and logging started to {bucket_name}.")

except ClientError as e:
    print(f"Error creating CloudTrail trail: {e}")
except Exception as e:
    print(f"A general error occurred: {e}")
- 5z2EwKopujyDmdYWg5xuNEnable VPC Flow Logs in AWS
5
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again. This runbook involves turning on a feature for capturing information about IP traffic going to and from network interfaces in a Virtual Private Cloud (VPC). This data is vital for network monitoring, security analysis, and troubleshooting. The logs can be stored in Amazon CloudWatch Logs or Amazon S3 for detailed analysis and archival purposes, aiding in compliance and operational auditing.
inputsoutputsregion_name=None #Hardcoded for single execution result, Use None when you want to run the script for all regions.copied5- 5.1NyeDrRgo1w7ndZEQJZYaList All VPCs in AWS
5.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task enumerates all Virtual Private Clouds across every AWS region in an account. This task is essential for network management, security audits, and resource tracking, especially in large-scale environments. It provides details like VPC IDs, CIDR blocks, and associated resources for each VPC.
# inputsoutputs
import boto3
from botocore.exceptions import ClientError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def list_vpcs_in_region(region_name):
    """Print and return the VPCs of one region as {'Region', 'VPC_ID'} dicts.

    Errors are printed rather than raised; whatever was collected before the
    error is still returned.
    """
    found = []
    try:
        ec2 = boto3.Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region_name,
        ).client('ec2')
        described = ec2.describe_vpcs().get('Vpcs', [])

        if not described:
            print(f"No VPCs found in region '{region_name}'.")
        else:
            print(f"In region '{region_name}', found the following VPCs:")
            for entry in described:
                vpc_id = entry['VpcId']
                found.append({'Region': region_name, 'VPC_ID': vpc_id})
                print(f" VPC ID: {vpc_id}")
    except ClientError as e:
        print(f"An error occurred in region {region_name}: {e}")
    except Exception as e:
        print(f"An unexpected error occurred in region {region_name}: {e}")
    return found


def list_vpcs_all_regions():
    """Collect the VPCs of every region returned by describe_regions."""
    collected = []
    try:
        # us-east-1 is only used as the endpoint for listing the regions.
        ec2 = boto3.Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name='us-east-1',
        ).client('ec2')
        names = [item['RegionName'] for item in ec2.describe_regions()['Regions']]
        for name in names:
            collected.extend(list_vpcs_in_region(name))
    except ClientError as e:
        print(f"An error occurred: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return collected


# region_name comes from the runbook input; a falsy value means all regions.
vpcs = list_vpcs_in_region(region_name) if region_name else list_vpcs_all_regions()

# Summary of all VPCs across regions
if not vpcs:
    print("No VPCs found in any of the regions.")
else:
    print("\nSummary of all VPCs across regions:")
    for vpc in vpcs:
        print(f"Region: {vpc['Region']}, VPC ID: {vpc['VPC_ID']}")
# copied5.1 - 5.2C7h7EgxU8gkBcLlRkY9oFilter Out VPCs with Flow Logs not enabled in AWS
5.2
Filter Out VPCs with Flow Logs not enabled in AWS
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task identifies Virtual Private Clouds (VPCs) in an AWS environment that lack active Flow Logs. This task is essential for security and compliance, ensuring that network traffic is monitored and logged. It involves checking each VPC's Flow Logs status and isolating those without the feature, helping to prioritize security enhancements and network monitoring strategies.
# inputsoutputs
import boto3
from botocore.exceptions import ClientError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def check_vpc_flow_logs(vpc_id, region):
    """Return True when describe_flow_logs reports any flow log for the VPC.

    A ClientError is printed and reported as False, i.e. the VPC is treated
    as having no flow logs.
    """
    try:
        ec2 = boto3.Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region,
        ).client('ec2')

        # Flow logs are looked up by the VPC's resource id.
        lookup = ec2.describe_flow_logs(
            Filters=[{'Name': 'resource-id', 'Values': [vpc_id]}]
        )
        if not lookup['FlowLogs']:
            print(f"Flow Logs are not enabled for VPC {vpc_id} in region {region}.")
            return False
        print(f"Flow Logs are enabled for VPC {vpc_id} in region {region}.")
        return True
    except ClientError as e:
        print(f"An error occurred in region {region} for VPC {vpc_id}: {e}")
        return False


# vpcs is expected from the upstream task, shaped like:
#   [{'Region': 'ap-south-1', 'VPC_ID': 'vpc-0c433ca0ab76e67ae'}, ...]

# Collect the VPCs for which the check came back False.
vpcs_without_flow_logs = []
for vpc in vpcs:
    if check_vpc_flow_logs(vpc['VPC_ID'], vpc['Region']):
        continue
    vpcs_without_flow_logs.append(vpc)

context.skip_sub_tasks=True

# vpcs_without_flow_logs is passed to the downstream task in the same shape as vpcs.
# copied5.2- 5.2.1I4Jg58AgFTnrLoNniBs9Create an AWS S3 bucket
5.2.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves setting up a unique data storage bucket in Amazon S3 for storing, managing, and retrieving data, with options for access control, versioning, and lifecycle management. S3 buckets provide a scalable and secure cloud storage solution.
# inputsoutputs
import boto3
from botocore.exceptions import ClientError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

# AWS S3 client initialization
s3_client = boto3.client(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
)

# bucket_name is expected to be defined before this task runs,
# e.g. bucket_name = 'my-logging-bucket-name'.

# Create the bucket and report the outcome; a ClientError is printed, not raised.
try:
    s3_client.create_bucket(Bucket=bucket_name)
except ClientError as e:
    print(f"Error creating S3 bucket {bucket_name}: {e}")
else:
    print(f"Bucket {bucket_name} created successfully.")
# copied5.2.1 - 5.2.2jLl4PEsrWvzRveVfhAQuUpdate AWS S3 bucket policy for VPC Flow Logs
5.2.2
Update AWS S3 bucket policy for VPC Flow Logs
# There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
# inputsoutputs
import boto3
import json
from botocore.exceptions import ClientError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def create_bucket_policy(bucket_name, account_number, regions):
    """
    Build the log-delivery bucket policy as a dict.

    The policy has one ACL-check statement plus one PutObject statement per
    entry in regions, all granted to delivery.logs.amazonaws.com.
    """
    log_service = {"Service": "delivery.logs.amazonaws.com"}
    objects_arn = f"arn:aws:s3:::{bucket_name}/AWSLogs/{account_number}/*"

    def write_statement(region):
        # Write access is limited to log sources of this account in this region.
        return {
            "Sid": f"AWSLogDeliveryWrite_{region}",
            "Effect": "Allow",
            "Principal": {"Service": "delivery.logs.amazonaws.com"},
            "Action": "s3:PutObject",
            "Resource": objects_arn,
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": account_number,
                    "s3:x-amz-acl": "bucket-owner-full-control"
                },
                "ArnLike": {"aws:SourceArn": f"arn:aws:logs:{region}:{account_number}:*"}
            }
        }

    acl_check = {
        "Sid": "AWSLogDeliveryAclCheck",
        "Effect": "Allow",
        "Principal": log_service,
        "Action": "s3:GetBucketAcl",
        "Resource": f"arn:aws:s3:::{bucket_name}",
        "Condition": {"StringEquals": {"aws:SourceAccount": account_number}}
    }

    return {
        "Version": "2012-10-17",
        "Id": "AWSLogDeliveryWrite20150319",
        "Statement": [acl_check] + [write_statement(region) for region in regions]
    }


def update_s3_bucket_policy(s3_client, bucket_name, policy):
    """
    Serialise policy to JSON and set it as the bucket's policy.

    A ClientError is printed rather than raised.
    """
    try:
        s3_client.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy))
    except ClientError as e:
        print(f"Error updating bucket policy: {e}")
    else:
        print(f"Bucket policy updated for {bucket_name}.")


account_number = boto3.client(
    'sts',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
).get_caller_identity()['Account']

# bucket_name and regions_for_bucket_policy are expected to be defined before
# this task runs, e.g. regions_for_bucket_policy = ['us-east-1', 'ap-south-1'].
# Several regions are listed when one logging bucket serves VPC Flow Logs of
# multiple regions.

# Create S3 client
s3_client = boto3.client(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
)

# Create and update the bucket policy
policy = create_bucket_policy(bucket_name, account_number, regions_for_bucket_policy)
update_s3_bucket_policy(s3_client, bucket_name, policy)

s3_bucket_arn = f"arn:aws:s3:::{bucket_name}" #passed to downstream task
# copied5.2.2 - 5.2.3dTFEJj9CIWX6Q1fHHXTdConfigure VPC Flow Logs in AWS to an S3 bucket
5.2.3
Configure VPC Flow Logs in AWS to an S3 bucket
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task activates a logging feature for Virtual Private Clouds (VPCs) in AWS. This feature records and stores information about the network traffic flowing through the VPC, aiding in security monitoring, traffic analysis, and troubleshooting. The collected data can be sent to Amazon CloudWatch Logs or Amazon S3 for retention and analysis.
# inputsoutputs
import boto3
from botocore.exceptions import ClientError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def enable_vpc_flow_logs(vpc_id, region, s3_bucket_arn):
    """
    Create an ALL-traffic flow log for one VPC, delivered to the given S3 bucket ARN.

    The raw API response is printed, followed by a success or failure line
    depending on the response's 'Unsuccessful' list.
    """
    try:
        ec2 = boto3.Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region,
        ).client('ec2')

        # Create the flow log
        request = {
            'ResourceIds': [vpc_id],
            'ResourceType': 'VPC',
            'TrafficType': 'ALL',
            'LogDestinationType': 's3',
            'LogDestination': s3_bucket_arn,
        }
        response = ec2.create_flow_logs(**request)
        print(response)

        if not response['Unsuccessful']:
            print(f"Successfully enabled Flow Logs for VPC {vpc_id} in region {region}.")
        else:
            print(f"Failed to enable Flow Logs for VPC {vpc_id} in region {region}.")
    except ClientError as e:
        print(f"An error occurred in region {region} for VPC {vpc_id}: {e}")


# vpcs_without_flow_logs and s3_bucket_arn are expected from upstream tasks, e.g.:
#   vpcs_without_flow_logs = [{'Region': 'ap-south-1', 'VPC_ID': 'vpc-0c433ca0ab76e67ae'}]
#   s3_bucket_arn = 'arn:aws:s3:::your-bucket-name'

# Enabling flow logs for each VPC
for vpc in vpcs_without_flow_logs:
    enable_vpc_flow_logs(vpc['VPC_ID'], vpc['Region'], s3_bucket_arn)
# copied5.2.3
- 6WWVGacdUb7NrQMTsHDsBEnd-to-End Encryption Setup for AWS CloudTrail: SOC2 Compliance
6
End-to-End Encryption Setup for AWS CloudTrail: SOC2 Compliance
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook provides a detailed guide for verifying and/or setting up end-to-end encryption in AWS CloudTrail for SOC2 compliance. It covers configuring CloudTrail with AWS KMS Customer Master Keys (CMKs) for Server-Side Encryption (SSE), including steps for creating or selecting KMS CMKs and ensuring secure encryption of CloudTrail trails.
inputsoutputsregion_name=None #Hardcoded for single execution result, Use None when you want to run the script for all regions.copied6- 6.1fAuAaVSIKtWaSftRDQJ1Verify Whether AWS CloudTrail is configured to use SSE AWS KMS
6.1
Verify Whether AWS CloudTrail is configured to use SSE AWS KMS
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task verifies if AWS CloudTrail is configured with Server-Side Encryption (SSE) using AWS Key Management Service (KMS) Customer Master Keys (CMKs). It ensures that each CloudTrail trail has a KmsKeyId defined, confirming encryption according to SOC2 standards. This process enhances security and meets regulatory requirements for encrypted AWS activity logging.
# inputsoutputs
import boto3
from botocore.exceptions import ClientError, BotoCoreError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def check_trail_encryption(client, region_name):
    """
    Check the CloudTrail trails of one region for KMS encryption.

    Each trail is reported as "Global" (multi-region) or "Regional" together
    with its compliance status; a trail is compliant when it carries a
    non-empty KmsKeyId.

    :param client: Boto3 CloudTrail client
    :param region_name: Name of the AWS region
    :return: Tuple (total trails, compliant trails); (0, 0) on error or when
             no trails exist
    """
    try:
        trails = client.describe_trails(includeShadowTrails=False)['trailList']
        if not trails:
            print(f"[{region_name}] No CloudTrail trails found.")
            return 0, 0

        compliant_trails = 0
        for trail in trails:
            trail_name = trail['Name']
            trail_type = "Global" if trail.get('IsMultiRegionTrail', False) else "Regional"
            if trail.get('KmsKeyId'):
                print(f"[{region_name}] {trail_type} Trail '{trail_name}' is compliant with KMS CMK encryption.")
                compliant_trails += 1
            else:
                print(f"[{region_name}] {trail_type} Trail '{trail_name}' is NOT compliant. KmsKeyId not defined.")

        # The summary counts every trail in the region. It used to be labelled
        # with the type of whichever trail was iterated last, which mislabels
        # regions that have both global and regional trails.
        print(f"[{region_name}] Summary: {compliant_trails} out of {len(trails)} trails are compliant with KMS CMK encryption.")
        return len(trails), compliant_trails
    except ClientError as e:
        print(f"AWS client error occurred in {region_name}: {e}")
        return 0, 0
    except Exception as e:
        print(f"An unexpected error occurred in {region_name}: {e}")
        return 0, 0


def run_check(selected_region=None):
    """
    Run the CloudTrail encryption check and print an overall summary.

    :param selected_region: Specific region to check. If None (or otherwise
                            falsy), checks all regions.
    """
    if selected_region:
        regions = [selected_region]
    else:
        # Use a default region only for fetching the list of regions
        default_region_for_fetching_regions = 'us-east-1'
        ec2_client = boto3.client(
            'ec2',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=default_region_for_fetching_regions,
        )
        regions = [region['RegionName'] for region in ec2_client.describe_regions()['Regions']]

    total_compliant = 0
    total_trails = 0

    for region in regions:
        print(f"Checking CloudTrail trails in {region}...")
        cloudtrail_client = boto3.client(
            'cloudtrail',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region,
        )
        regional_trails, regional_compliant = check_trail_encryption(cloudtrail_client, region)
        total_trails += regional_trails
        total_compliant += regional_compliant

    print(f"Overall Summary: {total_compliant} out of {total_trails} total trails across all checked regions are compliant with KMS CMK encryption.")


# region_name comes from the runbook input; run_check treats a falsy value
# (None) as "all regions", so it can be passed straight through.
run_check(region_name)

context.skip_sub_tasks=True # Remove this line if you want to choose or create a new KMS key to update the trail with
# copied6.1- 6.1.1wNxRKhr9anuka2NjUxf2Choose or Create an AWS KMS CMK
6.1.1
Choose or Create an AWS KMS CMK
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task selects an existing AWS KMS Customer Master Key (CMK) or creates a new one if none exists. It checks for a CMK with a specific alias, creating a new key for encryption purposes as needed. This ensures enhanced security and compliance in AWS environments.
# inputsoutputs
import boto3
from botocore.exceptions import ClientError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def create_or_choose_kms_key(alias_name, region_name):
    """
    Creates a new AWS KMS Customer Master Key (CMK) or returns an existing one
    based on the alias in the specified region.

    :param alias_name: Alias name for the KMS key (without the 'alias/' prefix).
    :param region_name: AWS region where the KMS key is to be created or found.
    :return: ARN of the KMS key, or None when a ClientError occurred.
    """
    kms_client = boto3.client(
        'kms',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region_name,
    )
    full_alias = 'alias/' + alias_name

    try:
        # list_aliases is paginated. Reading only the first page can miss an
        # existing alias, after which a new key would be created and
        # create_alias would then fail because the alias already exists.
        paginator = kms_client.get_paginator('list_aliases')
        for page in paginator.paginate():
            for alias in page['Aliases']:
                if alias['AliasName'] != full_alias:
                    continue
                print(f"Existing KMS key found for alias {alias_name} in {region_name}")
                # Return an ARN here too, matching the new-key branch. It is
                # derived from the alias ARN (…:alias/<name>) so no extra
                # DescribeKey call is needed.
                arn_prefix = alias['AliasArn'].rsplit(':alias/', 1)[0]
                return f"{arn_prefix}:key/{alias['TargetKeyId']}"

        # If alias does not exist, create a new KMS CMK
        print(f"Creating a new KMS CMK for alias {alias_name} in {region_name}")
        key = kms_client.create_key(Description=f'KMS CMK for CloudTrail in {region_name}')
        kms_client.create_alias(AliasName=full_alias, TargetKeyId=key['KeyMetadata']['KeyId'])

        return key['KeyMetadata']['Arn']
    except ClientError as e:
        print(f"Error occurred while creating or retrieving KMS key in {region_name}: {e}")
        return None


# alias_name and region_name are expected to be defined before this task runs,
# e.g. alias_name = 'my-cloudtrail-key-2', region_name = 'us-east-1'.
kms_key_arn = create_or_choose_kms_key(alias_name, region_name)
if kms_key_arn:
    print(f"KMS Key ARN in {region_name}: {kms_key_arn}")
    # Extracting the KMS Key ID from the ARN, e.g.
    #   arn:aws:kms:us-east-1:355237452254:key/7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7
    #   -> 7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7
    kms_key_id = kms_key_arn.split(':')[-1].split('/')[-1]
# copied6.1.1 - 6.1.2K5SnkJHW2Er7prXMCpooUpdate the AWS KMS Key Policy to Allow CloudTrail to use the
key
6.1.2
Update the AWS KMS Key Policy to Allow CloudTrail to use the key
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task updates the AWS KMS key policy to authorize AWS CloudTrail to encrypt log files using the specified KMS key. The objective is to secure CloudTrail logs with KMS encryption, ensuring enhanced security and compliance. The process involves modifying the KMS key policy to include permissions for CloudTrail operations.
# inputsoutputs
import boto3
import json
from botocore.exceptions import ClientError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def get_aws_account_id():
    """Return the caller's AWS account ID via STS, or None when the lookup fails."""
    try:
        sts_client = boto3.client(
            'sts',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name='us-east-1',
        )
        account_id = sts_client.get_caller_identity()["Account"]
        return account_id
    except ClientError as e:
        print(f"An AWS client error occurred: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None


def update_kms_policy(kms_key_id):
    """
    Updates the KMS key policy to allow CloudTrail to use the key.

    The CloudTrail statement is added only if the policy does not already
    contain a statement with the same Sid, so re-running the task does not
    pile up duplicate statements.

    :param kms_key_id: The ID or ARN of the KMS key.
    """
    account_id = get_aws_account_id()
    if not account_id:
        print("Unable to retrieve AWS account ID.")
        return

    # region_name is read from the surrounding runbook scope.
    kms_client = boto3.client(
        'kms',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region_name,
    )

    cloudtrail_sid = "Allow CloudTrail to use the key"

    try:
        # Retrieve the current key policy
        policy = kms_client.get_key_policy(KeyId=kms_key_id, PolicyName='default')['Policy']
        policy_dict = json.loads(policy)

        # A policy document may carry a single statement object instead of a
        # list; normalise so the new statement can be appended either way.
        statements = policy_dict.get('Statement', [])
        if isinstance(statements, dict):
            statements = [statements]
        policy_dict['Statement'] = statements

        if any(isinstance(stmt, dict) and stmt.get('Sid') == cloudtrail_sid for stmt in statements):
            print(f"KMS key policy for key {kms_key_id} already contains the CloudTrail statement; no changes made.")
            return

        # NOTE(review): the encryption-context condition below is also applied
        # to kms:DescribeKey; confirm against the CloudTrail key-policy
        # documentation whether DescribeKey needs its own statement.
        cloudtrail_statement = {
            "Sid": cloudtrail_sid,
            "Effect": "Allow",
            "Principal": {
                "Service": "cloudtrail.amazonaws.com"
            },
            "Action": [
                "kms:GenerateDataKey*",
                "kms:DescribeKey"
            ],
            "Resource": "*",
            "Condition": {
                "StringLike": {
                    "kms:EncryptionContext:aws:cloudtrail:arn": f"arn:aws:cloudtrail:*:{account_id}:trail/*"
                }
            }
        }
        statements.append(cloudtrail_statement)

        # Update the key policy
        kms_client.put_key_policy(
            KeyId=kms_key_id,
            PolicyName='default',
            Policy=json.dumps(policy_dict)
        )
        print(f"KMS key policy updated successfully for key: {kms_key_id}")

    except ClientError as e:
        print(f"Error updating KMS key policy: {e}")


# kms_key_id is expected from the upstream task (a KMS key ID or ARN),
# e.g. '7e38fb56-e600-4130-bf5a-b8fbc8bd2cf7'.
update_kms_policy(kms_key_id)

context.proceed = False
# copied6.1.2 -
6.1.3bCDyuaAahEmOGB4SJEVPUpdate AWS CloudTrail Trail with AWS KMS CMK
6.1.3
Update AWS CloudTrail Trail with AWS KMS CMK
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task updates an AWS CloudTrail trail to use an AWS Key Management Service (KMS) Customer Master Key (CMK) for server-side encryption. It ensures that the trail's logs are encrypted with a specified KMS key, enhancing the security and confidentiality of audit log files. This update is vital for maintaining compliance and robust data protection standards in AWS.
# inputsoutputs
import boto3
from botocore.exceptions import ClientError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

trail_name = alias_name # Received from upstream tasks


def update_trail_encryption(trail_name, kms_key_id, region_name):
    """
    Point a CloudTrail trail at a KMS key for encryption.

    A ClientError is printed rather than raised.

    :param trail_name: Name of the CloudTrail trail
    :param kms_key_id: The KMS key ARN or ID
    :param region_name: AWS region where the trail is located
    """
    try:
        client = boto3.client(
            'cloudtrail',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region_name,
        )
        client.update_trail(Name=trail_name, KmsKeyId=kms_key_id)
    except ClientError as e:
        print(f"Error updating trail in {region_name}: {e}")
    else:
        print(f"Trail '{trail_name}' in {region_name} updated to use KMS CMK: {kms_key_id}")


# kms_key_id is received from the upstream task; region_name is the region of
# the trail, e.g.:
#   trail_name = 'test-trail-1-east-1'
#   kms_key_id = '28f9f7ce-41db-42fd-bfcf-be554ed408d3'
#   region_name = 'us-east-1'
update_trail_encryption(trail_name, kms_key_id, region_name)
# copied6.1.3
- 7UcYPYVlaxMeURpokx8TZDefault Security Group Audit and Remediation in AWS VPCs: SOC2 Compliance
7
Default Security Group Audit and Remediation in AWS VPCs: SOC2 Compliance
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook conducts a thorough audit of default security groups in all AWS VPCs, ensuring they disallow any inbound or outbound traffic. It identifies and automatically remediates non-compliant groups to enforce stringent network security standards. The process enhances overall VPC security by adhering to a strict no-traffic policy in default security groups.
inputsoutputsregion_name=None #Hardcoded for single execution result, Use None when you want to run the script for all regions.copied7- 7.1NyeDrRgo1w7ndZEQJZYaList All VPCs in AWS
7.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task enumerates all Virtual Private Clouds across every AWS region in an account. This task is essential for network management, security audits, and resource tracking, especially in large-scale environments. It provides details like VPC IDs, CIDR blocks, and associated resources for each VPC.
# inputsoutputs
import boto3
from botocore.exceptions import ClientError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def list_vpcs_in_region(region_name):
    """Print and return the VPCs of one region as {'Region', 'VPC_ID'} dicts.

    Errors are printed rather than raised; whatever was collected before the
    error is still returned.
    """
    found = []
    try:
        ec2 = boto3.Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region_name,
        ).client('ec2')
        described = ec2.describe_vpcs().get('Vpcs', [])

        if not described:
            print(f"No VPCs found in region '{region_name}'.")
        else:
            print(f"In region '{region_name}', found the following VPCs:")
            for entry in described:
                vpc_id = entry['VpcId']
                found.append({'Region': region_name, 'VPC_ID': vpc_id})
                print(f" VPC ID: {vpc_id}")
    except ClientError as e:
        print(f"An error occurred in region {region_name}: {e}")
    except Exception as e:
        print(f"An unexpected error occurred in region {region_name}: {e}")
    return found


def list_vpcs_all_regions():
    """Collect the VPCs of every region returned by describe_regions."""
    collected = []
    try:
        # us-east-1 is only used as the endpoint for listing the regions.
        ec2 = boto3.Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name='us-east-1',
        ).client('ec2')
        names = [item['RegionName'] for item in ec2.describe_regions()['Regions']]
        for name in names:
            collected.extend(list_vpcs_in_region(name))
    except ClientError as e:
        print(f"An error occurred: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    return collected


# region_name comes from the runbook input; a falsy value means all regions.
vpcs = list_vpcs_in_region(region_name) if region_name else list_vpcs_all_regions()

# Summary of all VPCs across regions
if not vpcs:
    print("No VPCs found in any of the regions.")
else:
    print("\nSummary of all VPCs across regions:")
    for vpc in vpcs:
        print(f"Region: {vpc['Region']}, VPC ID: {vpc['VPC_ID']}")
# copied7.1 - 7.2ySql1lvPAYue9jDX1mTdDetecting and Marking Non-Compliant VPC Security Groups: SOC2 Compliance
7.2
Detecting and Marking Non-Compliant VPC Security Groups: SOC2 Compliance
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task focuses on scrutinizing default security groups in AWS VPCs to identify and flag those allowing unauthorized traffic. It serves as a critical measure to pinpoint security groups that deviate from the no-traffic policy, ensuring adherence to stringent network security protocols in VPC environments.
# inputsoutputs
import boto3
from botocore.exceptions import ClientError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def check_default_security_groups(region_name=None):
    """Classify the default security group of every VPC by whether it has rules.

    A group with any inbound or outbound rule is non-compliant. When
    region_name is falsy, every region from describe_regions is checked.

    Returns a tuple (non_compliant, compliant) of lists of dicts with the keys
    'Region', 'VpcId' and 'SecurityGroupId'.
    """
    non_compliant_sgs = []
    compliant_sgs = []

    if not region_name:
        print("Checking default security groups in all AWS regions...")
        # us-east-1 is only used as the endpoint for listing the regions.
        region_lister = boto3.client(
            'ec2',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name='us-east-1',
        )
        regions_to_check = [item['RegionName'] for item in region_lister.describe_regions()['Regions']]
    else:
        print(f"Checking default security groups in specified region: {region_name}")
        regions_to_check = [region_name]

    for region in regions_to_check:
        try:
            ec2_client = boto3.client(
                'ec2',
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                region_name=region,
            )
            vpcs = ec2_client.describe_vpcs()['Vpcs']
            if not vpcs:
                print(f"No VPCs found in region {region}.")
                continue

            for vpc in vpcs:
                # Select the VPC's security group named "default".
                default_filters = [
                    {'Name': 'vpc-id', 'Values': [vpc['VpcId']]},
                    {'Name': 'group-name', 'Values': ['default']},
                ]
                sg_response = ec2_client.describe_security_groups(Filters=default_filters)

                for sg in sg_response['SecurityGroups']:
                    has_rules = bool(sg['IpPermissions'] or sg['IpPermissionsEgress'])
                    record = {
                        'Region': region,
                        'VpcId': vpc['VpcId'],
                        'SecurityGroupId': sg['GroupId']
                    }
                    bucket = non_compliant_sgs if has_rules else compliant_sgs
                    bucket.append(record)
        except ClientError as e:
            print(f"An AWS client error occurred in region {region}: {e}")

    return non_compliant_sgs, compliant_sgs


# region_name comes from the runbook input: None for all regions, or a region
# such as 'us-west-2'.
non_compliant_security_groups, compliant_security_groups = check_default_security_groups(region_name)

if non_compliant_security_groups:
    print("\nNon-compliant default security groups found:")
    for sg_info in non_compliant_security_groups:
        print(f"Region: {sg_info['Region']}, VPC ID: {sg_info['VpcId']}, Security Group ID: {sg_info['SecurityGroupId']} is NON_COMPLIANT")

if compliant_security_groups:
    print("\nCompliant default security groups found:")
    for sg_info in compliant_security_groups:
        print(f"Region: {sg_info['Region']}, VPC ID: {sg_info['VpcId']}, Security Group ID: {sg_info['SecurityGroupId']} is COMPLIANT")

if not (non_compliant_security_groups or compliant_security_groups):
    print("\nNo VPCs with default security groups were found.")

context.skip_sub_tasks=True
# copied7.2- 7.2.1Cn09UVVGIqVK8dHcQafjImplementing No-Traffic Policy in VPC Default Security Groups
7.2.1
Implementing No-Traffic Policy in VPC Default Security Groups
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves configuring the default security groups within AWS VPCs to strictly enforce a no-traffic policy. It entails systematically updating the security group rules to block all inbound and outbound traffic, ensuring compliance with stringent network security protocols.
# inputsoutputs
import boto3
from botocore.exceptions import ClientError, BotoCoreError, NoCredentialsError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def remediate_default_sg_of_vpc(region_name, vpc_id):
    """
    Revoke every inbound and outbound rule of a VPC's default security group.

    After revoking, the group is read back and the result is reported.
    All errors are printed rather than raised.

    Parameters:
    region_name (str): AWS region of the VPC.
    vpc_id (str): ID of the VPC whose default security group needs to be remediated.

    Returns:
    None
    """
    if not (region_name and vpc_id):
        print("Error: 'region_name' and 'vpc_id' must be provided.")
        return

    try:
        ec2_client = boto3.client(
            'ec2',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region_name,
        )
        lookup = ec2_client.describe_security_groups(
            Filters=[
                {'Name': 'vpc-id', 'Values': [vpc_id]},
                {'Name': 'group-name', 'Values': ['default']},
            ]
        )
        groups = lookup['SecurityGroups']
        if not groups:
            print(f"No default security group found for VPC {vpc_id}.")
            return

        sg_id = groups[0]['GroupId']

        try:
            # Read the group's current rules by id.
            before = ec2_client.describe_security_groups(GroupIds=[sg_id])['SecurityGroups'][0]
            ingress = before.get('IpPermissions', [])
            egress = before.get('IpPermissionsEgress', [])

            # Remove inbound rules
            if ingress:
                ec2_client.revoke_security_group_ingress(GroupId=sg_id, IpPermissions=ingress)
                print(f"Removed all inbound rules from default security group {sg_id} in VPC {vpc_id}.")

            # Remove outbound rules
            if egress:
                ec2_client.revoke_security_group_egress(GroupId=sg_id, IpPermissions=egress)
                print(f"Removed all outbound rules from default security group {sg_id} in VPC {vpc_id}.")

            # Verification step: read the group back and check both rule lists.
            after = ec2_client.describe_security_groups(GroupIds=[sg_id])['SecurityGroups'][0]
            if after.get('IpPermissions') or after.get('IpPermissionsEgress'):
                print(f"Rules may not have been completely removed from security group {sg_id}.")
            else:
                print(f"Successfully removed all rules from security group {sg_id}.")
        except ClientError as e:
            print(f"Error modifying security group {sg_id}: {e}")
    except NoCredentialsError:
        print("Error: No AWS credentials found. Please configure your credentials.")
    except BotoCoreError as e:
        print(f"BotoCore Error: {e}")
    except ClientError as e:
        print(f"Client Error: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")


# region_name and vpc_id are expected to be defined before this task runs,
# e.g. region_name = 'us-west-2', vpc_id = 'vpc-0e42a95f21ed25d5c'.
remediate_default_sg_of_vpc(region_name, vpc_id)
# copied7.2.1
- 8sy8XjuwmeOJHaOTB5cjqMulti-Region AWS CloudTrail Compliance Verification: SOC2 Compliance
8
Multi-Region AWS CloudTrail Compliance Verification: SOC2 Compliance
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook is focused on ensuring that AWS CloudTrail configurations across multiple regions comply with SOC2 standards. It involves comprehensive checks on CloudTrail trail configurations, including logging status, S3 bucket integrations, and CloudWatch Logs, ensuring global event capture and multi-region setup. It's essential for maintaining SOC2 compliance, emphasizing data security and integrity in cloud environments, and helps organizations manage their compliance posture efficiently.
inputsoutputs8- 8.1jtE72IY5WRDEnevpEUHqList all AWS CloudTrail Trails
8.1
List all AWS CloudTrail Trails
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves enumerating and retrieving detailed information about every AWS CloudTrail trail that exists across all AWS regions within an AWS account. Each trail captures specific API activity and events, and having a comprehensive list helps in providing visibility into what actions are being logged, where the logs are stored, and how they are configured. This listing process is foundational for subsequent tasks like auditing, analysis, and optimization of AWS CloudTrail, aiding in efficient resource management and security compliance.
import boto3

# Task script: collect every CloudTrail trail visible to the configured
# credentials, either in one region or across all regions, and print a
# one-line description of each followed by a summary.
#
# `_get_creds`, `cred_label` and `target_region` are not defined here; they
# are expected to be provided by the environment that runs this task.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

# Any falsy value (None, empty string) means "scan every region".
target_region = target_region or None

# Enumerate regions through EC2. A failure here is reported and leaves the
# list empty, so an all-region scan will then have nothing to iterate.
try:
    ec2_client = boto3.client(
        'ec2',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='us-east-1',
    )
    all_regions = [entry['RegionName'] for entry in ec2_client.describe_regions()['Regions']]
except Exception as e:
    print(f"ERROR: An error occurred while listing AWS regions: {e}")
    all_regions = []

regions_to_check = [target_region] if target_region is not None else all_regions

all_trails = []
for region in regions_to_check:
    try:
        cloudtrail_client = boto3.client(
            'cloudtrail',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region,
        )
        # Shadow trails are excluded, so each trail is listed only by the
        # region that owns it.
        response = cloudtrail_client.describe_trails(includeShadowTrails=False)
        trails_in_region = response['trailList']
        if trails_in_region:
            all_trails.extend(trails_in_region)
        else:
            print(f"INFO: No trails found in region {region}.")
    except Exception as e:
        # One failing region must not abort the scan of the remaining ones.
        print(f"ERROR: An error occurred while listing trails in region {region}: {e}")

if all_trails:
    try:
        for trail in all_trails:
            print(f"Trail Name: {trail['Name']}, Trail ARN: {trail['TrailARN']}, Home Region: {trail['HomeRegion']}")
    except KeyError as ke:
        # `trail` is still bound to the entry that was missing the key.
        print(f"ERROR: Missing key {ke} in trail information: {trail}")
    except Exception as e:
        print(f"ERROR: An error occurred while printing trail information: {e}")
else:
    print("INFO: No trails found in all specified regions.")

print(f"SUMMARY: Processed {len(regions_to_check)} regions and found a total of {len(all_trails)} trails.")
8.2Weiel7NV4hBihuinlCznCheck whether a Multi-Region AWS CloudTrail exists with the required configurations: SOC2 Guideline
8.2
Check whether a Multi-Region AWS CloudTrail exists with the required configurations: SOC2 Guideline
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again. This task verifies the existence and configuration of a Multi-Region AWS CloudTrail in compliance with SOC2 guidelines. It focuses on ensuring essential settings like logging, S3 and CloudWatch integrations, and global event coverage. This is crucial for upholding data security and integrity standards across an organization's AWS infrastructure.
# Multi-Region CloudTrail compliance verification (SOC2 guideline).
#
# Task script: search every region for a multi-region CloudTrail trail whose
# settings match the supplied inputs and report the outcome.
#
# `_get_creds`, `cred_label`, `context` and the five input values passed to
# check_cloudtrail_compliance() at the bottom are not defined here; they are
# expected to be provided by the environment that runs this task.
import boto3
from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def list_available_regions(service_name):
    """Return the region names reported by EC2 ``describe_regions``.

    NOTE(review): ``service_name`` is accepted but never used; the list is
    always obtained from EC2 regardless of the service requested.
    """
    ec2 = boto3.client(
        'ec2',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name='us-east-1',
    )
    return [entry['RegionName'] for entry in ec2.describe_regions()['Regions']]


def check_trails_in_region(region_name, s3_bucket_name, sns_topic_arn, cloudwatch_log_group_arn, include_management_events, read_write_type):
    """Look for a compliant multi-region trail among the trails seen in one region.

    Trails that are not multi-region are ignored. The first multi-region trail
    that is logging and matches the expected settings ends the search.

    Returns a 3-tuple ``(found, details, non_compliant_names)``:
      * ``found`` - True when a matching trail was seen, otherwise False.
      * ``details`` - dict describing the matching trail, or None.
      * ``non_compliant_names`` - names of the multi-region trails examined
        that did not match.

    AWS and unexpected errors are printed and reported as "not found" rather
    than raised.

    NOTE(review): ``read_write_type`` is accepted but never checked, and
    ``include_management_events`` is compared against the trail's
    ``IncludeGlobalServiceEvents`` setting.
    """
    rejected = []
    try:
        ct = boto3.client(
            'cloudtrail',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region_name,
        )
        listing = ct.describe_trails(includeShadowTrails=True)
        for trail in listing['trailList']:
            if not trail['IsMultiRegionTrail']:
                continue

            arn = trail['TrailARN']
            try:
                trail_config = ct.get_trail(Name=arn).get('Trail', {})
                is_logging = ct.get_trail_status(Name=arn).get('IsLogging', False)
            except ClientError as e:
                # A trail that cannot be inspected is skipped; it is neither
                # counted as compliant nor recorded as non-compliant.
                print(f"Error in {region_name}: {e}")
                continue

            checks = (
                is_logging,
                trail_config.get('S3BucketName') == s3_bucket_name,
                trail_config.get('SnsTopicARN') == sns_topic_arn,
                # A trail without a CloudWatch log group passes this check;
                # the ARN is only compared when the trail has one.
                trail_config.get('CloudWatchLogsLogGroupArn', cloudwatch_log_group_arn) == cloudwatch_log_group_arn,
                trail_config.get('IncludeGlobalServiceEvents') == include_management_events,
                trail_config.get('IsMultiRegionTrail', False) is True,
            )
            if not all(checks):
                rejected.append(trail['Name'])
                continue

            matched = {
                'Region': region_name,
                'Name': trail_config.get('Name'),
                'S3BucketName': trail_config.get('S3BucketName'),
                'SnsTopicARN': trail_config.get('SnsTopicARN'),
                'CloudWatchLogsLogGroupArn': trail_config.get('CloudWatchLogsLogGroupArn'),
                'IncludeManagementEvents': trail_config.get('IncludeGlobalServiceEvents'),
                'IsMultiRegionTrail': trail_config.get('IsMultiRegionTrail'),
            }
            return True, matched, rejected

        return False, None, rejected
    except ClientError as e:
        print(f"AWS client error in region {region_name}: {e}")
        return False, None, rejected
    except Exception as e:
        print(f"An unexpected error occurred in region {region_name}: {e}")
        return False, None, rejected


def check_cloudtrail_compliance(s3_bucket_name, sns_topic_arn, cloudwatch_log_group_arn, include_management_events, read_write_type):
    """Scan regions until a compliant multi-region trail is found.

    Prints the matching trail's details when one is found; otherwise prints,
    for every region scanned, the multi-region trails that did not match.

    Returns True when a compliant trail exists, False otherwise. Credential
    problems and unexpected errors are printed and also yield False.
    """
    try:
        rejected_by_region = {}
        for region in list_available_regions('cloudtrail'):
            found, details, rejected = check_trails_in_region(
                region,
                s3_bucket_name,
                sns_topic_arn,
                cloudwatch_log_group_arn,
                include_management_events,
                read_write_type,
            )
            rejected_by_region[region] = rejected
            if found:
                # One compliant trail is enough; stop scanning further regions.
                print("Compliant Trail Found:")
                for key, value in details.items():
                    print(f" {key}: {value}")
                return True

        print("Summary of Non-Compliant Trails by Region:")
        for region, trails in rejected_by_region.items():
            if trails:
                print(f" Region: {region}, Non-Compliant Trails: {', '.join(trails)}")
            else:
                print(f" Region: {region} has no non-compliant multi-region trails.")
        return False
    except NoCredentialsError:
        print("No AWS credentials found. Please configure your credentials.")
        return False
    except PartialCredentialsError:
        print("Incomplete AWS credentials. Please check your configuration.")
        return False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return False


# Expected task inputs: s3_bucket_name, sns_topic_arn,
# cloudwatch_log_group_arn, include_management_events and read_write_type
# (type of events to record; valid values are ReadOnly, WriteOnly and ALL).
compliant = check_cloudtrail_compliance(
    s3_bucket_name,
    sns_topic_arn,
    cloudwatch_log_group_arn,
    include_management_events,
    read_write_type,
)

if compliant:
    print("\nAt least one compliant multi-region CloudTrail exists.")
else:
    print("\nNo compliant multi-region CloudTrails found matching the specified criteria.")
    # NOTE(review): the original indentation was lost; this is assumed to
    # belong to the non-compliant branch - confirm against the runbook.
    context.proceed = False