Manage Unused AWS S3 Buckets

This runbook involves identifying buckets that have not been accessed or modified within a set period, such as 90 or 180 days. Once identified, these buckets can be reviewed for important data, then archived, transferred to cost-effective storage, or deleted. This process helps reduce costs, optimize resource usage, and enhance security by minimizing potential risks associated with unmonitored storage resources.
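For buckets that turn out to hold data worth keeping, transfer to cost-effective storage is typically done with a lifecycle rule rather than by moving objects manually. The sketch below is a minimal, illustrative example using boto3's put_bucket_lifecycle_configuration; the bucket name and the 30-day GLACIER transition are assumptions, not values used by the tasks that follow.

    import boto3

    s3 = boto3.client('s3')  # assumes credentials from the default provider chain

    bucket_name = 'example-unused-bucket'  # hypothetical bucket identified as unused

    # Transition every object to the cheaper GLACIER storage class after 30 days
    s3.put_bucket_lifecycle_configuration(
        Bucket=bucket_name,
        LifecycleConfiguration={
            'Rules': [{
                'ID': 'archive-unused-bucket',
                'Filter': {'Prefix': ''},  # apply the rule to all objects
                'Status': 'Enabled',
                'Transitions': [{'Days': 30, 'StorageClass': 'GLACIER'}],
            }]
        },
    )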

  1. List All S3 Buckets

    This task involves retrieving and displaying a comprehensive list of all Amazon S3 buckets within an AWS account. This step is crucial as it provides a clear overview of all the storage resources available, serving as a starting point for various management and security tasks, such as enforcing encryption or implementing access policies.

    import boto3
    from botocore.exceptions import BotoCoreError, NoCredentialsError, PartialCredentialsError

    # _get_creds and cred_label are supplied by the runbook environment
    creds = _get_creds(cred_label)['creds']
    access_key = creds['username']
    secret_key = creds['password']

    def list_all_s3_buckets():
        try:
            # Creating a Boto3 S3 client
            s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
            # Sending a request to list S3 buckets
            response = s3.list_buckets()
            buckets_info = []
            # Iterate through each bucket in the response
            for bucket in response['Buckets']:
                # Get the region of the bucket
                location = s3.get_bucket_location(Bucket=bucket['Name'])
                region = location['LocationConstraint']
                if region is None:
                    region = 'us-east-1'  # The default region if none is specified
                # Append bucket details to the list
                buckets_info.append({'Name': bucket['Name'], 'Region': region})
            return buckets_info
        except NoCredentialsError:
            print("Error: AWS credentials not found")
            return None
        except PartialCredentialsError:
            print("Error: Incomplete AWS credentials")
            return None
        except BotoCoreError as e:
            print(f"Error: AWS SDK for Python (Boto3) core error occurred - {e}")
            return None
        except Exception as e:
            print(f"Unexpected error: {e}")
            return None

    def display_buckets_details(buckets):
        # Initialize table with the desired structure and headers
        # (context.newtable() is supplied by the runbook environment)
        table = context.newtable()
        table.title = "S3 Bucket Details"
        table.num_cols = 2  # Number of columns for Name and Region
        table.num_rows = 1  # Starts with one row for headers
        table.has_header_row = True
        # Define header names
        headers = ["Bucket Name", "Region"]
        # Set headers in the first row
        for col_num, header in enumerate(headers):
            table.setval(0, col_num, header)
        # Populate the table with bucket data
        for row_num, bucket in enumerate(buckets, start=1):
            table.num_rows += 1  # Add a row for each bucket
            values = [bucket['Name'], bucket['Region']]
            for col_num, value in enumerate(values):
                table.setval(row_num, col_num, value)

    # Main block
    buckets_info = list_all_s3_buckets()
    if buckets_info is not None:
        if buckets_info:
            display_buckets_details(buckets_info)
            #print("S3 bucket details are displayed in the table.")
        else:
            print("No S3 buckets found.")
    else:
        print("Error occurred while trying to list S3 buckets.")
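    As a follow-on to the inventory above, the encryption enforcement mentioned in this task's description can be checked per bucket. A minimal sketch, assuming the access_key, secret_key, and buckets_info values from the task above; boto3's get_bucket_encryption raises a ClientError with code ServerSideEncryptionConfigurationNotFoundError when a bucket has no default encryption configuration:

    import boto3
    from botocore.exceptions import ClientError

    # Reuses access_key/secret_key and buckets_info from the task above (assumption)
    s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    for bucket in buckets_info or []:
        try:
            s3.get_bucket_encryption(Bucket=bucket['Name'])
        except ClientError as e:
            if e.response['Error']['Code'] == 'ServerSideEncryptionConfigurationNotFoundError':
                print(f"No default encryption configured: {bucket['Name']}")
            else:
                raise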
  2. Filter Out Unused S3 Buckets

    This task filters out unused AWS S3 buckets by identifying those that have not been modified within a set period, such as 90 or 180 days; the most recent object LastModified timestamp in each bucket serves as the activity signal. Once identified, these buckets can be reviewed for important data, then archived, transferred to cost-effective storage, or deleted.

    import boto3
    from datetime import datetime, timedelta, timezone

    # _get_creds and cred_label are supplied by the runbook environment
    creds = _get_creds(cred_label)['creds']
    access_key = creds['username']
    secret_key = creds['password']

    # Initialize S3 client
    def get_s3_client():
        return boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    def list_all_buckets(s3_client):
        try:
            buckets = s3_client.list_buckets()
            return buckets['Buckets']
        except Exception as e:
            print(f"Failed to list S3 buckets: {e}")
            return []

    def get_last_modified_object(s3_client, bucket_name):
        try:
            paginator = s3_client.get_paginator('list_objects_v2')
            page_iterator = paginator.paginate(Bucket=bucket_name)
            last_modified = None
            for page in page_iterator:
                if 'Contents' in page:
                    for obj in page['Contents']:
                        if last_modified is None or obj['LastModified'] > last_modified:
                            last_modified = obj['LastModified']
            return last_modified
        except Exception as e:
            print(f"Error accessing objects in bucket {bucket_name}: {e}")
            return None

    def filter_unused_buckets(s3_client, days_threshold):
        unused_buckets = []
        current_time = datetime.now(timezone.utc)
        threshold_time = current_time - timedelta(days=days_threshold)
        buckets = list_all_buckets(s3_client)
        for bucket in buckets:
            last_modified = get_last_modified_object(s3_client, bucket['Name'])
            if last_modified is None or last_modified < threshold_time:
                unused_buckets.append(bucket['Name'])
        return unused_buckets

    # Main Execution
    # last_accessed_threshold_days (e.g., 90) is an input parameter supplied by the runbook
    s3_client = get_s3_client()
    unused_buckets = filter_unused_buckets(s3_client, last_accessed_threshold_days)
    #print(unused_buckets)  # for debugging
    if unused_buckets:
        print(f"Unused buckets (no modifications for at least {last_accessed_threshold_days} days):")
        for bucket in unused_buckets:
            print(bucket)
    else:
        print("No unused buckets found based on the specified threshold.")
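    Once buckets are flagged, any data worth keeping can be archived before cleanup, as this task's description suggests. A minimal sketch, assuming the access_key/secret_key values from the task above plus a hypothetical flagged bucket and a pre-existing archive bucket; it copies each object into the archive under the DEEP_ARCHIVE storage class via boto3's copy_object (which handles objects up to 5 GB; larger objects need a multipart copy):

    import boto3

    s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    source_bucket = 'example-unused-bucket'    # hypothetical bucket flagged above
    archive_bucket = 'example-archive-bucket'  # hypothetical pre-existing archive bucket

    # Copy every object into the archive bucket under the DEEP_ARCHIVE storage class
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=source_bucket):
        for obj in page.get('Contents', []):
            s3.copy_object(
                Bucket=archive_bucket,
                Key=f"{source_bucket}/{obj['Key']}",  # prefix keys with the source bucket name
                CopySource={'Bucket': source_bucket, 'Key': obj['Key']},
                StorageClass='DEEP_ARCHIVE',
            )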
  3. Verify Unused S3 Buckets Against Additional Criteria

    This task identifies unused AWS S3 buckets by checking three main criteria: the bucket's last modification date exceeds a specified threshold, it contains no objects, and it is not configured as a website. This helps in managing storage efficiently by pinpointing potentially redundant buckets, which can reduce costs and simplify cloud infrastructure management.

    import boto3
    from datetime import datetime, timedelta, timezone
    from botocore.exceptions import ClientError

    # _get_creds and cred_label are supplied by the runbook environment
    creds = _get_creds(cred_label)['creds']
    access_key = creds['username']
    secret_key = creds['password']

    # Initialize S3 client
    def get_s3_client():
        return boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    def list_all_buckets(s3_client):
        try:
            buckets = s3_client.list_buckets()
            return buckets['Buckets']
        except Exception as e:
            print(f"Failed to list S3 buckets: {e}")
            return []

    def get_last_modified_object(s3_client, bucket_name):
        try:
            paginator = s3_client.get_paginator('list_objects_v2')
            page_iterator = paginator.paginate(Bucket=bucket_name)
            last_modified = None
            for page in page_iterator:
                if 'Contents' in page:
                    for obj in page['Contents']:
                        if last_modified is None or obj['LastModified'] > last_modified:
                            last_modified = obj['LastModified']
            return last_modified
        except Exception as e:
            print(f"Error accessing objects in bucket {bucket_name}: {e}")
            return None

    def is_bucket_empty(s3_client, bucket_name):
        response = s3_client.list_objects_v2(Bucket=bucket_name)
        return 'Contents' not in response

    def is_bucket_website(s3_client, bucket_name):
        try:
            s3_client.get_bucket_website(Bucket=bucket_name)
            return True
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchWebsiteConfiguration':
                return False
            raise  # Rethrow the exception if it's not the expected "NoSuchWebsiteConfiguration"

    def filter_unused_buckets(s3_client, days_threshold):
        unused_buckets = []
        current_time = datetime.now(timezone.utc)
        threshold_time = current_time - timedelta(days=days_threshold)
        buckets = list_all_buckets(s3_client)
        for bucket in buckets:
            bucket_name = bucket['Name']
            if is_bucket_empty(s3_client, bucket_name) and not is_bucket_website(s3_client, bucket_name):
                last_modified = get_last_modified_object(s3_client, bucket_name)
                if last_modified is None or last_modified < threshold_time:
                    unused_buckets.append(bucket_name)
        return unused_buckets

    # Main Execution
    s3_client = get_s3_client()
    # last_accessed_threshold_days (e.g., 90) is an input parameter supplied by the runbook
    unused_buckets = filter_unused_buckets(s3_client, last_accessed_threshold_days)
    #print(unused_buckets)  # for debugging
    if unused_buckets:
        print(f"Unused buckets (no modifications for at least {last_accessed_threshold_days} days):")
        for bucket in unused_buckets:
            print(bucket)
    else:
        print("No unused buckets found based on the specified threshold.")
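    After review, buckets confirmed unused by the checks above can be deleted, completing the cleanup this runbook describes. A minimal sketch, assuming the unused_buckets list and access_key/secret_key values produced by this task; delete_bucket only succeeds on empty buckets, which the emptiness check above already guarantees for this list:

    import boto3

    s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

    # unused_buckets comes from the task above; review it manually before running this
    for name in unused_buckets:
        try:
            s3.delete_bucket(Bucket=name)  # fails with BucketNotEmpty if objects remain
            print(f"Deleted bucket: {name}")
        except Exception as e:
            print(f"Could not delete {name}: {e}")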