Enable VPC Flow Logs in AWS

This runbook involves turning on a feature for capturing information about IP traffic going to and from networks interfaces in a Virtual Private Cloud (VPC). This data is vital for network monitoring, security analysis, and troubleshooting. The logs can be stored in Amazon CloudWatch Logs or Amazon S3 for detailed analysis and archival purposes, aiding in compliance and operational auditing.

region_name=None #Hardcoded for single execution result, Use None when you want to run the script for all regions.
copied
  1. 1

    This task enumerates all Virtual Private Clouds across every AWS region in an account. This task is essential for network management, security audits, and resource tracking, especially in large-scale environments. It provides details like VPC IDs, CIDR blocks, and associated resources for each VPC.

    import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_vpcs_in_region(region_name): vpcs_info = [] try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) ec2 = session.client('ec2') response = ec2.describe_vpcs() vpcs = response.get('Vpcs', []) if vpcs: print(f"In region '{region_name}', found the following VPCs:") for vpc in vpcs: vpc_id = vpc['VpcId'] vpcs_info.append({'Region': region_name, 'VPC_ID': vpc_id}) print(f" VPC ID: {vpc_id}") else: print(f"No VPCs found in region '{region_name}'.") except ClientError as e: print(f"An error occurred in region {region_name}: {e}") except Exception as e: print(f"An unexpected error occurred in region {region_name}: {e}") return vpcs_info def list_vpcs_all_regions(): all_vpcs_info = [] try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') ec2 = session.client('ec2') regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']] for region in regions: vpcs_info = list_vpcs_in_region(region) all_vpcs_info.extend(vpcs_info) except ClientError as e: print(f"An error occurred: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") return all_vpcs_info #region_name = None # Replace with a region name or leave as None for all regions if region_name: vpcs = list_vpcs_in_region(region_name) else: vpcs = list_vpcs_all_regions() #print(vpcs) # Summary of all VPCs across regions if vpcs: print("\nSummary of all VPCs across regions:") for vpc in vpcs: print(f"Region: {vpc['Region']}, VPC ID: {vpc['VPC_ID']}") else: print("No VPCs found in any of the regions.")
    copied
    1
  2. 2

    This task identifies Virtual Private Clouds (VPCs) in an AWS environment that lack active Flow Logs. This task is essential for security and compliance, ensuring that network traffic is monitored and logged. It involves checking each VPC's Flow Logs status and isolating those without the feature, helping to prioritize security enhancements and network monitoring strategies.

    import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_vpc_flow_logs(vpc_id, region): try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) ec2 = session.client('ec2') # Check for flow logs response = ec2.describe_flow_logs(Filters=[{'Name': 'resource-id', 'Values': [vpc_id]}]) if response['FlowLogs']: print(f"Flow Logs are enabled for VPC {vpc_id} in region {region}.") return True else: print(f"Flow Logs are not enabled for VPC {vpc_id} in region {region}.") return False except ClientError as e: print(f"An error occurred in region {region} for VPC {vpc_id}: {e}") return False # Example VPCs list #vpcs = [{'Region': 'ap-south-1', 'VPC_ID': 'vpc-0c433ca0ab76e67ae'}, {'Region': 'eu-north-1', 'VPC_ID': 'vpc-0db5fbfe0a4263ef5'}] # Checking flow logs for each VPC and collecting VPCs without flow logs vpcs_without_flow_logs = [] for vpc in vpcs: if not check_vpc_flow_logs(vpc['VPC_ID'], vpc['Region']): vpcs_without_flow_logs.append(vpc) #print(vpcs_without_flow_logs) #for debugging context.skip_sub_tasks=True # Example vpcs_without_flow_logs passed to the downstream task #vpcs_without_flow_logs = [{'Region': 'ap-south-1', 'VPC_ID': 'vpc-0c433ca0ab76e67ae'}, {'Region': 'eu-north-1', 'VPC_ID': 'vpc-0db5fbfe0a4263ef5'}]
    copied
    2
    1. 2.1

      This task involves setting up a unique data storage bucket in Amazon S3 for storing, managing, and retrieving data, with options for access control, versioning, and lifecycle management. S3 buckets provide a scalable and secure cloud storage solution.

      import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # AWS S3 client initialization s3_client = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Bucket name to create #bucket_name = 'my-logging-bucket-name' # Replace with your desired bucket name # Create S3 bucket try: s3_client.create_bucket(Bucket=bucket_name) print(f"Bucket {bucket_name} created successfully.") except ClientError as e: print(f"Error creating S3 bucket {bucket_name}: {e}")
      copied
      2.1
    2. 2.2

      import boto3 import json from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def create_bucket_policy(bucket_name, account_number, regions): """ Create a bucket policy for the specified bucket, account number, and regions. """ policy_statements = [ { "Sid": "AWSLogDeliveryAclCheck", "Effect": "Allow", "Principal": {"Service": "delivery.logs.amazonaws.com"}, "Action": "s3:GetBucketAcl", "Resource": f"arn:aws:s3:::{bucket_name}", "Condition": {"StringEquals": {"aws:SourceAccount": account_number}} } ] resource = f"arn:aws:s3:::{bucket_name}/AWSLogs/{account_number}/*" for region in regions: source_arn = f"arn:aws:logs:{region}:{account_number}:*" policy_statements.append( { "Sid": f"AWSLogDeliveryWrite_{region}", "Effect": "Allow", "Principal": {"Service": "delivery.logs.amazonaws.com"}, "Action": "s3:PutObject", "Resource": resource, "Condition": { "StringEquals": { "aws:SourceAccount": account_number, "s3:x-amz-acl": "bucket-owner-full-control" }, "ArnLike": {"aws:SourceArn": source_arn} } } ) policy = { "Version": "2012-10-17", "Id": "AWSLogDeliveryWrite20150319", "Statement": policy_statements } return policy def update_s3_bucket_policy(s3_client, bucket_name, policy): """ Update the S3 bucket policy. """ try: s3_client.put_bucket_policy( Bucket=bucket_name, Policy=json.dumps(policy) ) print(f"Bucket policy updated for {bucket_name}.") except ClientError as e: print(f"Error updating bucket policy: {e}") account_number = boto3.client('sts',aws_access_key_id=access_key,aws_secret_access_key=secret_key).get_caller_identity()['Account'] #bucket_name = 'your-bucket-name' # Replace with your S3 bucket name #regions_for_bucket_policy = ['us-east-1', 'ap-south-1'] # List of regions # This part will be used if the user has the same logging bucket for multiple regions for VPC Flow Logs # Create S3 client s3_client = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) # Create and update the bucket policy policy = create_bucket_policy(bucket_name, account_number, regions_for_bucket_policy) update_s3_bucket_policy(s3_client, bucket_name, policy) s3_bucket_arn = f"arn:aws:s3:::{bucket_name}" #passed to downstream task
      copied
      2.2
    3. 2.3

      This task activates a logging feature for Virtual Private Clouds (VPCs) in AWS. This feature records and stores information about the network traffic flowing through the VPC, aiding in security monitoring, traffic analysis, and troubleshooting. The collected data can be sent to Amazon CloudWatch Logs or Amazon S3 for retention and analysis.

      import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def enable_vpc_flow_logs(vpc_id, region, s3_bucket_arn): """ Enable VPC Flow Logs for the specified VPC, directing them to an S3 bucket. """ try: session = boto3.Session(aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) ec2 = session.client('ec2') # Create the flow log response = ec2.create_flow_logs( ResourceIds=[vpc_id], ResourceType='VPC', TrafficType='ALL', LogDestinationType='s3', LogDestination=s3_bucket_arn ) print(response) if response['Unsuccessful']: print(f"Failed to enable Flow Logs for VPC {vpc_id} in region {region}.") else: print(f"Successfully enabled Flow Logs for VPC {vpc_id} in region {region}.") except ClientError as e: print(f"An error occurred in region {region} for VPC {vpc_id}: {e}") # List of VPCs without flow logs #vpcs_without_flow_logs = [{'Region': 'ap-south-1', 'VPC_ID': 'vpc-0c433ca0ab76e67ae'}] # S3 bucket ARN for storing flow logs #s3_bucket_arn = 'arn:aws:s3:::your-bucket-name' # Replace with your S3 bucket ARN # Enabling flow logs for each VPC for vpc in vpcs_without_flow_logs: enable_vpc_flow_logs(vpc['VPC_ID'], vpc['Region'], s3_bucket_arn)
      copied
      2.3