Filter Out VPCs with Flow Logs not enabled in AWS
This task identifies Virtual Private Clouds (VPCs) in an AWS environment that lack active Flow Logs. It matters for security and compliance because Flow Logs are how network traffic in a VPC is monitored and recorded. The task checks each VPC's Flow Logs status and isolates the VPCs without the feature, which helps prioritize security enhancements and network monitoring work.
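The check described above can be sketched as follows. This is a minimal illustration, not the task's own implementation: the function names `vpcs_missing_flow_logs` and `find_vpcs_without_flow_logs` are hypothetical, and the output shape (`Region` / `VPC_ID` dictionaries) is assumed from the sample input shown in the final step. Only flow logs attached directly to a VPC and reported as `ACTIVE` count as coverage here.

```python
def vpcs_missing_flow_logs(vpc_ids, flow_logs):
    """Return the VPC IDs that have no ACTIVE flow log attached.

    flow_logs is a list of dicts shaped like the entries returned by
    EC2 describe_flow_logs (keys used: ResourceId, FlowLogStatus).
    """
    covered = {
        fl["ResourceId"]
        for fl in flow_logs
        if fl.get("FlowLogStatus") == "ACTIVE"
    }
    return [vpc_id for vpc_id in vpc_ids if vpc_id not in covered]


def find_vpcs_without_flow_logs(regions, access_key=None, secret_key=None):
    """Hypothetical wrapper: query each region and collect uncovered VPCs."""
    import boto3  # imported lazily so the pure helper above runs without boto3

    results = []
    for region in regions:
        ec2 = boto3.client(
            "ec2",
            region_name=region,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )
        # Paginate both calls so large accounts are fully covered.
        vpc_ids = [
            vpc["VpcId"]
            for page in ec2.get_paginator("describe_vpcs").paginate()
            for vpc in page["Vpcs"]
        ]
        flow_logs = [
            fl
            for page in ec2.get_paginator("describe_flow_logs").paginate()
            for fl in page["FlowLogs"]
        ]
        for vpc_id in vpcs_missing_flow_logs(vpc_ids, flow_logs):
            results.append({"Region": region, "VPC_ID": vpc_id})
    return results
```

The returned list can then be assigned to `vpcs_without_flow_logs` and passed to the step that enables Flow Logs.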
1
Create an AWS S3 bucket
This task involves setting up a unique data storage bucket in Amazon S3 for storing, managing, and retrieving data, with options for access control, versioning, and lifecycle management. S3 buckets provide a scalable and secure cloud storage solution.
    import boto3
    from botocore.exceptions import ClientError

    creds = _get_creds(cred_label)['creds']
    access_key = creds['username']
    secret_key = creds['password']

    # AWS S3 client initialization
    s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    # Bucket name to create
    #bucket_name = 'my-logging-bucket-name' # Replace with your desired bucket name

    # Create S3 bucket
    try:
        s3_client.create_bucket(Bucket=bucket_name)
        print(f"Bucket {bucket_name} created successfully.")
    except ClientError as e:
        print(f"Error creating S3 bucket {bucket_name}: {e}")
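The bucket-creation step above calls `create_bucket` with only a bucket name, which places the bucket in `us-east-1`. For any other region, S3 expects a `CreateBucketConfiguration` with a `LocationConstraint`. A minimal sketch of building the arguments, assuming a hypothetical helper name `create_bucket_kwargs` and a `region` value that the original task does not define:

```python
def create_bucket_kwargs(bucket_name, region="us-east-1"):
    """Build keyword arguments for S3 create_bucket for a given region.

    us-east-1 must be created without a LocationConstraint; every other
    region needs one that matches the client's region.
    """
    kwargs = {"Bucket": bucket_name}
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


# Usage sketch (requires boto3 and valid credentials, so it is left commented out):
# s3_client = boto3.client("s3", region_name="ap-south-1")
# s3_client.create_bucket(**create_bucket_kwargs("my-logging-bucket-name", "ap-south-1"))
```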
2
Update AWS S3 bucket policy for VPC Flow Logs
    import boto3
    import json
    from botocore.exceptions import ClientError

    creds = _get_creds(cred_label)['creds']
    access_key = creds['username']
    secret_key = creds['password']

    def create_bucket_policy(bucket_name, account_number, regions):
        """
        Create a bucket policy for the specified bucket, account number, and regions.
        """
        policy_statements = [
            {
                "Sid": "AWSLogDeliveryAclCheck",
                "Effect": "Allow",
                "Principal": {"Service": "delivery.logs.amazonaws.com"},
                "Action": "s3:GetBucketAcl",
                "Resource": f"arn:aws:s3:::{bucket_name}",
                "Condition": {"StringEquals": {"aws:SourceAccount": account_number}}
            }
        ]

        resource = f"arn:aws:s3:::{bucket_name}/AWSLogs/{account_number}/*"
        for region in regions:
            source_arn = f"arn:aws:logs:{region}:{account_number}:*"
            policy_statements.append(
                {
                    "Sid": f"AWSLogDeliveryWrite_{region}",
                    "Effect": "Allow",
                    "Principal": {"Service": "delivery.logs.amazonaws.com"},
                    "Action": "s3:PutObject",
                    "Resource": resource,
                    "Condition": {
                        "StringEquals": {
                            "aws:SourceAccount": account_number,
                            "s3:x-amz-acl": "bucket-owner-full-control"
                        },
                        "ArnLike": {"aws:SourceArn": source_arn}
                    }
                }
            )

        policy = {
            "Version": "2012-10-17",
            "Id": "AWSLogDeliveryWrite20150319",
            "Statement": policy_statements
        }
        return policy

    def update_s3_bucket_policy(s3_client, bucket_name, policy):
        """
        Update the S3 bucket policy.
        """
        try:
            s3_client.put_bucket_policy(
                Bucket=bucket_name,
                Policy=json.dumps(policy)
            )
            print(f"Bucket policy updated for {bucket_name}.")
        except ClientError as e:
            print(f"Error updating bucket policy: {e}")

    account_number = boto3.client('sts', aws_access_key_id=access_key, aws_secret_access_key=secret_key).get_caller_identity()['Account']

    #bucket_name = 'your-bucket-name' # Replace with your S3 bucket name
    #regions_for_bucket_policy = ['us-east-1', 'ap-south-1'] # List of regions
    # This part will be used if the user has the same logging bucket for multiple regions for VPC Flow Logs

    # Create S3 client
    s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    # Create and update the bucket policy
    policy = create_bucket_policy(bucket_name, account_number, regions_for_bucket_policy)
    update_s3_bucket_policy(s3_client, bucket_name, policy)

    s3_bucket_arn = f"arn:aws:s3:::{bucket_name}" # passed to downstream task
3
Configure VPC Flow Logs in AWS to an S3 bucket
This task activates a logging feature for Virtual Private Clouds (VPCs) in AWS. This feature records and stores information about the network traffic flowing through the VPC, aiding in security monitoring, traffic analysis, and troubleshooting. The collected data can be sent to Amazon CloudWatch Logs or Amazon S3 for retention and analysis.
    import boto3
    from botocore.exceptions import ClientError

    creds = _get_creds(cred_label)['creds']
    access_key = creds['username']
    secret_key = creds['password']

    def enable_vpc_flow_logs(vpc_id, region, s3_bucket_arn):
        """
        Enable VPC Flow Logs for the specified VPC, directing them to an S3 bucket.
        """
        try:
            session = boto3.Session(aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region)
            ec2 = session.client('ec2')

            # Create the flow log
            response = ec2.create_flow_logs(
                ResourceIds=[vpc_id],
                ResourceType='VPC',
                TrafficType='ALL',
                LogDestinationType='s3',
                LogDestination=s3_bucket_arn
            )
            print(response)
            if response['Unsuccessful']:
                print(f"Failed to enable Flow Logs for VPC {vpc_id} in region {region}.")
            else:
                print(f"Successfully enabled Flow Logs for VPC {vpc_id} in region {region}.")
        except ClientError as e:
            print(f"An error occurred in region {region} for VPC {vpc_id}: {e}")

    # List of VPCs without flow logs
    #vpcs_without_flow_logs = [{'Region': 'ap-south-1', 'VPC_ID': 'vpc-0c433ca0ab76e67ae'}]

    # S3 bucket ARN for storing flow logs
    #s3_bucket_arn = 'arn:aws:s3:::your-bucket-name' # Replace with your S3 bucket ARN

    # Enabling flow logs for each VPC
    for vpc in vpcs_without_flow_logs:
        enable_vpc_flow_logs(vpc['VPC_ID'], vpc['Region'], s3_bucket_arn)
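The step above only reports whether `Unsuccessful` is empty. When a failure occurs, each entry in that list also carries the resource ID and an error code and message, which is usually what is needed to fix the problem (for example a bucket policy that rejects log delivery). A small sketch of pulling those details out, assuming the hypothetical helper name `summarize_flow_log_response`:

```python
def summarize_flow_log_response(response):
    """Split a create_flow_logs response into created IDs and failures.

    Returns (created_ids, failures), where failures is a list of
    (resource_id, error_code, error_message) tuples.
    """
    created_ids = list(response.get("FlowLogIds", []))
    failures = []
    for item in response.get("Unsuccessful", []):
        error = item.get("Error", {})
        failures.append(
            (item.get("ResourceId"), error.get("Code"), error.get("Message"))
        )
    return created_ids, failures


# Usage sketch with a hand-written response in the documented shape
sample = {
    "FlowLogIds": ["fl-0123456789abcdef0"],
    "Unsuccessful": [
        {"ResourceId": "vpc-b", "Error": {"Code": "SampleCode", "Message": "sample message"}}
    ],
}
created, failed = summarize_flow_log_response(sample)
for resource_id, code, message in failed:
    print(f"{resource_id}: {code}: {message}")
```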