Enable VPC Flow Logs in AWS
This runbook involves turning on a feature for capturing information about IP traffic going to and from networks interfaces in a Virtual Private Cloud (VPC). This data is vital for network monitoring, security analysis, and troubleshooting. The logs can be stored in Amazon CloudWatch Logs or Amazon S3 for detailed analysis and archival purposes, aiding in compliance and operational auditing.
- 1NyeDrRgo1w7ndZEQJZYaList All VPCs in AWS
1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task enumerates all Virtual Private Clouds across every AWS region in an account. This task is essential for network management, security audits, and resource tracking, especially in large-scale environments. It provides details like VPC IDs, CIDR blocks, and associated resources for each VPC.
inputsoutputsimport boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_vpcs_in_region(region_name): vpcs_info = [] try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) ec2 = session.client('ec2') response = ec2.describe_vpcs() vpcs = response.get('Vpcs', []) if vpcs: print(f"In region '{region_name}', found the following VPCs:") for vpc in vpcs: vpc_id = vpc['VpcId'] vpcs_info.append({'Region': region_name, 'VPC_ID': vpc_id}) print(f" VPC ID: {vpc_id}") else: print(f"No VPCs found in region '{region_name}'.") except ClientError as e: print(f"An error occurred in region {region_name}: {e}") except Exception as e: print(f"An unexpected error occurred in region {region_name}: {e}") return vpcs_info def list_vpcs_all_regions(): all_vpcs_info = [] try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') ec2 = session.client('ec2') regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']] for region in regions: vpcs_info = list_vpcs_in_region(region) all_vpcs_info.extend(vpcs_info) except ClientError as e: print(f"An error occurred: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") return all_vpcs_info #region_name = None # Replace with a region name or leave as None for all regions if region_name: vpcs = list_vpcs_in_region(region_name) else: vpcs = list_vpcs_all_regions() #print(vpcs) # Summary of all VPCs across regions if vpcs: print("\nSummary of all VPCs across regions:") for vpc in vpcs: print(f"Region: {vpc['Region']}, VPC ID: {vpc['VPC_ID']}") else: print("No VPCs found in any of the regions.")copied1 - 2C7h7EgxU8gkBcLlRkY9oFilter Out VPCs with Flow Logs not enabled in AWS
2
Filter Out VPCs with Flow Logs not enabled in AWS
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task identifies Virtual Private Clouds (VPCs) in an AWS environment that lack active Flow Logs. This task is essential for security and compliance, ensuring that network traffic is monitored and logged. It involves checking each VPC's Flow Logs status and isolating those without the feature, helping to prioritize security enhancements and network monitoring strategies.
inputsoutputsimport boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_vpc_flow_logs(vpc_id, region): try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) ec2 = session.client('ec2') # Check for flow logs response = ec2.describe_flow_logs(Filters=[{'Name': 'resource-id', 'Values': [vpc_id]}]) if response['FlowLogs']: print(f"Flow Logs are enabled for VPC {vpc_id} in region {region}.") return True else: print(f"Flow Logs are not enabled for VPC {vpc_id} in region {region}.") return False except ClientError as e: print(f"An error occurred in region {region} for VPC {vpc_id}: {e}") return False # Example VPCs list #vpcs = [{'Region': 'ap-south-1', 'VPC_ID': 'vpc-0c433ca0ab76e67ae'}, {'Region': 'eu-north-1', 'VPC_ID': 'vpc-0db5fbfe0a4263ef5'}] # Checking flow logs for each VPC and collecting VPCs without flow logs vpcs_without_flow_logs = [] for vpc in vpcs: if not check_vpc_flow_logs(vpc['VPC_ID'], vpc['Region']): vpcs_without_flow_logs.append(vpc) #print(vpcs_without_flow_logs) #for debugging context.skip_sub_tasks=True # Example vpcs_without_flow_logs passed to the downstream task #vpcs_without_flow_logs = [{'Region': 'ap-south-1', 'VPC_ID': 'vpc-0c433ca0ab76e67ae'}, {'Region': 'eu-north-1', 'VPC_ID': 'vpc-0db5fbfe0a4263ef5'}]copied2- 2.1I4Jg58AgFTnrLoNniBs9Create an AWS S3 bucket
2.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves setting up a unique data storage bucket in Amazon S3 for storing, managing, and retrieving data, with options for access control, versioning, and lifecycle management. S3 buckets provide a scalable and secure cloud storage solution.
inputsoutputsimport boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # AWS S3 client initialization s3_client = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Bucket name to create #bucket_name = 'my-logging-bucket-name' # Replace with your desired bucket name # Create S3 bucket try: s3_client.create_bucket(Bucket=bucket_name) print(f"Bucket {bucket_name} created successfully.") except ClientError as e: print(f"Error creating S3 bucket {bucket_name}: {e}")copied2.1 - 2.2jLl4PEsrWvzRveVfhAQuUpdate AWS S3 bucket policy for VPC Flow Logs
2.2
Update AWS S3 bucket policy for VPC Flow Logs
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.inputsoutputsimport boto3 import json from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def create_bucket_policy(bucket_name, account_number, regions): """ Create a bucket policy for the specified bucket, account number, and regions. """ policy_statements = [ { "Sid": "AWSLogDeliveryAclCheck", "Effect": "Allow", "Principal": {"Service": "delivery.logs.amazonaws.com"}, "Action": "s3:GetBucketAcl", "Resource": f"arn:aws:s3:::{bucket_name}", "Condition": {"StringEquals": {"aws:SourceAccount": account_number}} } ] resource = f"arn:aws:s3:::{bucket_name}/AWSLogs/{account_number}/*" for region in regions: source_arn = f"arn:aws:logs:{region}:{account_number}:*" policy_statements.append( { "Sid": f"AWSLogDeliveryWrite_{region}", "Effect": "Allow", "Principal": {"Service": "delivery.logs.amazonaws.com"}, "Action": "s3:PutObject", "Resource": resource, "Condition": { "StringEquals": { "aws:SourceAccount": account_number, "s3:x-amz-acl": "bucket-owner-full-control" }, "ArnLike": {"aws:SourceArn": source_arn} } } ) policy = { "Version": "2012-10-17", "Id": "AWSLogDeliveryWrite20150319", "Statement": policy_statements } return policy def update_s3_bucket_policy(s3_client, bucket_name, policy): """ Update the S3 bucket policy. """ try: s3_client.put_bucket_policy( Bucket=bucket_name, Policy=json.dumps(policy) ) print(f"Bucket policy updated for {bucket_name}.") except ClientError as e: print(f"Error updating bucket policy: {e}") account_number = boto3.client('sts',aws_access_key_id=access_key,aws_secret_access_key=secret_key).get_caller_identity()['Account'] #bucket_name = 'your-bucket-name' # Replace with your S3 bucket name #regions_for_bucket_policy = ['us-east-1', 'ap-south-1'] # List of regions # This part will be used if the user has the same logging bucket for multiple regions for VPC Flow Logs # Create S3 client s3_client = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Create and update the bucket policy policy = create_bucket_policy(bucket_name, account_number, regions_for_bucket_policy) update_s3_bucket_policy(s3_client, bucket_name, policy) s3_bucket_arn = f"arn:aws:s3:::{bucket_name}" #passed to downstream taskcopied2.2 - 2.3dTFEJj9CIWX6Q1fHHXTdConfigure VPC Flow Logs in AWS to an S3 bucket
2.3
Configure VPC Flow Logs in AWS to an S3 bucket
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task activates a logging feature for Virtual Private Clouds (VPCs) in AWS. This feature records and stores information about the network traffic flowing through the VPC, aiding in security monitoring, traffic analysis, and troubleshooting. The collected data can be sent to Amazon CloudWatch Logs or Amazon S3 for retention and analysis.
inputsoutputsimport boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def enable_vpc_flow_logs(vpc_id, region, s3_bucket_arn): """ Enable VPC Flow Logs for the specified VPC, directing them to an S3 bucket. """ try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) ec2 = session.client('ec2') # Create the flow log response = ec2.create_flow_logs( ResourceIds=[vpc_id], ResourceType='VPC', TrafficType='ALL', LogDestinationType='s3', LogDestination=s3_bucket_arn ) print(response) if response['Unsuccessful']: print(f"Failed to enable Flow Logs for VPC {vpc_id} in region {region}.") else: print(f"Successfully enabled Flow Logs for VPC {vpc_id} in region {region}.") except ClientError as e: print(f"An error occurred in region {region} for VPC {vpc_id}: {e}") # List of VPCs without flow logs #vpcs_without_flow_logs = [{'Region': 'ap-south-1', 'VPC_ID': 'vpc-0c433ca0ab76e67ae'}] # S3 bucket ARN for storing flow logs #s3_bucket_arn = 'arn:aws:s3:::your-bucket-name' # Replace with your S3 bucket ARN # Enabling flow logs for each VPC for vpc in vpcs_without_flow_logs: enable_vpc_flow_logs(vpc['VPC_ID'], vpc['Region'], s3_bucket_arn)copied2.3