End-to-End AWS Cost Tracking and Management

This runbook focuses on the end-to-end management of AWS costs, starting from the initial setup of CUR reports in S3 buckets to detailed tracking and analysis of these reports for effective cost control and insights.

region_name = "us-east-1"
  1. 1

    This task checks for the existence of the expected S3 bucket and exports the CUR report name and base path for downstream tasks. If the bucket does not exist, the subsequent tasks automate the CUR configuration, including S3 bucket creation, a bucket-policy update for cost data delivery, and the CUR report setup.

    import boto3 from botocore.exceptions import ClientError, NoCredentialsError, EndpointConnectionError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the STS client sts_client = boto3.client('sts', aws_access_key_id=access_key, aws_secret_access_key=secret_key) create_bucket_action = True report_name = 'My-CUR-report' BASE_PATH = f"{report_name}/{report_name}/" # BASE_PATH generation in the format AWS saves these reports in the S3 buckets try: # Get the caller identity from STS caller_identity = sts_client.get_caller_identity() account_number = caller_identity.get('Account') # Initialize the S3 client with the account number s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key) BUCKET_NAME = f"dagknows-cur-logging-bucket-{account_number}" try: # Attempt to get information about the specified bucket to check if it exists if (s3.head_bucket(Bucket=BUCKET_NAME)): print(f"Bucket {BUCKET_NAME} exists.") context.skip_sub_tasks = True except ClientError as e: error_code = e.response['Error']['Code'] if error_code == '404' and create_bucket_action: # Bucket doesn't exist and create_bucket_action is True print(f"Bucket {BUCKET_NAME} does not exist. Creating and configuring the bucket in the next task...") report_name = 'My-CUR-report' BASE_PATH = f"{report_name}/{report_name}/" # BASE_PATH generation in the format AWS saves these reports in the S3 buckets elif error_code == '403': print(f"Access to bucket {BUCKET_NAME} is forbidden.") context.skip_sub_tasks = True context.proceed = False else: print(f"Error when checking bucket: {e}") context.skip_sub_tasks = True context.proceed = False except NoCredentialsError: print("No AWS credentials found. Please configure your AWS credentials.") context.skip_sub_tasks = True context.proceed = False except EndpointConnectionError: print("Unable to connect to the S3 endpoint. Check your internet connection or AWS configuration.") context.skip_sub_tasks = True context.proceed = False except Exception as ex: print(f"An unexpected error occurred: {ex}") context.skip_sub_tasks = True context.proceed = False # Note: The bucket name should comply with AWS bucket naming rules and be globally unique.
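    For reference, the existence check above reduces to a single head_bucket call. A minimal sketch, assuming credentials are already configured in the environment and using an illustrative bucket name:

    import boto3
    from botocore.exceptions import ClientError

    s3 = boto3.client("s3")  # assumes credentials come from the environment
    bucket = "dagknows-cur-logging-bucket-123456789012"  # illustrative bucket name

    try:
        s3.head_bucket(Bucket=bucket)  # raises ClientError if missing or forbidden
        print(f"Bucket {bucket} exists.")
    except ClientError as err:
        code = err.response["Error"]["Code"]
        if code == "404":
            print(f"Bucket {bucket} does not exist; it can be created downstream.")
        elif code == "403":
            print(f"Bucket {bucket} exists but access is forbidden.")
        else:
            raise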
    1. 1.1

      This runbook provides a comprehensive guide for setting up and configuring AWS Cost and Usage Reports (CUR) to be delivered to an S3 bucket. It covers the process from creating a new S3 bucket, updating its policy for CUR compatibility, to configuring the CUR settings to target the created bucket.

      1. 1.1.1

        This task involves creating a new Amazon S3 bucket in a specified AWS region. It's the initial step in setting up a destination for storing Cost and Usage Reports.

        import boto3 from botocore.exceptions import ClientError # Retrieve AWS credentials from the vault creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def create_s3_bucket(bucket_name, region): """ Creates an S3 bucket in a specified region. :param bucket_name: Name of the S3 bucket to create. :param region: Region to create the bucket in. """ s3_client = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: if region == 'us-east-1': #Your default region should be specified here s3_client.create_bucket(Bucket=bucket_name) else: s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': region}) print(f"S3 bucket '{bucket_name}' created in {region}.") except ClientError as e: print(f"Error creating S3 bucket: {e}") # Example usage #bucket_name = 'test-this-cur-logging-bucket-1234' # Replace with your desired bucket name #region_name = 'us-east-1' # Replace with your desired region, e.g., 'us-east-1' #print(f"bucket received from upstream task {BUCKET_NAME}") #print(f"region name received from upstream task {region_name}") create_s3_bucket(BUCKET_NAME, region_name)
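        The one subtlety worth isolating is that us-east-1 must not be passed as a LocationConstraint. A minimal sketch of just that branch, with illustrative bucket and region names:

        import boto3

        s3 = boto3.client("s3")  # assumes credentials from the environment
        bucket, region = "example-cur-bucket-123456789012", "us-west-2"  # illustrative values

        if region == "us-east-1":
            # us-east-1 is the default location and rejects an explicit LocationConstraint
            s3.create_bucket(Bucket=bucket)
        else:
            s3.create_bucket(
                Bucket=bucket,
                CreateBucketConfiguration={"LocationConstraint": region},
            )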
      2. 1.1.2

        In this task, the S3 bucket's policy is updated to grant necessary permissions for AWS Cost and Usage Reports to deliver log files to the bucket, ensuring secure and compliant data storage.

        import boto3 import json from botocore.exceptions import ClientError # Retrieve AWS credentials from the vault creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize STS client and get account ID sts_client = boto3.client('sts',aws_access_key_id=access_key,aws_secret_access_key=secret_key) account_id = sts_client.get_caller_identity()["Account"] def update_s3_bucket_policy_for_cur(bucket_name, account_id, region): """ Updates the S3 bucket policy to allow AWS CUR to deliver log files. :param bucket_name: Name of the S3 bucket. :param account_id: AWS account ID. :param region: AWS region. """ policy = { "Version": "2008-10-17", "Statement": [ { "Sid": "Stmt1335892150622", "Effect": "Allow", "Principal": { "Service": "billingreports.amazonaws.com" }, "Action": [ "s3:GetBucketAcl", "s3:GetBucketPolicy" ], "Resource": f"arn:aws:s3:::{bucket_name}", "Condition": { "StringEquals": { "aws:SourceAccount": account_id, "aws:SourceArn": f"arn:aws:cur:us-east-1:{account_id}:definition/*" # These enpoints here only work on us-east-1 even if the region_name is different } } }, { "Sid": "Stmt1335892526596", "Effect": "Allow", "Principal": { "Service": "billingreports.amazonaws.com" }, "Action": "s3:PutObject", "Resource": f"arn:aws:s3:::{bucket_name}/*", "Condition": { "StringEquals": { "aws:SourceAccount": account_id, "aws:SourceArn": f"arn:aws:cur:us-east-1:{account_id}:definition/*" # # These enpoints here only work on us-east-1 even if the region_name is different } } } ] } s3_client = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: s3_client.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy)) print(f"Bucket policy updated to allow CUR deliveries for '{bucket_name}'.") except ClientError as e: print(f"Error updating bucket policy: {e}") # Example usage #bucket_name = 'test-this-cur-logging-bucket-1234' # Replace with the name of your existing bucket #region_name = 'us-east-1' # Replace with your region, e.g., 'us-east-1' update_s3_bucket_policy_for_cur(BUCKET_NAME, account_id, region_name)
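        To confirm the policy actually landed on the bucket, a short hedged check with get_bucket_policy can follow the update (bucket name illustrative):

        import json

        import boto3

        s3 = boto3.client("s3")  # assumes credentials from the environment
        bucket = "example-cur-bucket-123456789012"  # illustrative

        policy = json.loads(s3.get_bucket_policy(Bucket=bucket)["Policy"])
        services = {
            stmt["Principal"].get("Service")
            for stmt in policy.get("Statement", [])
            if isinstance(stmt.get("Principal"), dict)
        }
        print("billingreports.amazonaws.com allowed:", "billingreports.amazonaws.com" in services)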
      3. 1.1.3

        This task involves configuring AWS Cost and Usage Reports (CUR) to direct the reports to the newly created and configured S3 bucket, finalizing the setup for report generation and storage.

        import boto3 from botocore.exceptions import ClientError # Retrieve AWS credentials from the vault creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def configure_cur_report(bucket_name, report_name): """ Configures AWS Cost and Usage Report to be delivered to an S3 bucket. :param bucket_name: Name of the S3 bucket for report delivery. :param report_name: Name of the report. """ cur_client = boto3.client('cur',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name='us-east-1') # region_name is hardcoded because cur client default endpoint is only configured for us-east-1 region by aws report_definition = { 'ReportName': report_name, 'TimeUnit': 'HOURLY', 'Format': 'textORcsv', 'Compression': 'GZIP', 'S3Bucket': bucket_name, 'S3Prefix': report_name, 'S3Region': region_name, 'AdditionalSchemaElements': ['RESOURCES'], 'ReportVersioning': 'CREATE_NEW_REPORT' } try: cur_client.put_report_definition(ReportDefinition=report_definition) print(f"CUR report '{report_name}' configured for delivery to '{bucket_name}'") except ClientError as e: print(f"Error configuring CUR report: {e}") #bucket_name = 'test-this-cur-logging-bucket-123' # Replace with your S3 bucket name #report_name = 'my_test_cost_report_3' # Replace with your report name configure_cur_report(BUCKET_NAME, report_name)
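        As a follow-up sanity check, the CUR API can list the existing report definitions so you can confirm the new report was registered. A minimal sketch; note the cur client is only served from us-east-1:

        import boto3

        cur = boto3.client("cur", region_name="us-east-1")  # CUR API endpoint is us-east-1 only

        resp = cur.describe_report_definitions()
        for definition in resp.get("ReportDefinitions", []):
            print(definition["ReportName"], "->", f"s3://{definition['S3Bucket']}/{definition['S3Prefix']}")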
  2. 2

    The AWS Cost and Usage Report (CUR) provides a comprehensive view of AWS expenses and resource utilization over a specified period. By analyzing the CUR, organizations can gain deep insights into their cloud spending patterns, identify cost drivers, and optimize their AWS resource utilization. The report allows for granular cost tracking by dimensions such as service, region, and account, enabling effective budgeting and forecasting.

    1. 2.1

      This task processes the AWS Cost and Usage Report (CUR) from an S3 bucket. Using the boto3 SDK, it locates the latest gzipped CSV report in each monthly folder, decompresses it, and loads the combined data into a Pandas DataFrame for analysis.

      import boto3 import gzip import pandas as pd from io import StringIO from datetime import datetime, timedelta from botocore.exceptions import ParamValidationError #BUCKET_NAME = 'dagknowscostreport' #BASE_PATH = 'costreport/dagknowscostreport/' # Is the path from bucket root directory to the 'dated folders' which contain the CUR Reports #FILENAME = 'dagknowscostreport-00001.csv.gz' #last_n_days = 100 # Retrieve AWS credentials from the vault creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] s3_client = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) def list_s3_keys(bucket, prefix): s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) keys = [] kwargs = {'Bucket': bucket, 'Prefix': prefix} while True: response = s3.list_objects_v2(**kwargs) for obj in response.get('Contents', []): keys.append(obj['Key']) try: kwargs['ContinuationToken'] = response['NextContinuationToken'] except KeyError: break return keys def fetch_data_from_s3(file_key): try: s3 = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) response = s3.get_object(Bucket=BUCKET_NAME, Key=file_key) gz_content = response['Body'].read() csv_content = gzip.decompress(gz_content).decode('utf-8') return pd.read_csv(StringIO(csv_content), low_memory=False) except Exception as e: print(f"Error fetching data from S3 for key {file_key}: {e}") return None # Function to get the end of the previous month def get_end_of_last_month(date): return (date.replace(day=1) - timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) def check_column_existence(df, column_name): if column_name not in df.columns: print(f"Warning: Column '{column_name}' not found in the DataFrame!") return False print(f"Column '{column_name}' exists in the DataFrame.") return True def list_folders(prefix): paginator = s3_client.get_paginator('list_objects_v2') folders = [] for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix=prefix, Delimiter='/'): folders.extend([content['Prefix'] for content in page.get('CommonPrefixes', [])]) return folders def list_csv_gz_files(folder): response = s3_client.list_objects_v2(Bucket=BUCKET_NAME, Prefix=folder) files = [{'key': obj['Key'], 'last_modified': obj['LastModified']} for obj in response.get('Contents', []) if obj['Key'].endswith('.csv.gz')] return files ''' def print_last_file_info(files): if not files: print("No CSV GZ files found in the folder.") return # Sort files by last modified time last_file = sorted(files, key=lambda x: x['last_modified'], reverse=True)[0] print(f"Last file: {last_file['key']}, Last modified: {last_file['last_modified']}") ''' def process_data(bucket_name, base_path, last_n_days): try: # Setup for fetching data end_date = datetime.utcnow() - timedelta(days=1) start_date = end_date - timedelta(days=last_n_days) # Modified to use the list_folders function for getting month range folders month_ranges = list_folders(base_path) all_keys = [] for month_range in month_ranges: # Logic to fetch the last .csv.gz file from the last folder of the month keys_for_month = list_csv_gz_files(month_range) if keys_for_month: # Sort files by last modified time and get the last file last_file = sorted(keys_for_month, key=lambda x: x['last_modified'], reverse=True)[0] all_keys.append(last_file['key']) print(f"Last file for {month_range}: {last_file['key']}, Last modified: {last_file['last_modified']}") else: print(f"No CSV GZ files found in the folder 
{month_range}.") # Fetch and process data from identified keys dfs = [fetch_data_from_s3(key) for key in all_keys] df = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame() # Further processing on the DataFrame df['lineItem/UsageStartDate'] = pd.to_datetime(df['lineItem/UsageStartDate']) df['day'] = df['lineItem/UsageStartDate'].dt.date # Exclude the latest date from the DataFrame latest_date = df['day'].max() df = df[df['day'] < latest_date] # Check required columns required_columns = [ 'lineItem/ProductCode', 'lineItem/UnblendedCost', 'lineItem/BlendedCost', 'lineItem/UsageStartDate', 'lineItem/UsageAccountId', 'lineItem/NormalizedUsageAmount', 'product/productFamily', 'product/instanceType' ] # Check if all required columns exist if not all([check_column_existence(df, col) for col in required_columns]): print("One or more required columns are missing.") return pd.DataFrame(), False # Returning an empty DataFrame and False return df, len(dfs) > 0 except Exception as e: print(f"ERROR: {e}") return pd.DataFrame(), False folders = list_folders(BASE_PATH) for folder in folders: print(f"Processing folder: {folder}") files = list_csv_gz_files(folder) #print_last_file_info(files) df, data_fetched = process_data(BUCKET_NAME, BASE_PATH, last_n_days) if data_fetched: print("Proceeding with further operations") else: print("No data fetched. Exiting operation.") context.proceed = False print("Script Execution End")
    2. 2.2

      This task provides a comprehensive analysis of AWS expenses through six key modules. It categorizes costs by AWS service, identifies primary cost drivers, examines EC2, EBS, and RDS usage, charts expenses over time, and breaks down costs by AWS account ID to pinpoint the largest spenders, aiding in efficient financial forecasting and planning.

      1. 2.2.1

        This task conducts a temporal analysis, showcasing daily AWS expenses over a set duration. It offers a day-by-day breakdown of AWS spending trends. The accompanying line graph vividly displays these fluctuations, where peaks signify higher costs and troughs indicate savings. This visual aid empowers users to pinpoint spending patterns, anomalies, or unexpected cost surges, thus facilitating enhanced budgetary foresight and efficient cost control.

        import pandas as pd from datetime import datetime, timedelta # Define last_n_days parameter #last_n_days = 30 # You can change this value as needed if df is not None: print("Analyzing and visualizing daily AWS costs...") # Filter out negative values filter_condition = (df['lineItem/UnblendedCost'] >= 0) #| (df['lineItem/UnblendedCost'] <= -0.001) df = df[filter_condition] # Convert 'lineItem/UsageStartDate' to datetime df['lineItem/UsageStartDate'] = pd.to_datetime(df['lineItem/UsageStartDate']) # Extract the day of the month and create a new column 'day' df['day'] = df['lineItem/UsageStartDate'].dt.date # Get the maximum date in the dataset max_date = df['day'].max() # Calculate the start date based on the max date and last_n_days start_date = max_date - timedelta(days=last_n_days) # Filter the DataFrame to only include dates greater than or equal to the start date filtered_df = df[df['day'] >= start_date] # Group by day and sum 'lineItem/UnblendedCost' for each group daily_costs = filtered_df.groupby('day')['lineItem/UnblendedCost'].sum() # Extract x and y values x = daily_costs.index.tolist() # This gets the dates y = daily_costs.values.tolist() # This gets the corresponding costs #print(df.head()) # Print the first few rows of the original DataFrame #print(daily_costs) # Print the daily costs after grouping and summing #print(y) # Print the y values extracted from daily_costs # Set the properties for your plot context.plot.xlabel = 'Date' context.plot.ylabel = 'Cost ($)' context.plot.title = f'Daily AWS Costs (Last {last_n_days} Days)' context.plot.add_trace(name=f'Daily AWS Costs (Last {last_n_days} Days)', xpts=x, ypts=y, tracetype="lines") else: print("Failed to fetch data. Exiting.")
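        Stripped of the plotting, the aggregation is a short pandas groupby. A sketch assuming df is the CUR DataFrame produced by task 2.1:

        import pandas as pd

        # df is assumed to be the CUR DataFrame from task 2.1
        costs = df[df["lineItem/UnblendedCost"] >= 0].copy()                 # drop credits/refunds
        costs["day"] = pd.to_datetime(costs["lineItem/UsageStartDate"]).dt.date
        daily_costs = costs.groupby("day")["lineItem/UnblendedCost"].sum()   # one row per day
        print(daily_costs.tail())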
      2. 2.2.2

        This task categorizes AWS expenses by service and charts the daily unblended cost of the top N services, highlighting which services drive the largest share of spend.

        import boto3 import pandas as pd from datetime import datetime, timedelta # Define last_n_days parameter #last_n_days = 30 # You can change this value as needed if df is not None: # Filter out negative values filter_condition = (df['lineItem/UnblendedCost'] >= 0) #| (df['lineItem/UnblendedCost'] <= -0.001) df = df[filter_condition] # Convert the 'lineItem/UsageStartDate' to a datetime object df['lineItem/UsageStartDate'] = pd.to_datetime(df['lineItem/UsageStartDate']) # Extract the day of the month and create a new column 'day' df['day'] = df['lineItem/UsageStartDate'].dt.date # Get the latest date in the dataset latest_date = df['day'].max() # Calculate the start date based on the latest date and last_n_days start_date = latest_date - timedelta(days=int(last_n_days)) # Filter data to include only dates greater than or equal to the start date filtered_df = df[df['day'] >= start_date] # Group by 'lineItem/ProductCode' and 'day', then sum 'lineItem/UnblendedCost' for each group result = filtered_df.groupby(['lineItem/ProductCode', 'day'])['lineItem/UnblendedCost'].sum().reset_index() # Get the top_n_services based on total cost top_services = result.groupby('lineItem/ProductCode')['lineItem/UnblendedCost'].sum().nlargest(int(top_n_services)).index.tolist() # Filter the result DataFrame to include only the top_n_services top_result = result[result['lineItem/ProductCode'].isin(top_services)] # Set the properties for your plot context.plot.xlabel = 'Date' context.plot.ylabel = 'Cost ($)' context.plot.title = f'Daily AWS Costs by Top {top_n_services} Services (Last {last_n_days} Days)' # Iterate over each top service in the filtered result DataFrame for service in top_result['lineItem/ProductCode'].unique(): service_data = top_result[top_result['lineItem/ProductCode'] == service] service_data = service_data.sort_values(by='day') x = service_data['day'].tolist() y = service_data['lineItem/UnblendedCost'].tolist() context.plot.add_trace(name=service, xpts=x, ypts=y, tracetype='lines') else: print("Failed to fetch data from dataframe. Exiting.")
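        The service ranking itself reduces to a grouped sum plus nlargest. A sketch assuming df and top_n_services are already defined by upstream tasks:

        # df and top_n_services are assumed to come from upstream tasks
        per_service = df.groupby("lineItem/ProductCode")["lineItem/UnblendedCost"].sum()
        top_services = per_service.nlargest(int(top_n_services))
        print(top_services)  # e.g. AmazonEC2, AmazonRDS, ... with their total unblended cost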
      3. 2.2.3

        1. 2.2.3.1

          This task visualizes daily costs associated with various Amazon EC2 instance types over a chosen period, highlighting their unique cost structures. Given the diverse configurations of each EC2 instance type, understanding their individual cost dynamics is essential. The chart provides a daily cost breakdown for each type, helping users discern the most cost-intensive instances and their peak periods. This detailed view aids in optimizing cloud expenses, guiding users in selecting the most cost-effective instance types, reshaping usage habits, and allocating resources wisely.

          import pandas as pd from datetime import datetime, timedelta, timezone if df is not None: # Convert 'lineItem/UsageStartDate' to datetime df['lineItem/UsageStartDate'] = pd.to_datetime(df['lineItem/UsageStartDate']) # Filter out negative 'lineItem/UnblendedCost' df = df[df['lineItem/UnblendedCost'] >= 0] # Filter rows for the last N days cutoff_date = datetime.utcnow().replace(tzinfo=timezone.utc) - timedelta(days=int(last_n_days)) last_n_days_df = df[df['lineItem/UsageStartDate'] > cutoff_date].copy() # Further filter data based on product family and pricing unit condition = (last_n_days_df['product/productFamily'] == 'Compute Instance') & (last_n_days_df['pricing/unit'].isin(['Hours', 'Hrs'])) filtered_df = last_n_days_df[condition].copy() # Group by 'lineItem/UsageStartDate' and 'product/instanceType', then sum 'lineItem/UsageAmount' and 'lineItem/UnblendedCost' result = filtered_df.groupby([filtered_df['lineItem/UsageStartDate'].dt.date, 'product/instanceType']).agg( usage_hours=('lineItem/UsageAmount', 'sum'), usage_cost=('lineItem/UnblendedCost', 'sum') ).reset_index() # Set the properties for your plot context.plot.xlabel = 'Date' context.plot.ylabel = 'EC2 Usage Cost($)' context.plot.title = f'EC2 Cost Usage (Last {last_n_days} Days)' # Loop through each unique instance type and add a trace for each for instance_type in result['product/instanceType'].unique(): instance_data = result[result['product/instanceType'] == instance_type] x = instance_data['lineItem/UsageStartDate'].tolist() # Date on x-axis y = instance_data['usage_cost'].tolist() # Usage cost on y-axis context.plot.add_trace(name=f"EC2- {instance_type}", xpts=x, ypts=y, tracetype="line") print("Analysis complete.") else: print("Failed to fetch data. Exiting.")
        2. 2.2.3.2

          This runbook involves collecting data on EC2 instances, retrieving CPU utilization metrics from Amazon CloudWatch, and visually plotting this data to identify underutilized or overutilized instances. This task helps in recognizing potential cost-saving opportunities by rightsizing instances, either by downsizing underutilized instances to reduce costs or upsizing overutilized instances to improve performance.

          region_name = None
          region_name_to_search_recommendations = None
          1. 2.2.3.2.1

            Amazon Elastic Compute Cloud (EC2) is a service offered by Amazon Web Services (AWS) that provides resizable compute capacity in the cloud. Through Boto3's EC2 client, the describe_instances() method provides detailed information about each instance, including its ID, type, launch time, and current state. This capability assists users in effectively monitoring and managing their cloud resources.

            import boto3 from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_all_regions(): ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name = 'us-east-1') return [region['RegionName'] for region in ec2.describe_regions()['Regions']] def list_ec2_instances(region=None): # If no region is provided, fetch instances from all regions regions = [region] if region else list_all_regions() # Create an empty list to store instance details instance_details = [] for region in regions: # Try initializing the Boto3 EC2 client for the specific region try: ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) except (NoCredentialsError, PartialCredentialsError): print(f"Failed for {region}: No AWS credentials found or incomplete credentials provided.") continue except BotoCoreError as e: print(f"Failed for {region}: Error initializing the EC2 client due to BotoCore Error: {e}") continue except Exception as e: print(f"Failed for {region}: Unexpected error initializing the EC2 client: {e}") continue #print(f"Fetching EC2 instance details for region: {region}...") # Try to paginate through the EC2 instance responses for the specific region try: paginator = ec2_client.get_paginator('describe_instances') for page in paginator.paginate(): for reservation in page['Reservations']: for instance in reservation['Instances']: # Extract the desired attributes instance_id = instance['InstanceId'] instance_type = instance['InstanceType'] launch_time = instance['LaunchTime'] state = instance['State']['Name'] # Append the details to the list instance_details.append({ 'InstanceId': instance_id, 'InstanceType': instance_type, 'LaunchTime': launch_time, 'State': state, 'Region': region }) #print(f"Fetched all instance details for region: {region} successfully!") except ClientError as e: print(f"Failed for {region}: AWS Client Error while fetching EC2 instance details: {e}") except Exception as e: print(f"Failed for {region}: Unexpected error while fetching EC2 instance details: {e}") return instance_details # You can replace None with a specific region string like 'us-east-1' to get instances from a specific region instances_list = list_ec2_instances(region_name) if instances_list: print("\nEC2 Instance Details:") for instance in instances_list: print("-" * 50) # Separator line for key, value in instance.items(): print(f"{key}: {value}") else: print("No instances found or an error occurred.")
          2. 2.2.3.2.2

            This task aggregates CPU utilization data for running EC2 instances across an AWS account, computes each instance's average CPU usage over a specified period, and plots the top and bottom three instances by average utilization to help assess overall resource efficiency.

            import boto3 from datetime import datetime, timedelta last_n_days=30 # AWS Credentials creds = _get_creds(cred_label)['creds'] # Placeholder function to get AWS credentials access_key = creds['username'] secret_key = creds['password'] '''# Placeholder for instances_list instances_list = [ {'InstanceId': 'instance1', 'Region': 'us-east-1', 'State': 'running'}, {'InstanceId': 'instance2', 'Region': 'us-east-1', 'State': 'running'}, # Add more instances as needed ] ''' def fetch_cpu_utilization(instance_id, region, start_time, end_time): cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) metrics = cloudwatch.get_metric_statistics( Namespace='AWS/EC2', MetricName='CPUUtilization', Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}], StartTime=start_time, EndTime=end_time, Period=3600, Statistics=['Average'] ) # Calculate average CPU utilization without NumPy data_points = metrics.get('Datapoints', []) if data_points: avg_cpu = sum(dp['Average'] for dp in data_points) / len(data_points) else: avg_cpu = 0 return avg_cpu def plot_cpu_utilization(instances_list, last_n_days=7): start_time = datetime.utcnow() - timedelta(days=last_n_days) end_time = datetime.utcnow() avg_utilizations = [] for instance in instances_list: if instance['State'] == 'running': avg_cpu = fetch_cpu_utilization(instance['InstanceId'], instance['Region'], start_time, end_time) avg_utilizations.append((instance['InstanceId'], avg_cpu)) # Sort instances by average CPU utilization and select top 3 and bottom 3 avg_utilizations.sort(key=lambda x: x[1], reverse=True) top_instances = avg_utilizations[:3] bottom_instances = avg_utilizations[-3:] # Prepare data for plotting instance_ids = [x[0] for x in top_instances + bottom_instances] utilizations = [x[1] for x in top_instances + bottom_instances] # Plotting context.plot.add_trace( name="CPU Utilization", xpts=instance_ids, ypts=utilizations, tracetype='bar' ) context.plot.xlabel = 'Instance ID' context.plot.ylabel = 'Average CPU Utilization (%)' context.plot.title = f'Top & Bottom 3 EC2 Instances by CPU Utilization (Last {last_n_days} Days)' plot_cpu_utilization(instances_list, last_n_days=30)
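            The CloudWatch call at the heart of this task is get_metric_statistics. A minimal hedged sketch for a single instance; the instance ID and region are illustrative:

            from datetime import datetime, timedelta

            import boto3

            cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")  # illustrative region
            end = datetime.utcnow()
            start = end - timedelta(days=7)

            stats = cloudwatch.get_metric_statistics(
                Namespace="AWS/EC2",
                MetricName="CPUUtilization",
                Dimensions=[{"Name": "InstanceId", "Value": "i-0123456789abcdef0"}],  # illustrative ID
                StartTime=start,
                EndTime=end,
                Period=3600,
                Statistics=["Average"],
            )
            points = stats.get("Datapoints", [])
            avg_cpu = sum(p["Average"] for p in points) / len(points) if points else 0.0
            print(f"7-day average CPU: {avg_cpu:.2f}%")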
          3. 2.2.3.2.3

            This task entails collecting CPU usage metrics from Amazon CloudWatch, calculating the average utilization, and visualizing this data. This task aids in identifying underutilized or overutilized instances, facilitating efficient resource management and cost optimization in AWS.

            import boto3 from datetime import datetime, timedelta from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError, EndpointConnectionError, DataNotFoundError last_n_days=30 # AWS credentials creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] if locals().get('instances_list') is None: instances_list = [] # Function to fetch CPU utilization for a given instance def fetch_cpu_utilization(instance_id, region, start_time, end_time): try: cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) response = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'cpuUtilization', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/EC2', 'MetricName': 'CPUUtilization', 'Dimensions': [{'Name': 'InstanceId', 'Value': instance_id}] }, 'Period': 3600, # one hour 'Stat': 'Average', }, 'ReturnData': True, }, ], StartTime=start_time, EndTime=end_time ) return response['MetricDataResults'][0]['Timestamps'], response['MetricDataResults'][0]['Values'] except Exception as e: print(f"Error getting CPU utilization for instance {instance_id}: {e}") return [], [] # Main plotting logic def plot_cpu_utilization(instances_list, lookback_days=last_n_days): end_time = datetime.utcnow() start_time = end_time - timedelta(days=lookback_days) # Filter running EC2 instances for instance in instances_list: if instance['State'] != 'running': continue timestamps, cpu_values = fetch_cpu_utilization(instance['InstanceId'], instance['Region'], start_time, end_time) # Check if data is available if timestamps: context.plot.add_trace( name=f"Instance {instance['InstanceId']}", xpts=timestamps, # x-axis points ypts=cpu_values, # y-axis points tracetype="line" ) # Set plot properties context.plot.xlabel = 'Date' context.plot.ylabel = 'Average CPU Utilization (%)' context.plot.title = f'CPU Utilization per EC2 Instance (Last {last_n_days} Days)' # Execute the plotting function plot_cpu_utilization(instances_list)
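            Unlike the previous task, this one uses get_metric_data, which returns aligned timestamp/value arrays suitable for a time-series plot. A compact sketch with an illustrative instance ID and region:

            from datetime import datetime, timedelta

            import boto3

            cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")  # illustrative region
            resp = cloudwatch.get_metric_data(
                MetricDataQueries=[{
                    "Id": "cpu",
                    "MetricStat": {
                        "Metric": {
                            "Namespace": "AWS/EC2",
                            "MetricName": "CPUUtilization",
                            "Dimensions": [{"Name": "InstanceId", "Value": "i-0123456789abcdef0"}],  # illustrative
                        },
                        "Period": 3600,
                        "Stat": "Average",
                    },
                    "ReturnData": True,
                }],
                StartTime=datetime.utcnow() - timedelta(days=30),
                EndTime=datetime.utcnow(),
            )
            timestamps = resp["MetricDataResults"][0]["Timestamps"]
            values = resp["MetricDataResults"][0]["Values"]
            print(len(timestamps), "hourly datapoints fetched")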
        3. 2.2.3.3

          This task utilizes AWS Compute Optimizer to fetch rightsizing recommendations for AWS EC2 instances, aiming to optimize instance sizes based on actual usage. It assesses whether instances are under-utilized or over-utilized and suggests adjustments to enhance performance and reduce costs. By querying a specified region or all regions, it supports a comprehensive optimization strategy, ensuring resources are efficiently allocated and maximizing cost-effectiveness and performance across your AWS environment.

          region_name_to_search_recommendations = None
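          At its core, the lookup in the sub-task below is one Compute Optimizer call per region. A minimal sketch for a single region, assuming the account is already opted in to Compute Optimizer; the region is illustrative:

          import boto3

          optimizer = boto3.client("compute-optimizer", region_name="us-east-1")  # illustrative region

          # One page of recommendations; follow resp.get("nextToken") to fetch further pages.
          resp = optimizer.get_ec2_instance_recommendations()
          for rec in resp.get("instanceRecommendations", []):
              options = [o.get("instanceType") for o in rec.get("recommendationOptions", [])]
              print(rec.get("currentInstanceType"), rec.get("finding"), "->", options)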
          1. 2.2.3.3.1

            This task retrieves AWS EC2 instance rightsizing recommendations using AWS Compute Optimizer, identifying cost-saving and performance-enhancing opportunities by analyzing usage patterns. It suggests optimal instance types or sizes, ensuring efficient resource utilization.

            import json import boto3 from botocore.exceptions import BotoCoreError, ClientError from datetime import datetime creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize boto3 clients #compute_optimizer_client = boto3.client('compute-optimizer', region_name='us-west-2') pricing_client = boto3.client('pricing',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name='us-east-1') def datetime_converter(o): if isinstance(o, datetime): return o.__str__() def get_price_for_instance(instance_type, region): # Mapping AWS region to the Pricing API format region_name_map = { "us-east-1": "US East (N. Virginia)", "us-east-2": "US East (Ohio)", "us-west-1": "US West (N. California)", "us-west-2": "US West (Oregon)", "af-south-1": "Africa (Cape Town)", "ap-east-1": "Asia Pacific (Hong Kong)", "ap-south-1": "Asia Pacific (Mumbai)", "ap-northeast-3": "Asia Pacific (Osaka)", "ap-northeast-2": "Asia Pacific (Seoul)", "ap-southeast-1": "Asia Pacific (Singapore)", "ap-southeast-2": "Asia Pacific (Sydney)", "ap-northeast-1": "Asia Pacific (Tokyo)", "ca-central-1": "Canada (Central)", "eu-central-1": "EU (Frankfurt)", "eu-west-1": "EU (Ireland)", "eu-west-2": "EU (London)", "eu-south-1": "EU (Milan)", "eu-west-3": "EU (Paris)", "eu-north-1": "EU (Stockholm)", "me-south-1": "Middle East (Bahrain)", "sa-east-1": "South America (São Paulo)"} region_name = region_name_map.get(region, region) # Default to using the region code if no mapping found try: response = pricing_client.get_products( ServiceCode='AmazonEC2', Filters=[ {'Type': 'TERM_MATCH', 'Field': 'instanceType', 'Value': instance_type}, {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': region_name}, {'Type': 'TERM_MATCH', 'Field': 'preInstalledSw', 'Value': 'NA'}, {'Type': 'TERM_MATCH', 'Field': 'operatingSystem', 'Value': 'Linux'}, {'Type': 'TERM_MATCH', 'Field': 'tenancy', 'Value': 'shared'}, {'Type': 'TERM_MATCH', 'Field': 'capacitystatus', 'Value': 'Used'}, ], MaxResults=1 ) price_info = json.loads(response['PriceList'][0]) price_dimensions = next(iter(price_info['terms']['OnDemand'].values()))['priceDimensions'] price_per_unit = next(iter(price_dimensions.values()))['pricePerUnit']['USD'] return float(price_per_unit) except Exception as e: print(f"Error fetching price for {instance_type} in {region}: {e}") return None def get_ec2_rightsizing_recommendations(region_name_to_search_recommendations=None): regions_to_search = [] if region_name_to_search_recommendations: regions_to_search.append(region_name_to_search_recommendations) else: # Fetch all regions if none specified ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') all_regions_response = ec2_client.describe_regions() regions_to_search = [region['RegionName'] for region in all_regions_response['Regions']] all_recommendations = [] for region in regions_to_search: try: # Initialize compute-optimizer client with the proper region local_compute_optimizer_client = boto3.client('compute-optimizer', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) next_token = None #page_counter = 1 # To count the number of pages fetched while True: if next_token: response = local_compute_optimizer_client.get_ec2_instance_recommendations(NextToken=next_token) else: response = local_compute_optimizer_client.get_ec2_instance_recommendations() recommendations = response.get('instanceRecommendations', []) if recommendations: 
all_recommendations.extend(recommendations) #print(f"Fetched {len(recommendations)} recommendations for page {page_counter}.") # Pagination - Check if there's a next page of recommendations next_token = response.get('NextToken') if not next_token: break # Exit loop if there's no more data to fetch #page_counter += 1 except ClientError as error: print(f"Client error in region {region}: {error}") except BotoCoreError as error: print(f"BotoCore error in region {region}: {error}") return all_recommendations def process_recommendations(region_name_to_search_recommendations=None): # Fetch recommendations once, using the provided region or searching all regions. recommendations = get_ec2_rightsizing_recommendations(region_name_to_search_recommendations) # If no recommendations were found after searching, exit the function. if not recommendations: print("No recommendations found. Please check if the region is correct or if there are any permissions issues.") return data_for_plotting = [] # Iterate through the fetched recommendations for processing. for recommendation in recommendations: # Extract details from each recommendation as before... instance_id = recommendation['instanceArn'].split('/')[-1] instance_name = recommendation.get('instanceName', 'N/A') findings = recommendation.get('finding', 'N/A') finding_reasons = ", ".join(recommendation.get('findingReasonCodes', [])) instance_state = recommendation.get('instanceState', 'N/A') current_instance_type = recommendation.get('currentInstanceType', 'N/A') tags = json.dumps(recommendation.get('tags', []), default=datetime_converter) account_id = recommendation['instanceArn'].split(':')[4] region = recommendation['instanceArn'].split(':')[3] # Print details for each recommendation... print(f"Instance ID: {instance_id}") print(f"Instance Name: {instance_name}") print(f"Findings: {findings}") print(f"Finding Reasons: {finding_reasons}") print(f"Instance State: {instance_state}") print(f"Current Instance Type: {current_instance_type}") print(f"Tags: {tags}") print(f"Account ID: {account_id}") print(f"Region: {region}") print("-" * 50) for option in recommendation['recommendationOptions']: recommended_instance_type = option.get('instanceType') migration_effort = option.get('migrationEffort', 'N/A') savings_opportunity_percentage = option.get('savingsOpportunity', {}).get('savingsOpportunityPercentage', 'N/A') estimated_monthly_savings_value = option.get('savingsOpportunity', {}).get('estimatedMonthlySavings', {}).get('value', 'N/A') current_price = get_price_for_instance(current_instance_type, region) recommended_price = get_price_for_instance(recommended_instance_type, region) price_difference = "N/A" if current_price is None or recommended_price is None else current_price - recommended_price data_for_plotting.append({ "instance_id": instance_id, "instance_name": instance_name, "estimated_monthly_savings_value": estimated_monthly_savings_value }) print(f"\tRecommended Instance Type: {recommended_instance_type}") print(f"\tMigration Effort: {migration_effort}") print(f"\tSavings Opportunity (%): {savings_opportunity_percentage}") print(f"\tEstimated Monthly Savings: USD {estimated_monthly_savings_value}") print(f"\tCurrent Price: {current_price if current_price is not None else 'N/A'} USD per hour") print(f"\tRecommended Price: {recommended_price if recommended_price is not None else 'N/A'} USD per hour") print(f"\tPrice Difference: {price_difference} USD per hour") print("-" * 25) return data_for_plotting #region_name_to_search_recommendations = None 
data_for_plotting = process_recommendations(region_name_to_search_recommendations)
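            The on-demand price lookup buried in the task above is a Pricing API query. A trimmed, hedged sketch with an illustrative instance type and location; the pricing endpoint is served from us-east-1:

            import json

            import boto3

            pricing = boto3.client("pricing", region_name="us-east-1")  # Pricing API endpoint region

            resp = pricing.get_products(
                ServiceCode="AmazonEC2",
                Filters=[
                    {"Type": "TERM_MATCH", "Field": "instanceType", "Value": "m5.large"},         # illustrative
                    {"Type": "TERM_MATCH", "Field": "location", "Value": "US East (N. Virginia)"},
                    {"Type": "TERM_MATCH", "Field": "operatingSystem", "Value": "Linux"},
                    {"Type": "TERM_MATCH", "Field": "preInstalledSw", "Value": "NA"},
                    {"Type": "TERM_MATCH", "Field": "tenancy", "Value": "Shared"},
                    {"Type": "TERM_MATCH", "Field": "capacitystatus", "Value": "Used"},
                ],
                MaxResults=1,
            )
            price_item = json.loads(resp["PriceList"][0])
            dimensions = next(iter(price_item["terms"]["OnDemand"].values()))["priceDimensions"]
            print(next(iter(dimensions.values()))["pricePerUnit"]["USD"], "USD per hour")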
          2. 2.2.3.3.2

            This task generates a bar chart visualizing AWS EC2 rightsizing savings, with instance names on the X-axis and different recommendations distinguished by instance ID and rank in the legend.

            # print(json.dumps(data_for_plotting,indent=4)) # Aggregate savings values for each instance, keeping track of both instance ID and name savings_by_instance = {} for entry in data_for_plotting: instance_id = entry["instance_id"] instance_name = entry["instance_name"] # Keep instance name for labeling purposes savings_value = entry["estimated_monthly_savings_value"] # Check if the instance ID is already a key in the dictionary if instance_id not in savings_by_instance: savings_by_instance[instance_id] = {'name': instance_name, 'savings': [savings_value]} else: savings_by_instance[instance_id]['savings'].append(savings_value) # Plotting context.plot.xlabel = "Instance Name" context.plot.ylabel = "Estimated Monthly Savings ($)" context.plot.title = "Estimated Monthly Savings by Instance" # Add a trace for each instance's savings values for instance_id, info in savings_by_instance.items(): instance_name = info['name'] # Retrieve instance name for labeling savings_values = info['savings'] for i, savings_value in enumerate(savings_values): trace_name = f"({instance_id})-Rec{i+1}" context.plot.add_trace(name=trace_name, xpts=[instance_name], ypts=[savings_value], tracetype='bar')
      4. 2.2.4

        1. 2.2.4.1

          This runbook aids in enhancing storage efficiency by providing insights based on usage analysis. AWS Compute Optimizer suggests optimal EBS volume configurations, including type, size, and IOPS, to align with performance needs and cost savings.

          region_name_to_search_recommendations = None
          1. 2.2.4.1.1

            This task involves analyzing EBS volume usage to offer configuration changes for cost efficiency and performance improvement, based on historical data analysis.

            import boto3 import json from datetime import datetime, timezone creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Define the pricing_client at the beginning of your script to ensure it's available globally pricing_client = boto3.client('pricing', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') def get_ebs_volume_recommendations(region_name=None): """ Fetch EBS volume recommendations from AWS Compute Optimizer for a specific region or all regions. """ if region_name: regions = [region_name] else: # Initialize a client for the EC2 service to fetch all regions ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') regions_response = ec2_client.describe_regions() regions = [region['RegionName'] for region in regions_response['Regions']] recommendations = [] for region in regions: try: # Initialize Compute Optimizer client for each region compute_optimizer_client = boto3.client('compute-optimizer', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) next_token = None while True: params = {} if next_token: params['NextToken'] = next_token response = compute_optimizer_client.get_ebs_volume_recommendations(**params) recommendations.extend(response.get('volumeRecommendations', [])) next_token = response.get('NextToken', None) if not next_token: break except Exception as e: print(f"Error fetching EBS volume recommendations for region {region}: {e}") return recommendations def get_ebs_price(volume_type, size_gb, region): # Mapping AWS region to the Pricing API format region_name_map = { "us-east-1": "US East (N. Virginia)", "us-east-2": "US East (Ohio)", "us-west-1": "US West (N. 
California)", "us-west-2": "US West (Oregon)", "af-south-1": "Africa (Cape Town)", "ap-east-1": "Asia Pacific (Hong Kong)", "ap-south-1": "Asia Pacific (Mumbai)", "ap-northeast-3": "Asia Pacific (Osaka)", "ap-northeast-2": "Asia Pacific (Seoul)", "ap-southeast-1": "Asia Pacific (Singapore)", "ap-southeast-2": "Asia Pacific (Sydney)", "ap-northeast-1": "Asia Pacific (Tokyo)", "ca-central-1": "Canada (Central)", "eu-central-1": "EU (Frankfurt)", "eu-west-1": "EU (Ireland)", "eu-west-2": "EU (London)", "eu-south-1": "EU (Milan)", "eu-west-3": "EU (Paris)", "eu-north-1": "EU (Stockholm)", "me-south-1": "Middle East (Bahrain)", "sa-east-1": "South America (São Paulo)"} region_name = region_name_map.get(region, region) #print(f"searching for region {region_name}") # for debugging try: price_response = pricing_client.get_products( ServiceCode='AmazonEC2', Filters=[ {'Type': 'TERM_MATCH', 'Field': 'volumeApiName', 'Value': volume_type}, # Adjusted to 'volumeApiName' {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': region_name}, {'Type': 'TERM_MATCH', 'Field': 'productFamily', 'Value': 'Storage'} ], MaxResults=1 # Increased MaxResults to ensure broader search results ) #print(price_response) # for debugging # Ensure there's at least one price listed if price_response['PriceList']: # Assuming the first price item's details are representative price_data = json.loads(price_response['PriceList'][0]) terms = price_data.get('terms', {}).get('OnDemand', {}) if terms: price_dimensions = next(iter(terms.values()))['priceDimensions'] price_per_gb = next(iter(price_dimensions.values()))['pricePerUnit']['USD'] # Calculate total price based on the volume size total_price = float(price_per_gb) * size_gb return total_price else: print("No pricing terms found.") return None except Exception as e: print(f"Error fetching price for EBS volume: {e}") return None def process_recommendations(recommendations): for recommendation in recommendations: volume_arn = recommendation['volumeArn'] region = volume_arn.split(':')[3] # for pricing api query current_configuration = recommendation['currentConfiguration'] finding = recommendation['finding'] finding_reasons_codes = recommendation.get('findingReasonCodes', []) print(f"Volume ARN: {volume_arn}") print(f"Region: {region}") print(f"Current Configuration: {json.dumps(current_configuration, indent=2)}") print(f"Finding: {finding} {' | '.join(finding_reasons_codes) if finding_reasons_codes else ''}") if 'volumeRecommendationOptions' in recommendation: for option in recommendation['volumeRecommendationOptions']: configuration = option['configuration'] performance_risk = option.get('performanceRisk', 'N/A') rank = option['rank'] volume_type = configuration['volumeType'] size_gb = configuration['volumeSize'] current_price = get_ebs_price(current_configuration['volumeType'], current_configuration['volumeSize'], region) recommended_price = get_ebs_price(volume_type, size_gb, region) print(f"\tRecommended Configuration: {json.dumps(configuration, indent=4)}") print(f"\tPerformance Risk: {performance_risk}") print(f"\tRank: {rank}") print(f"\tCurrent Price: ${current_price} per month") print(f"\tRecommended Price: ${recommended_price} per month") # Calculate and print savings if current_price and recommended_price: savings = current_price - recommended_price print(f"\tEstimated Monthly Savings: ${savings:.2f}") print("-" * 60) else: print("\tNo recommendation options provided.") print("-" * 60) # Example usage #region_name_to_search_recommendations = 'us-east-1' # Set to None for 
all regions recommendations = get_ebs_volume_recommendations(region_name_to_search_recommendations) if recommendations: print("Processing Recommendations") process_recommendations(recommendations) else: print("No EBS volume recommendations available.")
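            The Compute Optimizer side of this task is a single call per region. A minimal sketch for one region, assuming Compute Optimizer opt-in; the region is illustrative:

            import boto3

            optimizer = boto3.client("compute-optimizer", region_name="us-east-1")  # illustrative region

            # One page of EBS volume recommendations; follow resp.get("nextToken") for more pages.
            resp = optimizer.get_ebs_volume_recommendations()
            for rec in resp.get("volumeRecommendations", []):
                current = rec.get("currentConfiguration", {})
                options = [o.get("configuration", {}).get("volumeType")
                           for o in rec.get("volumeRecommendationOptions", [])]
                print(rec.get("volumeArn"), rec.get("finding"), current.get("volumeType"), "->", options)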
      5. 2.2.5

        1. 2.2.5.1

          This task aggregates CPU utilization data for RDS instances across an AWS account, computes each instance's average CPU usage over a specified period, and plots the top and bottom three instances by average utilization to help assess overall resource efficiency.

          import boto3 from datetime import datetime, timedelta region_name=None # None when you want to run the script for all regions #last_n_days = 30 # AWS Credentials - replace with your method to retrieve AWS credentials creds = _get_creds(cred_label)['creds'] # Placeholder function access_key = creds['username'] secret_key = creds['password'] def get_aws_regions(): """Get a list of all AWS regions.""" ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') regions = ec2.describe_regions() return [region['RegionName'] for region in regions['Regions']] def fetch_rds_instances(region): """Fetch all RDS instances in a specific region.""" rds = boto3.client('rds',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) paginator = rds.get_paginator('describe_db_instances') page_iterator = paginator.paginate() rds_instances = [] for page in page_iterator: for instance in page['DBInstances']: rds_instances.append({ 'DBInstanceIdentifier': instance['DBInstanceIdentifier'], 'Region': region, }) return rds_instances def fetch_cpu_utilization(db_instance_identifier, region, start_time, end_time): """Fetch the average CPU utilization for an RDS instance.""" cloudwatch = boto3.client('cloudwatch',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) metrics = cloudwatch.get_metric_statistics( Namespace='AWS/RDS', MetricName='CPUUtilization', Dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': db_instance_identifier}], StartTime=start_time, EndTime=end_time, Period=3600, Statistics=['Average'] ) data_points = metrics.get('Datapoints', []) if data_points: avg_cpu = sum(dp['Average'] for dp in data_points) / len(data_points) else: avg_cpu = 0 return avg_cpu def plot_cpu_utilization(region_name=None, last_n_days=7): """Plot CPU utilization for RDS instances.""" start_time = datetime.utcnow() - timedelta(days=last_n_days) end_time = datetime.utcnow() regions = [region_name] if region_name else get_aws_regions() for region in regions: rds_instances = fetch_rds_instances(region) avg_utilizations = [] for instance in rds_instances: avg_cpu = fetch_cpu_utilization(instance['DBInstanceIdentifier'], region, start_time, end_time) avg_utilizations.append((instance['DBInstanceIdentifier'], avg_cpu)) avg_utilizations.sort(key=lambda x: x[1], reverse=True) top_instances = avg_utilizations[:3] bottom_instances = avg_utilizations[-3:] instance_ids = [x[0] for x in top_instances + bottom_instances] utilizations = [x[1] for x in top_instances + bottom_instances] # Plotting context.plot.add_trace( name="CPU Utilization", xpts=instance_ids, ypts=utilizations, tracetype='bar' ) context.plot.xlabel = 'Instance ID' context.plot.ylabel = 'Average CPU Utilization (%)' context.plot.title = f'Top & Bottom 3 RDS Instances by CPU Utilization (Last {last_n_days} Days)' # Example usage plot_cpu_utilization(region_name, last_n_days) # For all regions # plot_cpu_utilization(region_name='us-east-1', last_n_days=30) # For a specific region
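          The only difference from the EC2 version is the namespace and dimension. A short sketch for one RDS instance, with an illustrative identifier and region:

          from datetime import datetime, timedelta

          import boto3

          cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")  # illustrative region
          stats = cloudwatch.get_metric_statistics(
              Namespace="AWS/RDS",
              MetricName="CPUUtilization",
              Dimensions=[{"Name": "DBInstanceIdentifier", "Value": "my-database-1"}],  # illustrative
              StartTime=datetime.utcnow() - timedelta(days=7),
              EndTime=datetime.utcnow(),
              Period=3600,
              Statistics=["Average"],
          )
          points = stats.get("Datapoints", [])
          print(sum(p["Average"] for p in points) / len(points) if points else 0.0)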
        2. 2.2.5.2

          This task compares the daily cost of all RDS instances in an AWS account, helping organizations choose the most economical options for their database needs while optimizing their AWS budget.

          import pandas as pd from datetime import datetime, timedelta, timezone # Assuming df is your DataFrame and last_n_days is defined # Example: last_n_days = 30 if df is not None: #print("DataFrame Columns:", list(df.columns)) # Convert 'lineItem/UsageStartDate' to datetime and ensure it's in UTC df['lineItem/UsageStartDate'] = pd.to_datetime(df['lineItem/UsageStartDate'], utc=True) # Filter out negative costs df = df[df['lineItem/UnblendedCost'] >= 0] # Adjust the cutoff date to the start of the day in UTC cutoff_date = (datetime.utcnow() - timedelta(days=last_n_days)).replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=timezone.utc) rds_df = df[(df['lineItem/UsageStartDate'] > cutoff_date) & (df['product/ProductName'] == 'Amazon Relational Database Service') & (df['product/instanceType'].notna())] '''# First full day after cutoff first_full_day_after_cutoff = cutoff_date + timedelta(days=1) rds_df = rds_df[rds_df['lineItem/UsageStartDate'] >= first_full_day_after_cutoff]''' if not rds_df.empty: # Group by 'lineItem/UsageStartDate' and 'product/instanceType' for RDS rds_result = rds_df.groupby([rds_df['lineItem/UsageStartDate'].dt.date, 'product/instanceType']).agg( usage_hours=('lineItem/UsageAmount', 'sum'), usage_cost=('lineItem/UnblendedCost', 'sum') ).reset_index() #print("Number of rows in result DataFrame:", len(rds_result)) # Plotting the RDS cost data context.plot.xlabel = 'Date' context.plot.ylabel = 'RDS Usage Cost($)' context.plot.title = f'RDS Cost Usage (Last {last_n_days} Days)' for instance_type in rds_result['product/instanceType'].unique(): instance_data = rds_result[rds_result['product/instanceType'] == instance_type] x = instance_data['lineItem/UsageStartDate'].tolist() y = instance_data['usage_cost'].tolist() #print(f"Instance Type: {instance_type}") #print(f"Sample Dates (x): {x[:5]}") #print(f"Sample Costs (y): {y[:5]}") context.plot.add_trace(name=f"RDS- {instance_type}", xpts=x, ypts=y, tracetype="line") else: print("No data available for RDS in the specified time frame.") else: print("DataFrame is empty. Exiting.") #context.proceed=False
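          The RDS rows are isolated from the CUR DataFrame with a simple product filter. A sketch assuming df, including its derived day column, comes from task 2.1:

          # df is assumed to be the CUR DataFrame (with its 'day' column) from task 2.1
          rds = df[(df["product/ProductName"] == "Amazon Relational Database Service")
                   & (df["lineItem/UnblendedCost"] >= 0)]
          daily_rds = rds.groupby(["day", "product/instanceType"])["lineItem/UnblendedCost"].sum()
          print(daily_rds.tail())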
      6. 2.2.6

        This task offers a concise view of AWS expenses by individual account IDs, aggregating unblended costs for each account. It highlights the spending patterns of various accounts, pinpointing those with the highest costs. Such insights are invaluable for organizations with multiple AWS accounts, aiding in efficient budget allocation and cost management by identifying potential overspends or resource underutilization.

        import pandas as pd if df is not None: print("Analyzing and visualizing cost by AWS Account ID...") # Filter out negative values filter_condition = (df['lineItem/UnblendedCost'] >= 0) filtered_df = df[filter_condition] # Group by 'lineItem/UsageAccountId', sum 'lineItem/UnblendedCost', and sort values account_costs = filtered_df.groupby('lineItem/UsageAccountId')['lineItem/UnblendedCost'].sum().sort_values(ascending=False) # Extract x and y values x = account_costs.index.astype(str).tolist() # Convert account IDs to strings and then get the list y = account_costs.values.tolist() # This gets the corresponding costs if len(x) == 1: print("Only one account found. Skipping chart.") else: # Set the properties for your plot context.plot.xlabel = 'AWS Account ID' context.plot.ylabel = 'Cost ($)' context.plot.title = 'Cost by AWS Account ID' # Add the trace to your plot context.plot.add_trace(name="Cost by AWS Account ID", xpts=x, ypts=y, tracetype="pie") print("Analysis complete.")
  3. 3

    import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_aws_regions(): # Create an EC2 client ec2_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name='us-east-1') # Call describe_regions to get a list of all regions regions = ec2_client.describe_regions() # Extract region names region_names = [region['RegionName'] for region in regions['Regions']] return region_names regions = get_aws_regions() ''' print("Available AWS Regions:") for region in regions: print(region) ''' #context.skip_sub_tasks=True
    1. 3.1

      AWS Route53 is a scalable and highly available DNS service that connects user requests to infrastructure running both inside and outside AWS. One of its features is health checks, which monitor the health of your resources. Over time, as resources are added or removed, or configurations change, some health checks may no longer be associated with any active resources, leading to unnecessary costs and potential confusion. This runbook identifies and removes these orphaned health checks, which helps optimize costs, reduce clutter, and ensure that only relevant health checks remain active in your AWS account.

      1. 3.1.1

        This task retrieves a list of all health checks that have been configured in Amazon's Route53 service. AWS Route53 is a scalable and highly available domain name system (DNS) web service. A health check in Route53 monitors the health and performance of your web applications, web servers, and other resources. By fetching all health checks, users can review, manage, or diagnose the operational status and configuration of their resources, ensuring that the routing policies are working as expected. This can be especially useful for maintaining high availability and redundancy in distributed systems or for troubleshooting issues related to DNS routing.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize boto3 client for Amazon Route53 route53 = boto3.client('route53',aws_access_key_id=access_key,aws_secret_access_key=secret_key) def get_all_healthchecks(): """ Retrieve all health checks from Route53. Returns: - list: List of health check IDs. """ healthchecks = [] try: # Using paginator to handle potential pagination of results paginator = route53.get_paginator('list_health_checks') for page in paginator.paginate(): for healthcheck in page['HealthChecks']: healthchecks.append(healthcheck['Id']) except route53.exceptions.Route53ServiceError as e: print(f"Route53 service error fetching health checks: {e}") except Exception as e: print(f"Error fetching health checks: {e}") finally: return healthchecks #Main Block print("Fetching all health checks...") all_healthchecks = get_all_healthchecks() print(f"Found {len(all_healthchecks)} health checks.") for hc in all_healthchecks: print(hc)
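        The retrieval is just the list_health_checks paginator. A minimal sketch, assuming credentials are configured in the environment:

        import boto3

        route53 = boto3.client("route53")  # Route53 is a global service; no region required

        health_check_ids = []
        for page in route53.get_paginator("list_health_checks").paginate():
            health_check_ids.extend(hc["Id"] for hc in page["HealthChecks"])
        print(f"Found {len(health_check_ids)} health checks.")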
      2. 3.1.2

        AWS Route53, Amazon's DNS service, offers health checks to monitor and report the availability of specific resources. Over time, with changes in configurations, deployments, or scaling activities, some of these health checks might become redundant, as they are no longer associated with active resources. Filtering out these unused health checks is an essential maintenance activity. By doing so, users can identify and potentially remove extraneous checks, helping streamline the management of their DNS configurations, optimize costs, and maintain a cleaner, more efficient environment.

        import boto3 import sys creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize boto3 client for Amazon Route53 route53 = boto3.client('route53',aws_access_key_id=access_key,aws_secret_access_key=secret_key) def get_all_resource_record_sets(hosted_zone_id): """ Retrieve all resource record sets for a hosted zone. Returns: - list: List of resource record sets. """ records = [] try: # Using paginator to handle potential pagination of results paginator = route53.get_paginator('list_resource_record_sets') for page in paginator.paginate(HostedZoneId=hosted_zone_id): records.extend(page['ResourceRecordSets']) except route53.exceptions.NoSuchHostedZone as e: print(f"Specified hosted zone {hosted_zone_id} does not exist: {e}") except Exception as e: print(f"Error fetching resource record sets for hosted zone {hosted_zone_id}: {e}") return records # Here, unused health check is a health check which is not associated to any resource record in the hosted zones def filter_unused_healthchecks(hosted_zones, all_healthchecks_s): """ Filter out health checks that are in use. Parameters: - hosted_zones (list): List of hosted zones. - all_healthchecks (list): List of all health checks. Returns: - list: List of unused health check IDs. """ # Initialize an empty set to store health checks that are in use used_healthchecks = set() # Iterate through each hosted zone for hosted_zone in hosted_zones: try: # Fetch resource record sets for the current hosted zone for record in get_all_resource_record_sets(hosted_zone['Id']): # If a health check is associated with the record, add it to the set of used health checks if 'HealthCheckId' in record: used_healthchecks.add(record['HealthCheckId']) except Exception as e: print(f"Error processing hosted zone {hosted_zone['Id']}: {e}") # Return the set of health checks that are not in use return list(set(all_healthchecks_s) - used_healthchecks) # Main block # Fetch all hosted zones print("Fetching all hosted zones...") try: hosted_zones = route53.list_hosted_zones()['HostedZones'] print(f"Found {len(hosted_zones)} hosted zones.") except Exception as e: print(f"Error fetching hosted zones: {e}") #sys.exit(1) # Exit the script with an error code # all_healthchecks = [] #for testing otherwise initialized passed down from parent task # all_healthchecks passed down from parent task if all_healthchecks: unused_healthchecks = filter_unused_healthchecks(hosted_zones, all_healthchecks) # Ensure that unused_healthchecks is a list, even if empty unused_healthchecks = unused_healthchecks if unused_healthchecks else [] # Print the unused health checks if unused_healthchecks: print("Unused health checks found:") for hc in unused_healthchecks: print(hc) else: print("No unused health checks found.") else: print("Zero Route 53 Health checks were found") context.skip_sub_tasks = True
        1. 3.1.2.1

          AWS Route53 is Amazon's DNS web service, and it provides health checks to monitor the health of resources and applications. Over time, as configurations change or resources are decommissioned, certain health checks might no longer be relevant or needed. Deleting these unnecessary Route53 health checks helps in decluttering the AWS environment, reducing potential costs, and simplifying management. It's essential to periodically review and delete any health checks that are no longer in use to maintain an optimized and streamlined AWS setup.

          import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize boto3 client for Amazon Route53 route53 = boto3.client('route53',aws_access_key_id=access_key,aws_secret_access_key=secret_key) def delete_healthcheck(healthcheck_id): """ Delete a specific health check. Parameters: - healthcheck_id (str): The ID of the health check to delete. """ try: route53.delete_health_check(HealthCheckId=healthcheck_id) print(f"Successfully deleted health check: {healthcheck_id}") except route53.exceptions.NoSuchHealthCheck: print(f"Health check {healthcheck_id} does not exist.") except route53.exceptions.HealthCheckInUse: print(f"Health check {healthcheck_id} is still in use and cannot be deleted.") except Exception as e: print(f"Error deleting health check {healthcheck_id}: {e}") def process_health_checks(unused_healthchecks_list): """ Process and delete the provided health checks. Parameters: - unused_healthchecks_list (list): List of health check IDs to delete. """ # Ensure that unused_healthchecks_list is a list, even if empty unused_healthchecks_list = unused_healthchecks_list if unused_healthchecks_list else [] if unused_healthchecks_list: # Delete each unused health check print("Deleting unused health checks...") for healthcheck_id in unused_healthchecks_list: delete_healthcheck(healthcheck_id) else: print("No unused health checks...") # Main Block ''' # List of unused health checks to delete. # This should be updated based on the output from the previous script. # Example list type -> unused_healthchecks = ['d7d64110-9aa9-4cb2-a63b-9f33d96dd2d2'] # Replace with actual IDs if using the task in a standalone manner and not taking any inputs from parent task ''' # If the unused_healthchecks variable is not defined (e.g., it's not passed from a parent task), initialize it as an empty list. try: unused_healthchecks except NameError: unused_healthchecks = [] # Process (delete) the unused health checks process_health_checks(unused_healthchecks)
    2. 3.2

      This runbook is designed to identify unattached Elastic IPs within all AWS regions and release them. Elastic IPs that are not associated with any instances can accumulate over time and result in unnecessary costs. By using this runbook, you can efficiently release these unattached Elastic IPs, optimizing your resources and reducing expenses.

      1. 3.2.1

        This step involves searching through regions to identify Elastic IPs that are not currently associated with any instances. By iteratively querying each region's EC2 service, the script collects details about unattached Elastic IPs, including their public IP addresses, allocation IDs, and regions.

        import boto3 import botocore.exceptions creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def find_unattached_elastic_ips(regions): unattached_ips = [] # Loop through each region for region in regions: try: # Fetch the list of Elastic IPs for the region ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) response = ec2_client.describe_addresses() except botocore.exceptions.BotoCoreError as e: print(f"Error fetching addresses in region {region}: {e}") continue # Check each Elastic IP for an association for eip in response.get('Addresses', []): if 'AssociationId' not in eip: # Add unattached Elastic IPs to the list unattached_ips.append({'public_ip': eip.get('PublicIp', ''), 'allocation_id': eip.get('AllocationId', ''), 'region': region}) return unattached_ips # List of regions to search for unattached Elastic IPs #regions = ['us-east-1'] # Add your desired regions here # Find unattached Elastic IPs unattached_ips = find_unattached_elastic_ips(regions) if len(unattached_ips) == 0: print("No unattached Elastic IPs found.") else: print("Unattached Elastic IPs:") # Print details for each unattached Elastic IP for ip_info in unattached_ips: print(f"Public IP: {ip_info['public_ip']}, Allocation ID: {ip_info['allocation_id']}, Region: {ip_info['region']}") context.skip_sub_tasks = True
        1. 3.2.1.1

          Once the unattached Elastic IPs are identified, this task releases them from the AWS environment. By using the collected allocation IDs and regions, the script communicates with the EC2 service to release each unattached Elastic IP. This process ensures that these IPs are no longer reserved, contributing to cost savings and resource optimization.

          import boto3 import botocore.exceptions creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Function to release an Elastic IP def release_elastic_ip(ec2_client, allocation_id, region): ec2 = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) try: # Attempt to release the Elastic IP response = ec2.release_address(AllocationId=allocation_id) return f"Released Elastic IP {allocation_id} in region {region}" except botocore.exceptions.BotoCoreError as e: return f"Error releasing Elastic IP {allocation_id} in region {region}: {e}" # List of regions to search for unattached Elastic IPs #regions = ["us-east-1"] # Add your desired regions here if len(unattached_ips) == 0: print("No unattached Elastic IPs found.") else: print("Unattached Elastic IPs:") # Print details for each unattached Elastic IP for ip_info in unattached_ips: print(f"Public IP: {ip_info['public_ip']}, Allocation ID: {ip_info['allocation_id']}, Region: {ip_info['region']}") # Release unattached Elastic IPs for ip_info in unattached_ips: response = release_elastic_ip(boto3.client('ec2'), ip_info['allocation_id'], ip_info['region']) print(response) context.proceed = False
        2. 3.2.1.2

          This task combines listing all Elastic IPs and releasing any that are not associated with an instance, looping through every region in a single pass.

          Note: Although this script is a one-stop option for finding and releasing unattached Elastic IPs in every region, it takes noticeably longer to complete than the script scoped to specific regions, because it loops through all regions.

          import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Create an EC2 client instance ec2 = boto3.client("ec2",aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name='us-east-1') # Dictionary to store unused Elastic IPs with their allocation IDs and regions unused_ips = {} #Loop through all regions for region in ec2.describe_regions()["Regions"]: region_name = region["RegionName"] try: # Create an EC2 client for the specific region ec2conn = boto3.client("ec2",aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region_name) # Retrieve all addresses (Elastic IPs) addresses = ec2conn.describe_addresses( Filters=[{"Name": "domain", "Values": ["vpc"]}] )["Addresses"] # Iterate through each address for address in addresses: # Check if the address is not associated with any instance if ( "AssociationId" not in address and address["AllocationId"] not in unused_ips ): # Store the unused Elastic IP's allocation ID and region unused_ips[address["AllocationId"]] = region_name # Release the unused Elastic IP ec2conn.release_address(AllocationId=address["AllocationId"]) print( f"Deleted unused Elastic IP {address['PublicIp']} in region {region_name}" ) except Exception as e: # Handle cases where there's no access to a specific region print(f"No access to region {region_name}: {e}") # Print the summary of deleted unused Elastic IPs print(f"Found and deleted {len(unused_ips)} unused Elastic IPs across all regions:") print(unused_ips)
    3. 3.3

      This runbook involves identifying Amazon RDS instances that consistently exhibit low CPU usage over a specific time frame and then safely removing them. By leveraging Amazon CloudWatch metrics, organizations can pinpoint underutilized RDS instances, set a CPU utilization threshold, and analyze instances that fall below this mark. Before initiating deletion, it's crucial to disable any active 'Deletion Protection' and to create a final snapshot as a backup measure. This proactive approach not only ensures cost efficiency by eliminating unnecessary expenses but also optimizes resource management within AWS.

      1. 3.3.1

        This task enumerates and displays all AWS RDS (Amazon Relational Database Service) instances within an AWS account, which is essential for management and auditing purposes. The script communicates with AWS to retrieve information about each RDS instance, including its identifier, status, and other relevant details. This information helps administrators understand the state of their infrastructure and supports further actions such as modification, deletion, or analysis of the instances.

        import boto3 from botocore.exceptions import BotoCoreError, ClientError reg=None # Runs for specific region if provided otherwise runs for all regions if None/nothing is provided. creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_all_rds_instances(region=None): try: if region: regions = [region] else: # If region is None, list instances in all available regions ec2 = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']] all_instances = [] for region in regions: print(f"Listing RDS instances in region {region}:") client = boto3.client('rds', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) try: db_instances = client.describe_db_instances() region_instances = [instance['DBInstanceIdentifier'] for instance in db_instances['DBInstances']] if region_instances: print(f"Found {len(region_instances)} RDS instances in region {region}:") for instance in region_instances: print(instance) all_instances.append({"region": region, "instance": instance}) else: print(f"No RDS instances found in region {region}.") except ClientError as e: print(f"Client error in region {region}: {e}") except BotoCoreError as e: print(f"BotoCoreError in region {region}: {e}") except Exception as e: print(f"Unexpected error in region {region}: {e}") return all_instances except Exception as e: print(f"Unexpected error: {e}") instances = list_all_rds_instances(reg) # reg is initialized in input parameters. Runs for specific region if provided otherwise runs for all regions if None/nothing is provided. #print(instances)
      2. 3.3.2

        This task identifies Amazon RDS instances that are underperforming or underutilized in terms of CPU usage. By utilizing Amazon CloudWatch metrics, users can monitor and assess the CPU performance of their RDS instances over a specified period. By setting a CPU utilization threshold, they can filter out instances that consistently operate below this limit, indicating potential over-provisioning or underuse. Highlighting these low-utilization instances aids organizations in optimizing their AWS resource allocation, ensuring cost efficiency and facilitating informed decisions about scaling or decommissioning certain database resources.

        import boto3 from datetime import datetime, timedelta from botocore.exceptions import BotoCoreError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Constants # Define the threshold for CPU utilization(Defined in input parameters). # Instances below this will be considered low utilization. LOW_CPU_THRESHOLD=20 # Hardcoded for One time Filter Result LOOKBACK_PERIOD_HOURS=24 # Hardcoded for One time Filter Result LOOKBACK_PERIOD = 3600 * int(LOOKBACK_PERIOD_HOURS) # Define the period to check. Here, it's set to 24 hours. def get_low_cpu_rds_instances(instances): low_cpu_instances = [] # List to store the IDs of RDS instances with low CPU utilization # Group instances by region instances_by_region = {} for instance_info in instances: region = instance_info['region'] if region not in instances_by_region: instances_by_region[region] = [] instances_by_region[region].append(instance_info['instance']) # Check each region for region, instance_list in instances_by_region.items(): cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) # Loop through each RDS instance for instance_id in instance_list: try: end_time = datetime.utcnow() start_time = end_time - timedelta(seconds=LOOKBACK_PERIOD) metrics = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'cpuUtilization', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/RDS', 'MetricName': 'CPUUtilization', 'Dimensions': [{ 'Name': 'DBInstanceIdentifier', 'Value': instance_id }] }, 'Period': LOOKBACK_PERIOD, 'Stat': 'Average' }, 'ReturnData': True, }, ], StartTime=start_time, EndTime=end_time ) if metrics['MetricDataResults'][0]['Values']: cpu_utilization = metrics['MetricDataResults'][0]['Values'][0] if cpu_utilization < int(LOW_CPU_THRESHOLD): low_cpu_instances.append({ 'InstanceID': instance_id, 'AverageCPU': cpu_utilization, 'Region': region }) except Exception as e: print(f"Error fetching CloudWatch metrics for RDS instance {instance_id} in {region}: {e}") return low_cpu_instances low_cpu_rds = get_low_cpu_rds_instances(instances) # Print the results if low_cpu_rds: print("RDS instances with low CPU utilization:") for instance_info in low_cpu_rds: print(f"Instance ID: {instance_info['InstanceID']}, Average CPU: {instance_info['AverageCPU']}% in Region: {instance_info['Region']}") else: print("No RDS instances with low CPU utilization found.") context.skip_sub_tasks=True
        1. 3.3.2.1

          This task terminates an AWS RDS database instance and deletes all associated data. Before deletion, users often create a final snapshot to preserve the database's current state, enabling future restoration if necessary. It's essential to ensure that the "Deletion Protection" feature, designed to prevent accidental deletions, is disabled before proceeding. Once deleted, the RDS instance is no longer operational and associated costs cease. However, any retained backups or snapshots will persist and may incur storage charges until they too are deleted.

          import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_rds_instance(instance_info): """ Delete an RDS instance after taking necessary precautions like disabling deletion protection and creating a final snapshot. Parameters: - instance_info (dict): Dictionary containing InstanceID and Region. """ instance_id = instance_info['InstanceID'] region = instance_info['Region'] # Initialize the boto3 client for the Amazon Relational Database Service (RDS) in the specified region rds = boto3.client('rds', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) try: instance_details = rds.describe_db_instances(DBInstanceIdentifier=instance_id) if instance_details['DBInstances'][0].get('DeletionProtection', False): rds.modify_db_instance(DBInstanceIdentifier=instance_id, DeletionProtection=False) print(f"Deletion protection disabled for {instance_id}") except rds.exceptions.DBInstanceNotFoundFault: print(f"RDS instance {instance_id} not found.") return except Exception as e: print(f"Error modifying RDS instance {instance_id}: {e}") return try: snapshot_name = f"final-snapshot-{instance_id}" rds.create_db_snapshot(DBInstanceIdentifier=instance_id, DBSnapshotIdentifier=snapshot_name) print(f"Final snapshot creation initiated for {instance_id}") waiter = rds.get_waiter('db_snapshot_completed') waiter.wait(DBSnapshotIdentifier=snapshot_name) print(f"Final snapshot {snapshot_name} created for {instance_id}") except rds.exceptions.SnapshotQuotaExceededFault: print(f"Snapshot quota exceeded for {instance_id}.") return except rds.exceptions.DBInstanceNotFoundFault: print(f"RDS instance {instance_id} not found.") return except Exception as e: print(f"Error creating snapshot for RDS instance {instance_id}: {e}") return try: rds.delete_db_instance(DBInstanceIdentifier=instance_id, SkipFinalSnapshot=True) print(f"RDS instance {instance_id} deletion initiated") except rds.exceptions.DBInstanceNotFoundFault: print(f"RDS instance {instance_id} not found.") except Exception as e: print(f"Error deleting RDS instance {instance_id}: {e}") rds_instances_to_delete = low_cpu_rds # Make sure low_cpu_rds is a list of dictionaries with 'InstanceID' and 'Region' keys # Check if the list is empty if not rds_instances_to_delete: print("No RDS instances provided for deletion.") else: # Loop through each RDS instance in the list and call the delete function for instance_info in rds_instances_to_delete: delete_rds_instance(instance_info)
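
          As an alternative to creating the snapshot manually and then calling delete_db_instance with SkipFinalSnapshot=True, RDS can take the final snapshot as part of the deletion call itself. A minimal sketch, assuming deletion protection has already been disabled as above; the instance identifier and region are illustrative:

          import boto3

          rds = boto3.client('rds',
                             aws_access_key_id=access_key,
                             aws_secret_access_key=secret_key,
                             region_name='us-east-1')  # assumed region

          instance_id = 'example-db-instance'  # hypothetical identifier
          # Ask RDS to create the final snapshot and then delete the instance
          rds.delete_db_instance(
              DBInstanceIdentifier=instance_id,
              SkipFinalSnapshot=False,
              FinalDBSnapshotIdentifier=f"final-snapshot-{instance_id}"
          )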
    4. 3.4

      This runbook involves identifying Amazon Redshift clusters that have consistently low CPU utilization over a specified period and then deleting them to optimize resource usage and reduce costs. By monitoring the CPU metrics of Redshift clusters using Amazon CloudWatch, organizations can determine which clusters are underutilized and might be candidates for deletion or resizing. Deleting or resizing underutilized clusters ensures efficient use of resources and can lead to significant cost savings on cloud expenditures.

      1. 3.4.1

        This process retrieves a list of all Amazon Redshift clusters within an AWS account. Amazon Redshift is a fully managed data warehouse service in the cloud that allows users to run complex analytic queries against petabytes of structured data. By fetching all Redshift clusters, users can gain insights into the number of active clusters, their configurations, statuses, and other related metadata. This information is crucial for administrative tasks, monitoring, and optimizing costs and performance.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_all_redshift_clusters(region=None): all_clusters = {} ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') regions_to_check = [region] if region else [region['RegionName'] for region in ec2_client.describe_regions()['Regions']] for region in regions_to_check: # Initialize the Redshift client for the specified region redshift = boto3.client('redshift', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) clusters = [] try: # Using paginator to handle potential pagination of results paginator = redshift.get_paginator('describe_clusters') for page in paginator.paginate(): clusters.extend(page['Clusters']) if clusters: # Check if clusters list is not empty all_clusters[region] = clusters except Exception as e: print(f"Error fetching Redshift clusters in region {region}: {e}") return all_clusters # Set region to None for all regions, or specify a valid AWS region string for a specific region # Example: target_region = 'us-west-1' # Or None for all regions target_region = None # Get all Redshift clusters all_clusters = get_all_redshift_clusters(target_region) if all_clusters: print(f"Total Redshift Clusters: {sum(len(clusters) for clusters in all_clusters.values())}") for region, clusters in all_clusters.items(): print(f"In region {region}:") for cluster in clusters: print(f" - {cluster['ClusterIdentifier']}") else: print("No Redshift clusters found")
      2. 3.4.2

        This task pertains to identifying Amazon Redshift clusters that exhibit consistently low CPU utilization over a predefined time span. By leveraging Amazon CloudWatch metrics, organizations can detect underutilized Redshift clusters. Recognizing such clusters provides valuable insights, allowing teams to make informed decisions about potential downscaling, resource reallocation, or other optimization measures to ensure efficient cloud resource usage.

        import boto3 from datetime import datetime, timedelta creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Constants #LOW_CPU_THRESHOLD = 20 # Example Threshold for CPU utilization #LOOKBACK_PERIOD_HOURS = 24 # Example lookback period in hours LOOKBACK_PERIOD = 3600 * LOOKBACK_PERIOD_HOURS # Convert hours to seconds def get_low_cpu_redshift_clusters(all_clusters): """ Identify and list Redshift clusters with average CPU utilization below a defined threshold over a specific period. Parameters: - all_clusters (dict): Dictionary with region as keys and list of cluster info as values. Returns: - list: List of dictionaries containing Redshift cluster identifiers and their average CPU utilization. """ low_cpu_clusters = [] # List to store the cluster details with low CPU utilization for region, clusters in all_clusters.items(): # Initialize boto3 client for CloudWatch in the specific region cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) for cluster in clusters: cluster_id = cluster['ClusterIdentifier'] try: # Query CloudWatch to fetch the CPU utilization metric for the defined period metrics = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'cpuUtilization', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/Redshift', 'MetricName': 'CPUUtilization', 'Dimensions': [{'Name': 'ClusterIdentifier', 'Value': cluster_id}] }, 'Period': LOOKBACK_PERIOD, 'Stat': 'Average' # We're interested in the average CPU utilization }, 'ReturnData': True, }, ], StartTime=datetime.utcnow() - timedelta(seconds=LOOKBACK_PERIOD), EndTime=datetime.utcnow() ) # Check if the cluster's CPU utilization falls below the threshold if metrics['MetricDataResults'][0]['Values']: cpu_utilization = metrics['MetricDataResults'][0]['Values'][0] if cpu_utilization < LOW_CPU_THRESHOLD: low_cpu_clusters.append({ 'Region': region, 'ClusterID': cluster_id, 'AverageCPU': cpu_utilization }) except Exception as e: print(f"Error checking CPU utilization for cluster {cluster_id} in region {region}: {e}") return low_cpu_clusters # Example usage (assuming all_clusters is provided from an upstream task) # all_clusters = { # 'us-west-2': [{'ClusterIdentifier': 'cluster1'}, {'ClusterIdentifier': 'cluster2'}], # 'us-east-1': [{'ClusterIdentifier': 'cluster3'}], # } clusters_info = get_low_cpu_redshift_clusters(all_clusters) # Print the results if clusters_info: print("Redshift clusters with low CPU utilization:") for cluster in clusters_info: print(f"Region: {cluster['Region']}, Cluster ID: {cluster['ClusterID']}, Average CPU: {cluster['AverageCPU']:.2f}%") else: print("No Redshift clusters with low CPU utilization found.") context.skip_sub_tasks=True
        1. 3.4.2.1

          This task involves terminating specific Amazon Redshift clusters, effectively removing them from an AWS account. Deleting a Redshift cluster permanently erases all the data within the cluster and cannot be undone. This process might be undertaken to manage costs, decommission outdated data warehouses, or perform clean-up operations. It's crucial to ensure appropriate backups (snapshots) are in place before initiating a deletion to prevent accidental data loss.

          import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_redshift_cluster(cluster_id, region): """ Attempts to delete a specified Amazon Redshift cluster in a given region. Parameters: - cluster_id (str): The unique identifier of the Redshift cluster to be deleted. - region (str): The AWS region where the Redshift cluster is located. """ try: # Initialize the boto3 client for Amazon Redshift with the appropriate region redshift = boto3.client('redshift', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) # Initiate the deletion of the specified Redshift cluster. response = redshift.delete_cluster(ClusterIdentifier=cluster_id, SkipFinalClusterSnapshot=True) print(f"Redshift cluster {cluster_id} deletion initiated in region {region}.") except redshift.exceptions.ClusterNotFoundFault: print(f"Redshift cluster {cluster_id} not found in region {region}.") except redshift.exceptions.InvalidClusterStateFault: print(f"Redshift cluster {cluster_id} is in an invalid state for deletion in region {region}.") except Exception as e: print(f"Error deleting Redshift cluster {cluster_id} in region {region}: {e}") # Example usage #clusters_info = [{'Region': 'us-west-2', 'ClusterID': 'example-cluster-1'}, {'Region': 'us-east-1', 'ClusterID': 'example-cluster-2'}] clusters_to_delete = clusters_info # clusters_info passed down from parent task to delete said Redshift Clusters # Can replace clusters_info with a list of cluster_id to delete any Redshift Cluster using this task if clusters_to_delete: for cluster in clusters_to_delete: delete_redshift_cluster(cluster['ClusterID'], cluster['Region']) else: print("No Redshift Clusters provided for deletion")
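
          The deletion call above passes SkipFinalClusterSnapshot=True, so no backup is taken at deletion time. If a final snapshot is desired, delete_cluster can create one before removing the cluster. A minimal sketch; the cluster identifier and region are illustrative:

          import boto3

          redshift = boto3.client('redshift',
                                  aws_access_key_id=access_key,
                                  aws_secret_access_key=secret_key,
                                  region_name='us-east-1')  # assumed region

          cluster_id = 'example-cluster'  # hypothetical identifier
          # Create a final snapshot, then delete the cluster
          redshift.delete_cluster(
              ClusterIdentifier=cluster_id,
              SkipFinalClusterSnapshot=False,
              FinalClusterSnapshotIdentifier=f"final-snapshot-{cluster_id}"
          )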
    5. 3.5

      This runbook is designed to identify and remove secrets from the AWS Secrets Manager that haven't been accessed or utilized within a defined period (e.g., the past 90 days). Secrets Manager is a tool that helps manage sensitive information like API keys or database credentials. Over time, some secrets may become obsolete or unused, occupying unnecessary space and potentially incurring extra costs. This task automates the cleanup process by scanning for these dormant secrets and safely deleting them. Before executing this runbook, ensure proper AWS IAM permissions are set. Always use caution when deleting secrets to avoid unintended disruptions to applications or services.

      REGION=None
      1. 3.5.1

        This task retrieves a list of all secrets stored in the AWS Secrets Manager for your account. AWS Secrets Manager is a service designed to safeguard sensitive information such as database credentials and API keys. By executing this task, users will obtain a comprehensive list of secret names or ARNs, aiding in audit, management, or automation processes. Note that this task will list the secrets' identifiers, but not their actual values. To fetch a specific secret's value, additional steps involving the get_secret_value method are required. Ensure you have the appropriate AWS IAM permissions before executing this task.

        import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_all_secrets(secrets_client): try: secrets = secrets_client.list_secrets() return [secret['Name'] for secret in secrets['SecretList']] except ClientError as e: error_code = e.response['Error']['Code'] if error_code == 'UnrecognizedClientException': print(f"Invalid security token or service not available in {secrets_client.meta.region_name}. Skipping.") else: print(f"ClientError {error_code} in {secrets_client.meta.region_name}: {e}") return [] except Exception as e: print(f"An unexpected error occurred in {secrets_client.meta.region_name}: {e}") return [] # Main block # Specify the region here. If None, it will loop through all available regions. #REGION = 'us-east-1' #print(f"regions received from top task {regions}") regions = [REGION] if REGION else regions all_secrets_data = [] for region in regions: try: secrets_client = boto3.client('secretsmanager',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) all_secrets = get_all_secrets(secrets_client) if all_secrets: print(f"All Secrets in {region}:") for secret in all_secrets: print(secret) all_secrets_data.append({'region': region, 'secret': secret}) else: print(f"No secrets found in {region}.") except Exception as e: print(f"An unexpected error occurred while processing {region}: {e}") # Print the all_secrets_data list to check the content print("\nAll Secrets Data:") for secret_data in all_secrets_data: print(f"Region: {secret_data['region']}, Secret: {secret_data['secret']}")
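
        The listing above returns only secret names, not values. If a specific secret's value is needed, for example to verify it before any cleanup, get_secret_value can be used. A minimal sketch, reusing the credential variables from the task above; the secret name and region are illustrative, and the retrieved value is sensitive and should not normally be printed:

        import boto3

        secrets_client = boto3.client('secretsmanager',
                                      aws_access_key_id=access_key,
                                      aws_secret_access_key=secret_key,
                                      region_name='us-east-1')  # assumed region

        response = secrets_client.get_secret_value(SecretId='example/secret-name')  # hypothetical name
        secret_value = response.get('SecretString')  # 'SecretBinary' is populated for binary secrets
        print("Secret value retrieved (value intentionally not printed).")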
      2. 3.5.2

        This task aims to pinpoint secrets within AWS Secrets Manager that haven't been accessed for a specified duration, such as the past 90 days. Over time, certain secrets may not be referenced or utilized, indicating they may no longer be needed. By identifying these inactive secrets, organizations can assess their continued relevance, streamline their secrets inventory, and enhance security by minimizing potential exposure points. Before taking any action based on the results, it's crucial to review the list and ensure no critical secrets are mistakenly categorized as "unused."

        import boto3 from datetime import datetime, timedelta from botocore.exceptions import ClientError UNUSED_DAYS_THRESHOLD=60 #Harcoded For One time result creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def find_unused_secrets(secrets_client, secret_name): try: secret_details = secrets_client.describe_secret(SecretId=secret_name) last_accessed_date = secret_details.get('LastAccessedDate') # Return True if the secret is either never accessed or last accessed > 90 days ago return not last_accessed_date or (datetime.now(last_accessed_date.tzinfo) - last_accessed_date > timedelta(days=UNUSED_DAYS_THRESHOLD)) except ClientError as e: error_code = e.response['Error']['Code'] if error_code == 'UnrecognizedClientException': print(f"Invalid security token or service not available. Skipping secret: {secret_name}.") else: print(f"ClientError {error_code} for secret {secret_name}: {e}") return False except Exception as e: print(f"An unexpected error occurred for secret {secret_name}: {e}") return False # Main block # Check if all_secrets_data is defined and is not None; if not, initialize as an empty list # all_secrets_data passed down from get_all_secrets task all_secrets = all_secrets_data if 'all_secrets_data' in locals() and all_secrets_data is not None else [] ''' # Sample data from the previous task all_secrets = [ {'region': 'us-east-1', 'secret': 'test/user'}, {'region': 'us-east-1', 'secret': 'test-unused/user'}, # ... add more secrets and regions as needed ] ''' all_unused_secrets = [] for secret_data in all_secrets: region = secret_data['region'] secret_name = secret_data['secret'] try: secrets_client = boto3.client('secretsmanager',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) if find_unused_secrets(secrets_client, secret_name): all_unused_secrets.append(secret_data) print(f"Secret {secret_name} in region {region} is unused.") else: print(f"Secret {secret_name} in region {region} is active.") except Exception as e: print(f"An unexpected error occurred while processing secret {secret_name} in region {region}: {e}") # Displaying the unused secrets list print("\nAll Unused Secrets Data:") for secret_data in all_unused_secrets: print(f"Region: {secret_data['region']}, Secret: {secret_data['secret']}") context.skip_sub_tasks=True
        1. 3.5.2.1

          This task focuses on securely removing a specified secret from AWS Secrets Manager. Deleting secrets can be an essential step in managing sensitive information, especially if a secret is no longer in use or has been compromised. When executed, the targeted secret is scheduled for deletion in Secrets Manager; by default the service retains it for a recovery window of about 30 days before it is permanently erased and can no longer be accessed or retrieved. It's crucial to double-check the secret's relevance and back up any necessary data before deletion to prevent unintended data loss or service disruptions.

          import boto3 from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_secret(secrets_client, secret_name): try: secrets_client.delete_secret(SecretId=secret_name) print(f"Deleted secret: {secret_name}") return True except ClientError as e: # Handle specific known errors if e.response['Error']['Code'] == 'ResourceNotFoundException': print(f"Secret {secret_name} not found. Skipping.") else: print(f"Error deleting secret {secret_name}: {e}") return False # To handle any other boto3 specific errors except (NoCredentialsError, PartialCredentialsError): print("Authentication error. Please check your AWS credentials.") return False # A catch-all for other exceptions which we may not anticipate except Exception as e: print(f"An unexpected error occurred: {e}") return False # Main block # Check if all_unused_secrets is defined and is not None; if not, initialize as an empty list # all_unused_secrets passed down from get_all_secrets task all_unused_secrets = all_unused_secrets if 'all_unused_secrets' in locals() and all_unused_secrets is not None else [] ''' # Sample data for testing purposes. This will be passed from the upstream task. all_unused_secrets = [ {'region': 'us-east-1', 'secret': 'sample_secret_1'}, {'region': 'us-east-2', 'secret': 'sample_secret_2'}, # ... add more secrets and regions as needed ] # Example data ''' if all_unused_secrets: for secret_data in all_unused_secrets: region = secret_data['region'] secret_name = secret_data['secret'] try: # Initialize the secrets client for the given region secrets_client = boto3.client('secretsmanager',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) # Attempt to delete the secret delete_secret(secrets_client, secret_name) except Exception as e: print(f"An unexpected error occurred in {region}: {e}") else: print("No secrets provided. Exiting.")
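
          Note that delete_secret as called above schedules the deletion with the default recovery window of 30 days, during which the secret can still be restored with restore_secret. The window can be shortened, or the secret removed immediately, as in this minimal sketch (secret name and region are illustrative):

          import boto3

          secrets_client = boto3.client('secretsmanager',
                                        aws_access_key_id=access_key,
                                        aws_secret_access_key=secret_key,
                                        region_name='us-east-1')  # assumed region

          # Schedule deletion with a shorter recovery window (7-30 days are accepted)
          secrets_client.delete_secret(SecretId='example/secret-name',  # hypothetical name
                                       RecoveryWindowInDays=7)

          # Or delete immediately, with no recovery window (cannot be undone):
          # secrets_client.delete_secret(SecretId='example/secret-name',
          #                              ForceDeleteWithoutRecovery=True)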
    6. 3.6

      This runbook automates the process of identifying and deleting unused Amazon CloudWatch Log Streams. By scanning specified log groups across designated AWS regions, it efficiently detects log streams that have been inactive for a predetermined period. Once identified, these log streams are safely removed, helping organizations maintain a clutter-free logging environment and potentially reducing associated storage costs.

      1. 3.6.1

        This task is designed to systematically retrieve and enumerate all CloudWatch log streams present in specified AWS regions. It offers a detailed snapshot of the existing log streams, enabling users to understand their logging landscape across various AWS services and applications.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_all_log_streams(region=None): log_streams_info = [] # To store log streams information for all regions # Function to list log streams for a given region def fetch_log_streams_for_region(client, specific_region): print(f"\nFetching log streams for region: {specific_region}...") log_groups = client.describe_log_groups() if not log_groups.get('logGroups'): print(f"No log groups or streams found in region: {specific_region}.") return for log_group in log_groups['logGroups']: log_group_name = log_group['logGroupName'] log_streams = client.describe_log_streams(logGroupName=log_group_name) for stream in log_streams.get('logStreams', []): print(f"Region: {specific_region}, Log Group: {log_group_name}, Log Stream: {stream['logStreamName']}") # Append the information to log_streams_info log_streams_info.append({ 'region': specific_region, 'log_group': log_group_name, 'log_stream': stream['logStreamName'] }) try: # If a region is provided, fetch log streams for that region if region: client = boto3.client('logs',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) fetch_log_streams_for_region(client, region) # If no region is provided, fetch log streams for all regions else: ec2_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') regions = [region['RegionName'] for region in ec2_client.describe_regions()['Regions']] for specific_region in regions: client = boto3.client('logs', region_name=specific_region) fetch_log_streams_for_region(client, specific_region) except boto3.exceptions.Boto3Error as e: print(f"An error occurred while accessing AWS: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") return log_streams_info # Main block log_streams_data = list_all_log_streams(target_region) # Pass the name of region as a string to search for that specific region otherwise it runs for all regions print("\nCompleted fetching log streams data.") # Uncomment the line below if you want to see the returned data structure # print(log_streams_data)
      2. 3.6.2

        This task examines CloudWatch log streams to identify those that have been inactive for a specified duration. By pinpointing these dormant streams, the task aids in maintaining a cleaner, more efficient logging environment and can subsequently assist in reducing unnecessary storage costs associated with retaining outdated logs on AWS CloudWatch.

        import boto3 from datetime import datetime, timedelta creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def filter_unused_log_streams(all_log_streams, unused_days=30): unused_log_streams = [] for log_info in all_log_streams: client = boto3.client('logs', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=log_info['region']) try: log_stream = client.describe_log_streams( logGroupName=log_info['log_group'], logStreamNamePrefix=log_info['log_stream'] )['logStreams'][0] # We're using prefix, so getting the first result # Check if the log stream has a 'lastEventTimestamp' if 'lastEventTimestamp' in log_stream: last_event_date = datetime.utcfromtimestamp(log_stream['lastEventTimestamp'] / 1000) if last_event_date < datetime.utcnow() - timedelta(days=unused_days): unused_log_streams.append(log_info) except boto3.exceptions.Boto3Error as e: print(f"Error accessing log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}") except Exception as e: print(f"Unexpected error: {e}") return unused_log_streams # Main block # UNUSED_DAYS = 90 # all_log_streams to be passed down from parent task # Example structure, all_log_streams = [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}] # truncated for brevity all_log_streams = log_streams_data # Passed down from parent task unused_logs = filter_unused_log_streams(all_log_streams, UNUSED_DAYS) if unused_logs: print("\nFiltered unused log streams:") for log in unused_logs: print(f"Region: {log['region']}, Log Group: {log['log_group']}, Log Stream: {log['log_stream']}") # Uncomment the line below if you want to see the full list of unused log streams # print(unused_logs) else: print("No Unused Logs") context.skip_sub_tasks=True
        1. 3.6.2.1

          This task allows users to remove specified log streams from AWS CloudWatch. By executing this task, organizations can effectively manage and declutter their logging space, ensuring that only relevant and necessary logs are retained. This not only optimizes the logging environment but also helps in potentially reducing storage-associated costs on AWS.

          import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_log_streams(unused_logs): """ Deletes the specified CloudWatch log streams. Args: unused_logs (list): List of dictionaries containing region, log group, and unused log stream information. Returns: list: List of dictionaries with the results of the deletion process. """ deletion_results = [] for log_info in unused_logs: client = boto3.client('logs', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=log_info['region']) try: # Delete the log stream client.delete_log_stream( logGroupName=log_info['log_group'], logStreamName=log_info['log_stream'] ) deletion_results.append({ 'status': 'success', 'region': log_info['region'], 'log_group': log_info['log_group'], 'log_stream': log_info['log_stream'], 'message': f"Successfully deleted log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}." }) except boto3.exceptions.Boto3Error as e: deletion_results.append({ 'status': 'error', 'region': log_info['region'], 'log_group': log_info['log_group'], 'log_stream': log_info['log_stream'], 'message': f"Error deleting log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}" }) except Exception as e: deletion_results.append({ 'status': 'error', 'region': log_info['region'], 'log_group': log_info['log_group'], 'log_stream': log_info['log_stream'], 'message': f"Unexpected error: {e}" }) return deletion_results # Main Block # unused_logs to be passed down from parent task # Example Structure, unused_logs = [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}] # truncated for brevity results = delete_log_streams(unused_logs) if not results: print("No log streams were deleted.") else: for result in results: print(result['message'])
    7. 3.7

      This runbook identifies and removes inactive NAT gateways to optimize AWS costs. By eliminating unused resources, it streamlines infrastructure management and reduces unnecessary charges.

      region=None
      1. 3.7.1

        This task uses boto3 to programmatically iterate over AWS regions (or a single specified region) and retrieve and list details of all Network Address Translation (NAT) gateways present in an AWS account.

        import boto3 from botocore.exceptions import ( BotoCoreError, ClientError, NoCredentialsError, PartialCredentialsError, EndpointConnectionError, ) creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_nat_gateways_for_region(ec2_client, region_name): nat_gateways_with_regions = [] try: response = ec2_client.describe_nat_gateways() if response and 'NatGateways' in response and len(response['NatGateways']) > 0: for nat_gateway in response['NatGateways']: nat_gateway_info = { "NatGatewayId": nat_gateway['NatGatewayId'], "Region": region_name, "State": nat_gateway['State'] } nat_gateways_with_regions.append(nat_gateway_info) print(nat_gateway_info) else: print(f"No NAT Gateways found in region {region_name}.") except (NoCredentialsError, PartialCredentialsError, EndpointConnectionError, ClientError, BotoCoreError, Exception) as e: print(f"Error in region {region_name}: {str(e)}") return nat_gateways_with_regions #region = 'us-east-1' # You can set this to None to check all regions all_nat_gateways = [] if region: ec2_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) all_nat_gateways.extend(list_nat_gateways_for_region(ec2_client, region)) else: for region_name in regions: ec2_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region_name) all_nat_gateways.extend(list_nat_gateways_for_region(ec2_client, region_name))
      2. 3.7.2

        This task identifies AWS NAT gateways that have not transferred any data within the lookback threshold (seven days by default), deems them "unused", and filters them out for potential optimization or deletion.

        import boto3 from datetime import datetime, timedelta from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] unused_days = 7 # Hardcoded for One time Result def check_unused_nat_gateways_for_region(nat_gateways_list): unused_nat_gateways = [] # Check if the list is empty or not if not nat_gateways_list: print("No NAT gateways received for processing.") return unused_nat_gateways print(f"Received {len(nat_gateways_list)} NAT gateways for processing.") for nat_gateway_info in nat_gateways_list: region_name = nat_gateway_info['Region'] nat_gateway_id = nat_gateway_info['NatGatewayId'] ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) try: response = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'm1', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/NATGateway', 'MetricName': 'BytesOutToDestination', 'Dimensions': [ { 'Name': 'NatGatewayId', 'Value': nat_gateway_info['NatGatewayId'] } ] }, 'Period': 86400 * unused_days, 'Stat': 'Sum' }, 'ReturnData': True } ], StartTime=datetime.now() - timedelta(days=unused_days), EndTime=datetime.now() ) if not response['MetricDataResults'][0]['Values']: unused_nat_gateways.append(nat_gateway_info) except (ClientError, BotoCoreError, Exception) as e: print(f"Error in region {region_name} for NAT Gateway {nat_gateway_id}: {str(e)}") # Print the total number of unused NAT gateways print(f"Out of {len(nat_gateways_list)} NAT gateways, {len(unused_nat_gateways)} are unused.") return unused_nat_gateways ''' all_nat_gateways = [ {'NatGatewayId': 'nat-0bc09626aff12105a', 'Region': 'us-east-1', 'State': 'pending'}, {'NatGatewayId': 'nat-0cee3df0c034c58f8', 'Region': 'us-east-1', 'State': 'deleted'}, {'NatGatewayId': 'nat-0b5177c47df82bc51', 'Region': 'us-east-1', 'State': 'deleted'} ] # passed down from previous task ''' unused_nat_gateways = check_unused_nat_gateways_for_region(all_nat_gateways) context.skip_sub_tasks=True
        1. 3.7.2.1

          This task removes specified NAT gateways in an AWS environment. This cleanup optimizes network infrastructure, enhances security, and reduces costs by eliminating unused resources.

          import boto3 from botocore.exceptions import (ClientError,BotoCoreError) creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_nat_gateways(nat_gateway_list): for nat_gateway_info in nat_gateway_list: region_name = nat_gateway_info['Region'] nat_gateway_id = nat_gateway_info['NatGatewayId'] nat_gateway_state = nat_gateway_info['State'] ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) if nat_gateway_state == 'available': try: ec2_client.delete_nat_gateway(NatGatewayId=nat_gateway_id) print(f"Deleted NAT Gateway ID: {nat_gateway_id} in region {region_name}") except (ClientError, BotoCoreError, Exception) as e: print(f"Error deleting NAT Gateway {nat_gateway_id} in region {region_name}: {str(e)}") elif nat_gateway_state == 'pending': print(f"NAT Gateway ID: {nat_gateway_id} in region {region_name} is still in 'pending' state and cannot be deleted.") else: print(f"NAT Gateway ID: {nat_gateway_id} in region {region_name} is in '{nat_gateway_state}' state and was not deleted.") ''' unused_nat_gateways = [{'NatGatewayId': 'nat-0bc09626aff12105a', 'Region': 'us-east-1', 'State': 'available'}, {'NatGatewayId': 'nat-0cee3df0c034c58f8', 'Region': 'us-east-1', 'State': 'deleted'}, {'NatGatewayId': 'nat-0b5177c47df82bc51', 'Region': 'us-east-1', 'State': 'deleted'}] # passed down from previous task ''' if not unused_nat_gateways: print("No NAT gateways received for deletion.") else: delete_nat_gateways(unused_nat_gateways)
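
          Deleting a NAT gateway disassociates its Elastic IP address but does not release it from the account, so charges for the now-unattached address can continue. A minimal sketch of releasing those addresses once the gateways have finished deleting, reusing the unused_nat_gateways list and credential variables from above:

          import boto3

          for nat_gateway_info in unused_nat_gateways:
              region_name = nat_gateway_info['Region']
              ec2_client = boto3.client('ec2',
                                        aws_access_key_id=access_key,
                                        aws_secret_access_key=secret_key,
                                        region_name=region_name)
              # Look up the addresses that were allocated to this NAT gateway
              response = ec2_client.describe_nat_gateways(NatGatewayIds=[nat_gateway_info['NatGatewayId']])
              for nat_gateway in response.get('NatGateways', []):
                  for address in nat_gateway.get('NatGatewayAddresses', []):
                      allocation_id = address.get('AllocationId')
                      if allocation_id:
                          try:
                              ec2_client.release_address(AllocationId=allocation_id)
                              print(f"Released Elastic IP allocation {allocation_id} in {region_name}")
                          except Exception as e:
                              print(f"Could not release {allocation_id} in {region_name}: {e}")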
    8. 3.8

      In an AWS environment, EC2 instances incur charges based on their uptime. However, not all instances are actively utilized, leading to unnecessary expenses. Underutilized instances may have low CPU usage, minimal network activity, or other metrics indicating limited activity. Identifying and stopping such instances can result in significant cost savings. Tools like AWS Cost Explorer and third-party solutions can help identify these instances based on CloudWatch metrics. This runbook automates the process of monitoring and taking action on underutilized instances based on low CPU usage, ensuring an optimized and cost-effective cloud environment. It's crucial, though, to ensure that stopping these instances won't disrupt essential services or applications.

      CPU_THRESHOLD = 20 # Hardcoded for one-time result
      LOOKBACK_PERIOD_HOURS = 2 # Hardcoded for one-time result
      region_name = None
      1. 3.8.1

        Amazon Elastic Compute Cloud (EC2) is a service offered by Amazon Web Services (AWS) that provides resizable compute capacity in the cloud. Through Boto3's EC2 client, the describe_instances() method provides detailed information about each instance, including its ID, type, launch time, and current state. This capability assists users in effectively monitoring and managing their cloud resources.

        import boto3 from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_all_regions(): ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name = 'us-east-1') return [region['RegionName'] for region in ec2.describe_regions()['Regions']] def list_ec2_instances(region=None): # If no region is provided, fetch instances from all regions regions = [region] if region else list_all_regions() # Create an empty list to store instance details instance_details = [] for region in regions: # Try initializing the Boto3 EC2 client for the specific region try: ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) except (NoCredentialsError, PartialCredentialsError): print(f"Failed for {region}: No AWS credentials found or incomplete credentials provided.") continue except BotoCoreError as e: print(f"Failed for {region}: Error initializing the EC2 client due to BotoCore Error: {e}") continue except Exception as e: print(f"Failed for {region}: Unexpected error initializing the EC2 client: {e}") continue #print(f"Fetching EC2 instance details for region: {region}...") # Try to paginate through the EC2 instance responses for the specific region try: paginator = ec2_client.get_paginator('describe_instances') for page in paginator.paginate(): for reservation in page['Reservations']: for instance in reservation['Instances']: # Extract the desired attributes instance_id = instance['InstanceId'] instance_type = instance['InstanceType'] launch_time = instance['LaunchTime'] state = instance['State']['Name'] # Append the details to the list instance_details.append({ 'InstanceId': instance_id, 'InstanceType': instance_type, 'LaunchTime': launch_time, 'State': state, 'Region': region }) #print(f"Fetched all instance details for region: {region} successfully!") except ClientError as e: print(f"Failed for {region}: AWS Client Error while fetching EC2 instance details: {e}") except Exception as e: print(f"Failed for {region}: Unexpected error while fetching EC2 instance details: {e}") return instance_details # You can replace None with a specific region string like 'us-east-1' to get instances from a specific region instances_list = list_ec2_instances(region_name) if instances_list: print("\nEC2 Instance Details:") for instance in instances_list: print("-" * 50) # Separator line for key, value in instance.items(): print(f"{key}: {value}") else: print("No instances found or an error occurred.")
      2. 3.8.2
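
        This task visualizes recent CPU utilization for the running EC2 instances gathered in the previous step. For each running instance it queries CloudWatch for the hourly average of the CPUUtilization metric over the last seven days and adds the series to a line plot, giving a quick view of which instances are consistently busy and which are candidates for the idle-instance check that follows.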

        import boto3 from datetime import datetime, timedelta from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError, EndpointConnectionError, DataNotFoundError # AWS credentials creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] if locals().get('instances_list') is None: instances_list = [] # Function to fetch CPU utilization for a given instance def fetch_cpu_utilization(instance_id, region, start_time, end_time): try: cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) response = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'cpuUtilization', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/EC2', 'MetricName': 'CPUUtilization', 'Dimensions': [{'Name': 'InstanceId', 'Value': instance_id}] }, 'Period': 3600, # one hour 'Stat': 'Average', }, 'ReturnData': True, }, ], StartTime=start_time, EndTime=end_time ) return response['MetricDataResults'][0]['Timestamps'], response['MetricDataResults'][0]['Values'] except Exception as e: print(f"Error getting CPU utilization for instance {instance_id}: {e}") return [], [] # Main plotting logic def plot_cpu_utilization(instances_list, lookback_days=7): end_time = datetime.utcnow() start_time = end_time - timedelta(days=lookback_days) for instance in instances_list: if instance['State'] != 'running': continue timestamps, cpu_values = fetch_cpu_utilization(instance['InstanceId'], instance['Region'], start_time, end_time) # Check if data is available if timestamps: context.plot.add_trace( name=f"Instance {instance['InstanceId']}", xpts=timestamps, # x-axis points ypts=cpu_values, # y-axis points tracetype="line" ) # Set plot properties context.plot.xlabel = 'Date' context.plot.ylabel = 'Average CPU Utilization (%)' context.plot.title = f'CPU Utilization per EC2 Instance (Last {lookback_days} Days)' # Execute the plotting function plot_cpu_utilization(instances_list)
        copied
        3.8.2
      3. 3.8.3

        AWS EC2 instances that are running but not actively used represent unnecessary cost. An "idle" EC2 instance typically shows consistently low CPU utilization, network input/output, and disk reads/writes. By leveraging AWS CloudWatch, users can monitor these metrics and identify instances that remain underutilized, based on low CPU usage over extended periods. Once identified, these instances can either be stopped or terminated, leading to more efficient resource use and cost savings. Always analyze and verify the activity of these instances before taking action, so that no critical processes are inadvertently affected.
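
        Since CPU alone can be misleading, a quick cross-check against network traffic helps confirm idleness before acting. The sketch below is illustrative only: it assumes the same access_key and secret_key variables used by the surrounding tasks, and the helper name average_network_in is hypothetical.

        import boto3
        from datetime import datetime, timedelta

        def average_network_in(instance_id, region, hours=24):
            """Return the average NetworkIn (bytes) for an instance over the last `hours` hours."""
            cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key,
                                      aws_secret_access_key=secret_key, region_name=region)
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(hours=hours)
            stats = cloudwatch.get_metric_statistics(
                Namespace='AWS/EC2',
                MetricName='NetworkIn',
                Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}],
                StartTime=start_time,
                EndTime=end_time,
                Period=3600,
                Statistics=['Average']
            )
            datapoints = stats.get('Datapoints', [])
            # Near-zero averages, alongside low CPU, strengthen the case that the instance is idle
            return sum(d['Average'] for d in datapoints) / len(datapoints) if datapoints else 0.0

        # Example usage with an entry from instances_list: average_network_in(instance['InstanceId'], instance['Region'])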

        import boto3 from datetime import datetime, timedelta from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError, EndpointConnectionError, DataNotFoundError # Constants for CPU threshold and lookback period # CPU_THRESHOLD = 5.0 # LOOKBACK_PERIOD_HOURS = 1 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] if locals().get('instances_list') is None: instances_list = [] def get_idle_instances(instances_list): idle_instances = [] end_time = datetime.utcnow() start_time = end_time - timedelta(hours=LOOKBACK_PERIOD_HOURS) for instance in instances_list: if instance['State'] != 'running': continue instance_id = instance['InstanceId'] region = instance['Region'] try: cloudwatch = boto3.client('cloudwatch',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) cpu_stats = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'cpuUtil', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/EC2', 'MetricName': 'CPUUtilization', 'Dimensions': [{'Name': 'InstanceId', 'Value': instance_id}] }, 'Period': 3600, 'Stat': 'Average' }, 'ReturnData': True } ], StartTime=start_time, EndTime=end_time ) avg_cpu_utilization = sum(cpu_stats['MetricDataResults'][0]['Values']) / len(cpu_stats['MetricDataResults'][0]['Values']) if cpu_stats['MetricDataResults'][0]['Values'] else 0.0 if avg_cpu_utilization < CPU_THRESHOLD: idle_instances.append(instance) except Exception as e: print(f"Error processing instance {instance_id} in region {region}: {e}") return idle_instances # Main execution # Ensure to include your list_all_ec2_instances function or import it if it's in another module # instances_list = list_all_ec2_instances() Already taken from parent task idle_instances_list = get_idle_instances(instances_list) # Printing the details of idle instances if idle_instances_list: print("\nIdle EC2 Instances:") for instance in idle_instances_list: print("-" * 60) # Separator line for key, value in instance.items(): print(f"{key}: {value}") else: print("No idle instances found.") # Create a new list with only 'InstanceId' and 'Region' for each instance filtered_instances = [{'InstanceId': instance['InstanceId'], 'Region': instance['Region']} for instance in idle_instances_list] context.skip_sub_tasks=True ''' # Print the new list print("Printing instance_id and region wise instance list to check values for passing down to downstream task") for instance in filtered_instances: print(instance) '''
        copied
        3.8.3
        1. 3.8.3.1

          In AWS, an EC2 instance can be in various states, including running, stopped, or terminated. Stopping an EC2 instance essentially means shutting it down, similar to turning off a computer. When an instance is stopped, it is not running, and therefore, you are not billed for instance usage. However, you are still billed for any EBS storage associated with the instance. The advantage of stopping, instead of terminating, is that you can start the instance again at any time. This capability is useful for scenarios where you want to temporarily halt operations without losing the instance configuration or data. It's essential to understand that stopping an instance will lead to the loss of the ephemeral storage content (Instance Store), but data on EBS volumes will remain intact.
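
          If a stopped instance is needed again later, it can simply be started on demand. The minimal sketch below assumes the access_key and secret_key variables used throughout this runbook; the helper name start_ec2_instance is illustrative.

          import boto3
          from botocore.exceptions import ClientError

          def start_ec2_instance(instance_id, region):
              """Request a start for a previously stopped instance."""
              ec2_client = boto3.client('ec2', aws_access_key_id=access_key,
                                        aws_secret_access_key=secret_key, region_name=region)
              try:
                  ec2_client.start_instances(InstanceIds=[instance_id])
                  print(f"Start requested for instance {instance_id} in {region}.")
              except ClientError as e:
                  print(f"Error starting instance {instance_id} in {region}: {e}")

          # Example usage (placeholder ID): start_ec2_instance('i-0123456789abcdef0', 'us-east-1')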

          import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] if locals().get('filtered_instances') is None: filtered_instances = [] def stop_ec2_instances(instances_to_stop): # To keep track of instances successfully stopped stopped_instances = [] # To keep track of instances that failed to stop failed_instances = [] # To keep track of instances that were already stopped or in the process of stopping already_stopped_instances = [] # Iterate over each instance in the list for instance_info in instances_to_stop: instance_id = instance_info['InstanceId'] region = instance_info['Region'] # Initialize the EC2 client for the specific region ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) # Fetch the current state of the instance try: response = ec2_client.describe_instances(InstanceIds=[instance_id]) instance_state = response['Reservations'][0]['Instances'][0]['State']['Name'] if instance_state in ["stopped", "stopping"]: already_stopped_instances.append(instance_id) print(f"Instance {instance_id} in region {region} is already in '{instance_state}' state.") continue # If the instance is not already stopped or stopping, then attempt to stop it ec2_client.stop_instances(InstanceIds=[instance_id]) stopped_instances.append(instance_id) print(f"Instance {instance_id} in region {region} has been stopped.") except ClientError as e: failed_instances.append(instance_id) print(f"Error with instance {instance_id} in region {region}: {e}") # Print a summary of the actions print("\nSummary:\n") if stopped_instances: print(f"Successfully stopped {len(stopped_instances)} instances: {', '.join(stopped_instances)}") if already_stopped_instances: print(f"{len(already_stopped_instances)} instances were already stopped or stopping: {', '.join(already_stopped_instances)}") if failed_instances: print(f"Failed to stop {len(failed_instances)} instances: {', '.join(failed_instances)}") ''' # Sample list of instances to stop taken from previous task or provide these instances to use the task in a standalone manner instances_to_stop = [ {'InstanceId': 'i-01615251421b8b5da', 'Region': 'us-east-1'}, {'InstanceId': 'i-057155192c87ea310', 'Region': 'us-east-1'} # ... (other instances) ] ''' stop_ec2_instances(filtered_instances) # passed down from previous task otherwise pass instances_to_stop to function to use the task in a standalone manner.
          copied
          3.8.3.1
    9. 3.9

      This runbook is designed to assist in the removal of unattached Amazon EBS Volumes within an AWS region. Once the EBS volume is deleted, the volume's data is permanently removed, and the volume cannot be attached to any instance. To preserve important data before deletion, you have the option to create a snapshot of the volume, allowing for potential volume recreation in the future.

      3.9
      1. 3.9.1

        This task involves identifying and filtering out Amazon Elastic Block Store (EBS) volumes that are not currently attached to any Amazon EC2 instances within a specific AWS region.
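
        For accounts with many volumes, a paginated variant of the same lookup avoids truncated results. This is only a sketch under the same credential assumptions as the task code below; the function name is hypothetical.

        import boto3

        def filter_unattached_ebs_volumes_paginated(region):
            """Return the IDs of all 'available' (unattached) volumes in a region, using pagination."""
            ec2_client = boto3.client('ec2', aws_access_key_id=access_key,
                                      aws_secret_access_key=secret_key, region_name=region)
            unattached = []
            paginator = ec2_client.get_paginator('describe_volumes')
            for page in paginator.paginate(Filters=[{"Name": "status", "Values": ["available"]}]):
                unattached.extend(volume["VolumeId"] for volume in page["Volumes"])
            return unattached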

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def filter_unattached_ebs_volumes(ec2_client): """ Filters unattached EBS volumes within a specific region. Args: ec2_client (boto3.client): An EC2 client instance. Returns: list: List of unattached EBS volume IDs. """ try: response = ec2_client.describe_volumes( Filters=[{"Name": "status", "Values": ["available"]}] ) unattached_volumes = [] for volume in response["Volumes"]: unattached_volumes.append(volume["VolumeId"]) return unattached_volumes except Exception as e: print(f"Error in filtering unattached volumes: {e}") return [] #regions = ["us-east-2"] # Add your desired regions here for region in regions: # Create an EC2 client instance for the region ec2_client = boto3.client("ec2", aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) # Get the list of unattached EBS volumes in the region unattached_volumes = filter_unattached_ebs_volumes(ec2_client) if not unattached_volumes: print(f"No unattached EBS volumes found in region {region}") else: print(f"Unattached EBS volumes in region {region}: {unattached_volumes}") context.skip_sub_tasks=True
        copied
        3.9.1
        1. 3.9.1.1

          This task streamlines the process of capturing point-in-time backups of EBS volumes that are not currently attached to instances. By creating snapshots, you can ensure data durability and recovery options, enabling you to safeguard valuable data and simplify data restoration if needed.
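
          Optionally, the snapshot can be waited on and tagged before the volume is deleted downstream, so the backup is verifiably complete and easy to trace back to its source volume. The sketch below is illustrative only; it assumes the same credentials as the task code and a snapshot_id returned by create_snapshot, and the tag values are examples.

          import boto3

          def wait_and_tag_snapshot(snapshot_id, volume_id, region):
              """Block until a snapshot completes, then tag it with its source volume."""
              ec2_client = boto3.client('ec2', aws_access_key_id=access_key,
                                        aws_secret_access_key=secret_key, region_name=region)
              waiter = ec2_client.get_waiter('snapshot_completed')
              waiter.wait(SnapshotIds=[snapshot_id])  # polls until the snapshot reaches 'completed'
              ec2_client.create_tags(
                  Resources=[snapshot_id],
                  Tags=[{'Key': 'SourceVolume', 'Value': volume_id},
                        {'Key': 'CreatedBy', 'Value': 'cost-cleanup-runbook'}]  # example tag values
              )
              print(f"Snapshot {snapshot_id} completed and tagged.")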

          import boto3 import datetime creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def create_ebs_snapshots(ec2_client, volume_id): """ Creates a snapshot of an EBS volume. Args: ec2_client (boto3.client): An EC2 client instance. volume_id (str): The ID of the EBS volume to create a snapshot of. """ try: response = ec2_client.create_snapshot(VolumeId=volume_id) snapshot_id = response["SnapshotId"] print(f"Snapshot {snapshot_id} created for volume {volume_id} at {datetime.datetime.now()}") except Exception as e: print(f"Error in creating snapshot for volume {volume_id}: {e}") #regions = ["us-east-2"] # Add your desired regions here for region in regions: # Create an EC2 client instance for the region ec2_client = boto3.client("ec2", aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) if not unattached_volumes: print(f"No unattached EBS volumes found in region {region}") else: print(f"Unattached EBS volumes in region {region}: {unattached_volumes}") for volume_id in unattached_volumes: create_ebs_snapshots(ec2_client, volume_id) context.proceed = False
          copied
          3.9.1.1
        2. 3.9.1.2

          Efficiently manage your AWS Elastic Block Store (EBS) volumes by automating the deletion of unattached volumes. This task identifies EBS volumes that are not currently attached to any instances and removes them, helping you optimize storage resources and reduce unnecessary costs while maintaining your cloud infrastructure's cleanliness.

          import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_ebs_volume(ec2_client, volume_id): """ Deletes an EBS volume. Args: ec2_client (boto3.client): An EC2 client instance. volume_id (str): The ID of the EBS volume to delete. """ try: ec2_client.delete_volume(VolumeId=volume_id) print(f"Volume {volume_id} deleted.") except Exception as e: print(f"Error in deleting volume {volume_id}: {e}") #regions = ["us-east-1"] # Add your desired regions here for region in regions: # Create an EC2 client instance for the region ec2_client = boto3.client("ec2", aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) if not unattached_volumes: print(f"No unattached EBS volumes found in region {region}") else: print(f"Unattached EBS volumes in region {region}: {unattached_volumes}") for volume_id in unattached_volumes: delete_ebs_volume(ec2_client, volume_id)
          copied
          3.9.1.2
    10. 3.10

      This runbook identifies Amazon Elastic Block Store (EBS) volumes that exhibit low usage and subsequently removes them. The process involves searching for EBS volumes that have seen minimal use over a specified threshold period, and then deleting those volumes. This automation helps optimize storage resources by removing underutilized volumes, freeing up space and potentially reducing costs.

      low_usage_threshold = 90 # Hardcoded for a one-time result
      region_name = None # Set to a specific region, e.g., 'us-east-2', or None to scan all regions
      copied
      3.10
      1. 3.10.1

        This task aims to identify Amazon Elastic Block Store (EBS) volumes with minimal usage. It scans the account's volumes and flags those whose age exceeds a predefined threshold, using age as a simple proxy for low activity. This is useful for optimizing storage resources and reducing costs, as it surfaces volumes that may no longer be necessary.
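
        Age is only a proxy for low usage; if a stricter check is desired, CloudWatch EBS I/O metrics could be consulted as well. The helper below is a hypothetical sketch under the same credential assumptions as the task code: near-zero read/write totals over the lookback window suggest a volume is effectively unused.

        import boto3
        from datetime import datetime, timedelta

        def volume_total_iops(volume_id, region, days):
            """Sum VolumeReadOps and VolumeWriteOps for a volume over the last `days` days."""
            cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key,
                                      aws_secret_access_key=secret_key, region_name=region)
            end_time = datetime.utcnow()
            start_time = end_time - timedelta(days=days)
            total = 0.0
            for metric in ('VolumeReadOps', 'VolumeWriteOps'):
                stats = cloudwatch.get_metric_statistics(
                    Namespace='AWS/EBS',
                    MetricName=metric,
                    Dimensions=[{'Name': 'VolumeId', 'Value': volume_id}],
                    StartTime=start_time,
                    EndTime=end_time,
                    Period=86400,  # one datapoint per day
                    Statistics=['Sum']
                )
                total += sum(dp['Sum'] for dp in stats.get('Datapoints', []))
            return total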

        import boto3 from datetime import datetime, timedelta creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Set the threshold for low usage in days #low_usage_threshold = 30 def get_all_regions(): """Retrieve all AWS regions.""" ec2 = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') return [region['RegionName'] for region in ec2.describe_regions()['Regions']] def find_low_usage_volumes(ec2_client, region): """ Find EBS volumes with low usage in a specific AWS region. Args: ec2_client (boto3.client): An EC2 client instance for a specific region. region (str): The AWS region of the volumes. Returns: list: List of dictionaries containing volume IDs and their region with low usage. """ low_usage_volumes = [] try: response = ec2_client.describe_volumes() for volume in response['Volumes']: volume_id = volume['VolumeId'] create_time = volume['CreateTime'] days_since_creation = (datetime.now() - create_time.replace(tzinfo=None)).days if days_since_creation >= low_usage_threshold: low_usage_volumes.append({'VolumeId': volume_id, 'Region': region}) return low_usage_volumes except Exception as e: print(f"Error in finding low usage volumes in region {region}: {e}") return [] # region_name to be provided; if None, script runs for all regions #region_name = None # Set to a specific region, e.g., 'us-east-2', or None for all regions regions_to_process = [region_name] if region_name else get_all_regions() for region in regions_to_process: ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) low_usage_volumes = find_low_usage_volumes(ec2_client, region) if not low_usage_volumes: print(f"No low usage EBS volumes found in region {region}") else: print(f"Low usage EBS volumes in region {region}: {low_usage_volumes}") context.skip_sub_tasks=True
        copied
        3.10.1
        1. 3.10.1.1

          This task scans the list of low-usage EBS volumes identified upstream and deletes each one, provided it is not attached to any EC2 instance. Removing detached volumes that are no longer in use helps optimize storage resources and reduce unnecessary costs.
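
          As an extra safety net, a dry-run pass can precede the real deletion: EC2 DeleteVolume honours the DryRun flag and raises a DryRunOperation error when the call would otherwise have succeeded. The helper below is a sketch only, assuming the same credentials as the task code.

          import boto3
          from botocore.exceptions import ClientError

          def check_volume_deletable(volume_id, region):
              """Return True if delete_volume would succeed (permissions and volume state allow it)."""
              ec2_client = boto3.client('ec2', aws_access_key_id=access_key,
                                        aws_secret_access_key=secret_key, region_name=region)
              try:
                  ec2_client.delete_volume(VolumeId=volume_id, DryRun=True)
              except ClientError as e:
                  if e.response['Error']['Code'] == 'DryRunOperation':
                      return True  # the call "failed" only because it was a dry run
                  print(f"Volume {volume_id} cannot be deleted: {e}")
              return False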

          import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_detached_low_usage_volumes(volume_info): """ Delete detached low usage EBS volumes. Args: volume_info (dict): Dictionary containing the volume ID and its region. Returns: tuple: A tuple containing the count of deleted and skipped volumes. """ deleted_count, skipped_count = 0, 0 volume_id = volume_info['VolumeId'] region = volume_info['Region'] try: ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) volume = ec2_client.describe_volumes(VolumeIds=[volume_id])['Volumes'][0] if not volume['Attachments']: ec2_client.delete_volume(VolumeId=volume_id) deleted_count += 1 print(f"Deleted detached low usage EBS volume {volume_id} in region {region}") else: skipped_count += 1 print(f"Volume {volume_id} is attached to an EC2 instance. Skipping deletion in region {region}.") except Exception as e: print(f"Error in deleting volume {volume_id} in region {region}: {e}") return deleted_count, skipped_count # low_usage_volumes is a list of dictionaries received from the upstream task total_deleted, total_skipped = 0, 0 for volume_info in low_usage_volumes: deleted, skipped = delete_detached_low_usage_volumes(volume_info) total_deleted += deleted total_skipped += skipped print(f"Summary: {total_deleted} detached low usage EBS volumes were deleted.") print(f"{total_skipped} volumes were skipped (still attached).") if total_deleted == 0: print("No detached low usage EBS volumes were deleted.")
          copied
          3.10.1.1
    11. 3.11

      This runbook streamlines resource management by identifying and removing Elastic Block Store (EBS) volumes linked to stopped Amazon EC2 instances. Through a sequential process, this automation retrieves EBS volumes associated with stopped instances and subsequently detaches and deletes them. This procedure aids in resource optimization and cost efficiency within the AWS environment.

      3.11
      1. 3.11.1

        This task identifies Elastic Block Store (EBS) volumes attached to Amazon EC2 instances that are currently in a stopped state, by filtering on instance state and inspecting each instance's block device mappings.

        import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_stopped_instance_volumes(ec2_client): """ Gets EBS volumes attached to stopped instances. Args: ec2_client (boto3.client): An EC2 client instance. Returns: dict: Dictionary with instance IDs as keys and associated volume IDs as values. """ instance_volume_map = {} try: instances = ec2_client.describe_instances(Filters=[{"Name": "instance-state-name", "Values": ["stopped"]}]) for reservation in instances["Reservations"]: for instance in reservation["Instances"]: instance_id = instance["InstanceId"] volumes = instance.get("BlockDeviceMappings", []) volume_ids = [volume["Ebs"]["VolumeId"] for volume in volumes] instance_volume_map[instance_id] = volume_ids return instance_volume_map except Exception as e: print(f"Error in getting instance volumes: {e}") return {} #regions = ["us-east-1"] # Add your desired regions here for region in regions: try: # Create an EC2 client for the specified region ec2_client = boto3.client("ec2", aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) # Get the dictionary of stopped instance volumes instance_volume_map = get_stopped_instance_volumes(ec2_client) if not instance_volume_map: print(f"No stopped instances with attached volumes found in region {region}") else: print(f"Stopped instance volumes in region {region}:\n{instance_volume_map}") except Exception as e: print(f"Error in region {region}: {e}") context.skip_sub_tasks=True
        copied
        3.11.1
        1. 3.11.1.1

          This task detaches Elastic Block Store (EBS) volumes from their associated instances and then deletes them; here, the detachment and deletion apply specifically to volumes attached to stopped EC2 instances. It is executed to free up storage resources, improve resource allocation, and optimize costs within the AWS infrastructure.
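
          Instead of a fixed sleep after detaching, a boto3 waiter can block until the volume actually reaches the 'available' state before it is deleted, which is more robust when detachment is slow. A minimal sketch, assuming an EC2 client and volume ID like those used in the task below:

          def wait_then_delete_volume(ec2_client, volume_id):
              """Wait for a detached volume to become 'available', then delete it."""
              waiter = ec2_client.get_waiter('volume_available')
              waiter.wait(VolumeIds=[volume_id])  # returns once detachment has completed
              ec2_client.delete_volume(VolumeId=volume_id)
              print(f"Deleted EBS volume {volume_id} after it became available.")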

          import boto3 import time creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def detach_and_delete_ebs_volumes(ec2_client, instance_volume_map): """ Detaches and deletes EBS volumes attached to instances. Args: ec2_client (boto3.client): An EC2 client instance. instance_volume_map (dict): Dictionary with instance IDs as keys and associated volume IDs as values. """ try: for instance_id, volume_ids in instance_volume_map.items(): for volume_id in volume_ids: ec2_client.detach_volume(InstanceId=instance_id, VolumeId=volume_id, Force=True) print(f"Detached EBS volume {volume_id} from instance {instance_id}") time.sleep(5) # Wait for a few seconds to ensure detachment is complete otherwise there is a VolumeInUse error try: ec2_client.delete_volume(VolumeId=volume_id) print(f"Deleted EBS volume {volume_id}") except Exception as e: print(f"Error in deleting EBS volume {volume_id}: {e}") except Exception as e: print(f"Error in detaching and deleting EBS volumes: {e}") #regions = ["us-east-1"] # Add your desired regions here for region in regions: # Create an EC2 client instance for the region ec2_client = boto3.client("ec2", aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) if not instance_volume_map: print(f"No volumes attached to stopped instances found in region {region}") else: # Detach and delete the identified EBS volumes detach_and_delete_ebs_volumes(ec2_client, instance_volume_map)
          copied
          3.11.1.1
    12. 3.12

      This runbook identifies and removes old Amazon Elastic Block Store (EBS) snapshots. By setting a specific age threshold it scans through designated AWS regions, pinpoints snapshots that surpass the age limit, and subsequently deletes them. This operation not only ensures efficient resource utilization but also aids in minimizing storage costs, promoting a cleaner and more cost-effective cloud environment.

      days_old = 60 # Hardcoded for a one-time result
      copied
      3.12
      1. 3.12.1

        This task identifies old Amazon Elastic Block Store (EBS) snapshots. By setting an age threshold, it scans across specified AWS regions, highlighting snapshots that exceed the set duration. This facilitates better management, paving the way for timely deletions and efficient storage utilization.
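
        Accounts that own many snapshots can exceed what a single describe_snapshots call returns; a paginated variant of the same scan is sketched below, assuming the same credentials and days_old parameter as the task code. The function name is illustrative.

        import boto3
        from datetime import datetime, timedelta

        def find_old_snapshots_paginated(region, days_old):
            """Return IDs of snapshots owned by this account that are older than days_old days."""
            ec2_client = boto3.client('ec2', aws_access_key_id=access_key,
                                      aws_secret_access_key=secret_key, region_name=region)
            cutoff = datetime.utcnow() - timedelta(days=int(days_old))
            old_ids = []
            paginator = ec2_client.get_paginator('describe_snapshots')
            for page in paginator.paginate(OwnerIds=['self']):
                for snapshot in page['Snapshots']:
                    if snapshot['StartTime'].replace(tzinfo=None) < cutoff:
                        old_ids.append(snapshot['SnapshotId'])
            return old_ids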

        import boto3 from botocore.exceptions import ClientError from datetime import datetime, timedelta creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def find_old_snapshots(ec2_client, days_old, region): """ Find EBS snapshots in a specified AWS region that are older than a given number of days. Args: ec2_client (boto3.client): Boto3 EC2 client object. days_old (int): The age in days to consider an EBS snapshot as old. region (str): The AWS region to search for old snapshots. Returns: list[str]: List of old snapshot IDs. Returns None if there's an error. """ old_snapshots = [] # Initialize an empty list to store the IDs of old snapshots try: # Fetch all snapshots owned by the current AWS account snapshots = ec2_client.describe_snapshots(OwnerIds=['self'])['Snapshots'] # Calculate the cutoff date for old snapshots, removing timezone information to make it "naive" cutoff_date = datetime.now().replace(tzinfo=None) - timedelta(days=days_old) # Loop through each snapshot to check its age for snapshot in snapshots: # Remove timezone information from the snapshot's start time to make it "naive" snapshot_time_naive = snapshot['StartTime'].replace(tzinfo=None) # Compare snapshot's start time with the cutoff date if snapshot_time_naive < cutoff_date: old_snapshots.append(snapshot['SnapshotId']) # Append old snapshot IDs to the list return old_snapshots # Return the list of old snapshot IDs except ClientError as e: print(f"A ClientError occurred in region {region}: {e}") # Handle any ClientErrors return None except Exception as e: print(f"An unknown error occurred in region {region}: {e}") # Handle any general exceptions return None # List of AWS regions to check for old snapshots #regions_to_check = ['us-east-1', 'us-east-2'] #, 'us-west-2'] # Age in days to consider an EBS snapshot as old #days_old = 5 # Initialize an empty dictionary to store the snapshot IDs by region snapshots_by_region = {} # Initialize a list to store regions where no old snapshots were found regions_without_snapshots = [] # Loop through each AWS region to find old snapshots for region in regions: print(f"Checking region {region}...") ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) # Initialize EC2 client for the region old_snapshots = find_old_snapshots(ec2_client, int(days_old), region) # Find old snapshots in the region # If old snapshots are found, add them to the dictionary if old_snapshots: snapshots_by_region[region] = old_snapshots else: regions_without_snapshots.append(region) # Print the resulting dictionary print("\nSummary of old snapshots by region:") for region, snapshot_ids in snapshots_by_region.items(): print(f"{region}: {snapshot_ids}") # Print regions without old snapshots if regions_without_snapshots: print(f"\nNo old snapshots found in the following regions: {', '.join(regions_without_snapshots)}") context.skip_sub_tasks=True
        copied
        3.12.1
        1. 3.12.1.1

          This task removes specified Amazon Elastic Block Store (EBS) snapshots. Designed to streamline storage management, this procedure efficiently purges selected snapshots across designated AWS regions, ensuring optimal resource utilization and reducing unnecessary storage costs.

          import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Function to delete a list of EBS snapshots given their IDs def delete_snapshots(ec2_client, snapshot_ids, region): """ Delete a list of specified EBS snapshots in a given AWS region. Args: ec2_client (boto3.client): Boto3 EC2 client object. snapshot_ids (list[str]): List of EBS snapshot IDs to be deleted. region (str): The AWS region where the snapshots are located. Returns: None: This function does not return any value. """ for snapshot_id in snapshot_ids: try: # Delete the snapshot ec2_client.delete_snapshot(SnapshotId=snapshot_id) print(f"Deleted snapshot {snapshot_id} in region {region}") # Confirm deletion except ClientError as e: print(f"Could not delete snapshot {snapshot_id} in region {region}: {e}") # Handle any ClientErrors ''' #Example structure of snapshots_by_region # Dictionary mapping AWS regions to their respective old snapshots snapshots_by_region = { 'us-east-1': ['snap-04cbc2182c8f5e1ed', 'snap-0004bbdd1e7b0d35c'], 'us-west-2': [] # Just as an example, no snapshots listed for us-west-2 } ''' # Loop through each AWS region in the dictionary to delete old snapshots for region, old_snapshots in snapshots_by_region.items(): print(f"Checking region {region}...") ec2_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) # Initialize EC2 client for the region # Delete old snapshots if any are found for the current region if old_snapshots: print(f"Found {len(old_snapshots)} old snapshots in {region}. Deleting them...") delete_snapshots(ec2_client, old_snapshots, region) else: print(f"No old snapshots found in {region}.") # Confirm if no old snapshots are found in the current region
          copied
          3.12.1.1
    13. 3.13

      This runbook retrieves the list of resources, such as instances or targets, that are marked as 'unhealthy' or 'OutOfService', and are associated with AWS Elastic Load Balancers (ELB). This helps in identifying potential issues and ensuring the smooth operation and high availability of applications.

      3.13
      1. 3.13.1

        This task checks for instances which are OutOfService and are associated with a Classic Load Balancer.

        import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_unhealthy_instances(regions,elb_name=None): """ Fetch instances that are in "OutOfService" state for AWS Elastic Load Balancers (ELBs). Parameters: - elb_name (str, optional): Specific name of the Elastic Load Balancer to check. Default is None, which checks all ELBs. - regions (list): List of AWS regions to check. Returns: - list: A list of dictionaries containing details of unhealthy instances. """ result = [] # Loop through each specified region to check the health of instances under ELBs for reg in regions: try: # Initialize ELB client for the specified region elb_client = boto3.client('elb', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=reg) # Get a list of all load balancers in the current region elbs = elb_client.describe_load_balancers()["LoadBalancerDescriptions"] # Loop through each ELB to check the health of its instances for elb in elbs: # If a specific elb_name is provided, then skip the ELBs that don't match the name if elb_name and elb["LoadBalancerName"] != elb_name: continue # Fetch the health status of instances attached to the current ELB res = elb_client.describe_instance_health(LoadBalancerName=elb["LoadBalancerName"]) # Check each instance's health status for instance in res['InstanceStates']: # If the instance is "OutOfService", add its details to the result list if instance['State'] == "OutOfService": data_dict = { "instance_id": instance["InstanceId"], "region": reg, "load_balancer_name": elb["LoadBalancerName"] } result.append(data_dict) # Handle specific ClientError exceptions (e.g. permission issues, request limits) except ClientError as e: print(f"ClientError in region {reg}: {e}") # Handle general exceptions except Exception as e: print(f"An error occurred in region {reg}: {e}") return result # Specify the AWS regions to check for unhealthy instances #regions_to_check = ['us-east-1', 'us-west-2'] # Fetch the list of unhealthy instances unhealthy_instances = get_unhealthy_instances(regions) # Print the details of unhealthy instances, if any if unhealthy_instances: print("Unhealthy instances detected:") for instance in unhealthy_instances: print(f"Region: {instance['region']}, LoadBalancer: {instance['load_balancer_name']}, InstanceID: {instance['instance_id']}") else: print("No unhealthy instances found.")
        copied
        3.13.1
      2. 3.13.2

        This task retrieves and lists targets that are marked as 'unhealthy' and linked to AWS Application Load Balancers (ALB) or Network Load Balancers (NLB). This process helps in detecting non-performing targets to maintain optimal load distribution and service availability.
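
        When triaging the results, the TargetHealth structure also carries Reason and Description fields that explain why a target is failing health checks. The helper below is a hypothetical sketch, assuming the same credentials as the task code and a target group ARN taken from its output.

        import boto3

        def describe_unhealthy_targets(target_group_arn, region):
            """Return unhealthy targets in a target group along with the reported reason."""
            elbv2_client = boto3.client('elbv2', aws_access_key_id=access_key,
                                        aws_secret_access_key=secret_key, region_name=region)
            details = []
            response = elbv2_client.describe_target_health(TargetGroupArn=target_group_arn)
            for desc in response['TargetHealthDescriptions']:
                health = desc['TargetHealth']
                if health['State'] == 'unhealthy':
                    details.append({
                        'TargetId': desc['Target']['Id'],
                        'Reason': health.get('Reason', ''),
                        'Description': health.get('Description', '')
                    })
            return details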

        import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_unhealthy_targets(regions, elb_arn=None): """ Fetch targets (instances) that are in "unhealthy" state for AWS Application Load Balancers (ALBs) and Network Load Balancers (NLBs). Parameters: - elb_arn (str, optional): Specific ARN of the Elastic Load Balancer to check. Default is None, which checks all ELBs. - regions (list): List of AWS regions to check. Returns: - list: A list of dictionaries containing details of unhealthy targets. """ # Initialize an empty list to store results result = [] # Loop through each specified region to check for unhealthy targets for reg in regions: try: # Create a new client for the ELBv2 service in the specified region elbv2_client = boto3.client('elbv2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=reg) # Retrieve the list of all ALBs and NLBs in the current region elbs = elbv2_client.describe_load_balancers()["LoadBalancers"] # Loop through each Load Balancer and inspect its targets for elb in elbs: # If a specific ELB ARN is provided, skip all other load balancers if elb_arn and elb["LoadBalancerArn"] != elb_arn: continue # Get all target groups associated with the current load balancer target_groups = elbv2_client.describe_target_groups(LoadBalancerArn=elb["LoadBalancerArn"])["TargetGroups"] # Check the health status of each target within the target group for tg in target_groups: health_descriptions = elbv2_client.describe_target_health(TargetGroupArn=tg["TargetGroupArn"])["TargetHealthDescriptions"] # If a target is found to be "unhealthy", store its details in the result list for desc in health_descriptions: if desc["TargetHealth"]["State"] == "unhealthy": data_dict = { "target_id": desc["Target"]["Id"], "region": reg, "load_balancer_arn": elb["LoadBalancerArn"], "target_group_arn": tg["TargetGroupArn"] } result.append(data_dict) # Catch any AWS-related exceptions and print an error message except ClientError as e: print(f"ClientError in region {reg}: {e}") # Catch any other general exceptions and print an error message except Exception as e: print(f"An error occurred in region {reg}: {e}") return result # Specify the AWS regions to check for unhealthy targets #regions_to_check = ['us-east-1', 'us-west-2'] # Retrieve and print the details of any found unhealthy targets unhealthy_targets = get_unhealthy_targets(regions) if unhealthy_targets: print("Unhealthy targets detected:") for target in unhealthy_targets: print(f"Region: {target['region']}\nLoadBalancer ARN: {target['load_balancer_arn']}\nTargetGroup ARN: {target['target_group_arn']}\nTarget ID: {target['target_id']}\n") else: print("No unhealthy targets found.")
        copied
        3.13.2
    14. 3.14

      This runbook helps in identifying and removing Amazon Elastic Load Balancers (ELBs) that do not have any associated target groups or instances. ELBs play a crucial role in distributing traffic across instances, and if they are no longer serving a purpose due to the absence of targets or instances, it's recommended to remove them to optimize resources and reduce unnecessary costs. This process involves identifying such ELBs across specified AWS regions, displaying their details, and then, if applicable, deleting them to maintain an efficient and streamlined AWS environment.

      3.14
      1. 3.14.1

        This task identifies AWS Elastic Load Balancers (ELBs) that have no associated targets or instances. Such ELBs may indicate unused resources, leading to unnecessary costs. Checking and managing these can optimize AWS expenses.
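
        Note that an ALB or NLB can have target groups defined and still serve nothing if those groups have no registered targets. A hedged sketch of that extra check, assuming the same credentials as the task code (the function name is illustrative):

        import boto3

        def alb_has_registered_targets(elb_arn, region):
            """Return True if any target group of the given ALB/NLB has at least one registered target."""
            elbv2_client = boto3.client('elbv2', aws_access_key_id=access_key,
                                        aws_secret_access_key=secret_key, region_name=region)
            target_groups = elbv2_client.describe_target_groups(LoadBalancerArn=elb_arn)['TargetGroups']
            for tg in target_groups:
                health = elbv2_client.describe_target_health(TargetGroupArn=tg['TargetGroupArn'])
                if health['TargetHealthDescriptions']:
                    return True  # at least one target is registered somewhere
            return False  # no target groups, or every target group is empty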

        import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def aws_find_elbs_with_no_targets_or_instances(regions): """ Returns details of Elastic Load Balancers (ELBs) across specified AWS regions that are not associated with any target groups or instances. Args: regions (list): List of AWS regions to check. Returns: tuple: Tuple of status, and details of ELBs with no targets or instances. """ result = [] # List to store ELBs with no targets or instances all_load_balancers = [] # List to store all fetched ELBs # Iterate over each specified AWS region for reg in regions: try: # Create clients for ELBv2 (Application, Network, Gateway) and Classic ELB elbv2Client = boto3.client('elbv2', aws_access_key_id = access_key,aws_secret_access_key=secret_key,region_name=reg) elbClient = boto3.client('elb', aws_access_key_id = access_key,aws_secret_access_key=secret_key,region_name=reg) # Fetch ELBv2 Load Balancers using pagination elbv2_paginator = elbv2Client.get_paginator('describe_load_balancers') for page in elbv2_paginator.paginate(): for lb in page['LoadBalancers']: elb_dict = { "elb_name": lb['LoadBalancerName'], "elb_arn": lb['LoadBalancerArn'], "type": lb['Type'], "region": reg } all_load_balancers.append(elb_dict) # Fetch Classic Load Balancers elb_response = elbClient.describe_load_balancers() for lb in elb_response['LoadBalancerDescriptions']: elb_dict = { "elb_name": lb['LoadBalancerName'], "type": 'classic', "region": reg } all_load_balancers.append(elb_dict) # Handle potential client errors (e.g., permission issues) except ClientError as ce: print(f"Client error in region {reg}: {ce}") # Handle other exceptions except Exception as e: print(f"Error in region {reg}: {e}") # Identify ELBs with no associated targets or instances for load_balancer in all_load_balancers: if load_balancer['type'] in ['network', 'application']: elbv2Client = boto3.client('elbv2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=load_balancer['region']) target_groups = elbv2Client.describe_target_groups(LoadBalancerArn=load_balancer['elb_arn']) if not target_groups['TargetGroups']: result.append(load_balancer) elif load_balancer['type'] == 'classic': elbClient = boto3.client('elb', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=load_balancer['region']) instance_health = elbClient.describe_instance_health(LoadBalancerName=load_balancer['elb_name']) if not instance_health['InstanceStates']: result.append(load_balancer) elif load_balancer['type'] == 'gateway': elbv2Client = boto3.client('elbv2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=load_balancer['region']) listeners = elbv2Client.describe_listeners(LoadBalancerArn=load_balancer['elb_arn']) if not listeners['Listeners']: result.append(load_balancer) # Return identified ELBs return (False, result) if result else (True, None) # Specify the AWS regions to check #regions_to_check = ['us-west-1', 'us-east-1'] # Modify this list as needed # Find ELBs with no targets or instances output_status, output_data = aws_find_elbs_with_no_targets_or_instances(regions) # Print and Delete the identified ELBs if output_status: print("No load balancers found with no targets or instances.") else: for elb in output_data: print(f"ELB Name: {elb['elb_name']}") if 'elb_arn' in elb: print(f"ELB ARN: {elb['elb_arn']}") print(f"Type: {elb['type']}") print(f"Region: {elb['region']}") print("-" * 40) context.skip_sub_tasks=True
        copied
        3.14.1
        1. 3.14.1.1

          This task deletes Amazon Elastic Load Balancers (ELBs) that are not associated with any targets or instances. These unattached ELBs could be remnants of previously deployed applications or services. By identifying and removing them, organizations can not only free up unused resources but also optimize their AWS infrastructure costs. This task helps maintain a clean and efficient cloud environment while ensuring cost-effectiveness.

          import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_elbs(load_balancers): """ Deletes the specified Elastic Load Balancers. Args: load_balancers (list): List of dictionaries containing ELB details. Returns: None. """ # Iterate over each ELB to delete for elb in load_balancers: region = elb['region'] elb_type = elb['type'] try: # Handle ELBv2 (Application, Network, Gateway) deletion if elb_type in ['application', 'network', 'gateway']: client = boto3.client('elbv2', aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) client.delete_load_balancer(LoadBalancerArn=elb['elb_arn']) # Handle Classic ELB deletion elif elb_type == 'classic': client = boto3.client('elb',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) client.delete_load_balancer(LoadBalancerName=elb['elb_name']) print(f"Deleted {elb_type} load balancer {elb['elb_name']} in region {region}") # Handle potential client errors during deletion except ClientError as ce: print(f"Client error while deleting {elb_type} load balancer {elb['elb_name']} in region {region}: {ce}") # Handle other exceptions during deletion except Exception as e: print(f"Error while deleting {elb_type} load balancer {elb['elb_name']} in region {region}: {e}") # Specify the AWS regions to check #regions_to_check = ['us-west-1', 'us-east-1'] # Modify this list as needed ''' # Find ELBs with no targets or instances output_status, output_data = aws_find_elbs_with_no_targets_or_instances(regions=regions_to_check) ''' # Print and Delete the identified ELBs if output_status: print("No load balancers found with no targets or instances.") else: for elb in output_data: print(f"ELB Name: {elb['elb_name']}") if 'elb_arn' in elb: print(f"ELB ARN: {elb['elb_arn']}") print(f"Type: {elb['type']}") print(f"Region: {elb['region']}") print("-" * 40) delete_elbs(output_data) print("Load balancers deleted successfully.")
          copied
          3.14.1.1
    15. 3.15

      This runbook is designed to efficiently manage AWS Elastic Container Service (ECS) resources. It scans specified AWS regions to identify ECS clusters with low average CPU utilization, based on a user-defined threshold. Once these underutilized clusters are identified, the runbook proceeds to delete them, thereby optimizing resource usage and potentially reducing operational costs.

      cpu_threshold = 20 # Hardcoded for a one-time result
      copied
      3.15
      1. 3.15.1

        This task scans multiple AWS regions to identify ECS clusters that are underutilized in terms of CPU, based on a set threshold. This enables organizations to easily spot clusters that are consuming resources without delivering optimal performance, thereby helping in decision-making processes related to scaling, resource allocation, or decommissioning. This task aims to improve efficiency and reduce costs by flagging these low-activity clusters for further action.

        import boto3 import datetime from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_ecs_clusters_with_low_cpu_utilization(regions, threshold): """ Identifies ECS clusters with low average CPU utilization across multiple AWS regions. Args: regions (List[str]): List of AWS regions to check. threshold (int): CPU utilization percentage below which a cluster is considered underutilized. Returns: List[dict]: List of dictionaries containing cluster and service/task details. """ low_cpu_clusters = [] # List to store details of low CPU utilization clusters # Loop through each region for region in regions: try: # Initialize ECS and CloudWatch clients for the region ecs = boto3.client('ecs', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) # Fetch all cluster ARNs in the region clusters = ecs.list_clusters()['clusterArns'] # Loop through each cluster for cluster in clusters: cluster_name = cluster.split('/')[-1] # Extract the cluster name from the ARN # Define the time range for CloudWatch metrics as the last 60 minutes end_time = datetime.datetime.utcnow() start_time = end_time - datetime.timedelta(hours=1) # Fetch service ARNs in the cluster services = ecs.list_services(cluster=cluster_name)['serviceArns'] # Loop through each service to fetch its average CPU utilization for service in services: service_name = service.split('/')[-1] # Extract the service name from the ARN # Get average CPU utilization from CloudWatch cpu_response = cloudwatch.get_metric_statistics( Namespace='AWS/ECS', MetricName='CPUUtilization', Dimensions=[ {'Name': 'ClusterName', 'Value': cluster_name}, {'Name': 'ServiceName', 'Value': service_name}, ], StartTime=start_time, EndTime=end_time, Period=300, Statistics=['Average'] ) # Calculate average CPU utilization avg_cpu_service = sum(datapoint['Average'] for datapoint in cpu_response['Datapoints']) / len(cpu_response['Datapoints']) if cpu_response['Datapoints'] else 0 # Check if the average CPU utilization is below the threshold if avg_cpu_service < threshold: low_cpu_clusters.append({ 'Region': region, 'ClusterName': cluster_name, 'ServiceName': service_name, 'AverageCPU': avg_cpu_service, 'Type': 'Service' }) # Fetch task ARNs in the cluster tasks = ecs.list_tasks(cluster=cluster_name)['taskArns'] # Loop through each task to fetch its average CPU utilization for task in tasks: task_name = task.split('/')[-1] # Extract the task name from the ARN # Get average CPU utilization from CloudWatch cpu_response = cloudwatch.get_metric_statistics( Namespace='AWS/ECS', MetricName='CPUUtilization', Dimensions=[ {'Name': 'ClusterName', 'Value': cluster_name}, {'Name': 'TaskId', 'Value': task_name}, ], StartTime=start_time, EndTime=end_time, Period=300, Statistics=['Average'] ) # Calculate average CPU utilization avg_cpu_task = sum(datapoint['Average'] for datapoint in cpu_response['Datapoints']) / len(cpu_response['Datapoints']) if cpu_response['Datapoints'] else 0 # Check if the average CPU utilization is below the threshold if avg_cpu_task < threshold: low_cpu_clusters.append({ 'Region': region, 'ClusterName': cluster_name, 'TaskName': task_name, 'AverageCPU': avg_cpu_task, 'Type': 'Task' }) except ClientError as ce: print(f"A botocore exception occurred in region {region}: {ce.response['Error']['Message']}") except Exception as e: print(f"An unknown error occurred in region {region}: {e}") # General exception handling return low_cpu_clusters # Return the list of low CPU utilization clusters/services/tasks # Define the AWS regions and CPU utilization threshold #regions_to_check = ['us-east-1', 'us-west-2'] #cpu_threshold = 20 # In percentage # Execute the function and get low CPU utilization clusters low_cpu_clusters_list = get_ecs_clusters_with_low_cpu_utilization(regions, threshold=int(cpu_threshold)) # Display the result if low_cpu_clusters_list: print(f"Found {len(low_cpu_clusters_list)} ECS clusters with low CPU Utilization") for entry in low_cpu_clusters_list: print(entry) else: print(f"Found {len(low_cpu_clusters_list)} ECS clusters with low CPU Utilization") context.skip_sub_tasks=True
        copied
        3.15.1
        1. 3.15.1.1

          This task removes specified ECS clusters, thereby helping organizations maintain a clean and efficient environment. This task is particularly useful for decommissioning clusters that are no longer needed, or that have been identified as underutilized, thereby contributing to cost savings and resource optimization. It ensures that all associated services and tasks within the clusters are properly terminated before removing the clusters themselves.

          import boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_low_cpu_clusters(low_cpu_clusters): """ Deletes ECS clusters, their services, and tasks based on low CPU utilization. Args: low_cpu_clusters (list): List of dictionaries containing cluster and service/task details. Returns: None """ deleted_clusters = False # Flag to track if any clusters get deleted # Loop through each entry in low_cpu_clusters for entry in low_cpu_clusters: try: region = entry['Region'] cluster_name = entry['ClusterName'] service_name = entry.get('ServiceName', None) task_name = entry.get('TaskName', None) type_ = entry['Type'] # Initialize ECS client for the region ecs = boto3.client('ecs', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) # Delete service if it's a low CPU service # A service can't be deleted if it has an active associated task but will be deleted if the cluster is then deleted if type_ == 'Service': ecs.update_service( cluster=cluster_name, service=service_name, desiredCount=0 # Set desired task count to 0 before deleting ) ecs.delete_service( cluster=cluster_name, service=service_name ) print(f"Deleted service {service_name} in cluster {cluster_name}") # Stop task if it's a low CPU task if type_ == 'Task': ecs.stop_task( cluster=cluster_name, task=task_name ) print(f"Stopped task {task_name} in cluster {cluster_name}") # Delete cluster ecs.delete_cluster(cluster=cluster_name) print(f"Deleted cluster {cluster_name}") deleted_clusters = True except ClientError as e: print(f"A botocore exception occurred: {e.response['Error']['Message']}") except Exception as e: print(f"An unknown error occurred: {e}") # General exception handling # If no clusters were deleted, print a message stating the region being checked if not deleted_clusters: print(f"No ECS clusters with low CPU utilization were deleted") # Execute the function to delete low CPU utilization clusters, services, and tasks delete_low_cpu_clusters(low_cpu_clusters=low_cpu_clusters_list)
          copied
          3.15.1.1
    16. 3.16

      This runbook involves listing all IAM users, identifying those who haven't accessed AWS services for a specified period, and then safely deleting these inactive users. This process enhances security by removing potential vulnerabilities and optimizes resource usage in the AWS environment. Always proceed with caution to avoid unintended deletions.

      days_inactive = 60 # Hardcoded for a one-time result
      copied
      3.16
      1. 3.16.1

        This task lists all IAM users in an AWS account, providing key details such as usernames, user IDs, and creation dates. Essential for managing permissions and auditing access, it supports security and compliance work by offering a clear view of user identities, and it underpins enforcement of security policies and the principle of least privilege in AWS resource access management.

        import boto3 import botocore.exceptions creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM client iam_client = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: # Create a paginator for the list_users operation paginator = iam_client.get_paginator('list_users') # Use the paginator to paginate through the users table = context.newtable() table.title = "User list" table.num_cols = 3 table.num_rows = 1 table.has_header_row = True rownum = 0 table.setval(rownum, 0, "User name") table.setval(rownum, 1, "User ID") table.setval(rownum, 2, "Created on") for page in paginator.paginate(): users = page['Users'] table.num_rows += len(page['Users']) # Output user details if users: # print("List of IAM Users:") for user in users: rownum += 1 # print(f"Username: {user['UserName']}, User ID: {user['UserId']}, Created On: {user['CreateDate']}") table.setval(rownum, 0, user['UserName']) table.setval(rownum, 1, user['UserId']) table.setval(rownum, 2, user['CreateDate']) else: print("No IAM users found in this page.") # Handle specific exceptions except botocore.exceptions.NoCredentialsError: print("Credentials not available") except botocore.exceptions.PartialCredentialsError: print("Incomplete credentials provided") except botocore.exceptions.SSLError: print("SSL connection could not be established. Ensure your network allows SSL connections to AWS services") except botocore.exceptions.EndpointConnectionError: print("Unable to connect to the endpoint. Check your AWS configuration and network settings") except botocore.exceptions.ClientError as e: print(f"Unexpected error occurred accessing AWS: {e}") # Handle general exceptions except Exception as e: print(f"An unhandled error occurred: {str(e)}")
        copied
        3.16.1
      2. 3.16.2

        This task identifies users who haven't accessed AWS services within a specified timeframe. This process helps to maintain a secure and well-organized IAM environment by focusing on active users and potentially deactivating or removing those who are no longer in use.
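
        PasswordLastUsed only reflects console sign-ins, so users with purely programmatic access are not caught by this check. If needed, access-key last-used timestamps could be inspected as well; the helper below is a sketch under the same credential assumptions as the other tasks, and its name is hypothetical.

        import boto3

        def last_access_key_use(username):
            """Return the most recent LastUsedDate across the user's access keys, or None if never used."""
            iam_client = boto3.client('iam', aws_access_key_id=access_key,
                                      aws_secret_access_key=secret_key)
            latest = None
            keys = iam_client.list_access_keys(UserName=username)['AccessKeyMetadata']
            for key in keys:
                info = iam_client.get_access_key_last_used(AccessKeyId=key['AccessKeyId'])
                used = info['AccessKeyLastUsed'].get('LastUsedDate')  # absent if the key was never used
                if used and (latest is None or used > latest):
                    latest = used
            return latest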

        import datetime from dateutil.tz import tzlocal ''' # Example structure users = [ { 'Path': '/', 'UserName': 'test_user', 'UserId': 'AIDASXXCM5TCP7MKLMYZA', 'Arn': 'arn:aws:iam::188379622596:user/test_user', 'CreateDate': datetime.datetime(2023, 8, 23, 18, 3, 46, tzinfo=tzlocal()), 'PasswordLastUsed': datetime.datetime(2023, 10, 28, 11, 46, 16, tzinfo=tzlocal()) } ] ''' # Filter out users who haven't accessed AWS services for a specified number of days current_time = datetime.datetime.now(tzlocal()) # Check if users list is empty or not passed from the upstream task if not users: print("No users provided from the upstream task.") else: #days_inactive = 90 # Adjust as needed inactive_users = [] for user in users: if 'PasswordLastUsed' not in user: continue last_used = user['PasswordLastUsed'] days_since_last_use = (current_time - last_used).days if days_since_last_use > int(days_inactive): inactive_users.append(user) # Check if there are any inactive users if not inactive_users: print("No inactive users found.") else: for user in inactive_users: days_since_last_use = (current_time - user['PasswordLastUsed']).days print(f"Inactive User: {user['UserName']}, Last Used: {user['PasswordLastUsed']}, Inactivity: {days_since_last_use} days") context.skip_sub_tasks=True
        copied
        3.16.2
        1. 3.16.2.1

          This task deletes an IAM user in AWS which is a critical step in managing access to AWS resources. This process ensures that the user no longer has permission to perform actions or access resources. It involves several key steps: detaching all associated policies, removing any login profiles or access keys, and finally, deleting the user itself. This action is irreversible, and once the user is deleted, they cannot access the AWS Management Console, AWS CLI, or API operations unless recreated. Properly removing users helps in maintaining a secure and tidy AWS environment, especially when individuals no longer require access or have changed roles.
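
          Note that DeleteUser also fails if the user still belongs to IAM groups or has inline policies, cases the task code below does not cover. A hedged sketch of those extra cleanup steps, assuming the same credentials as the surrounding tasks (the helper name is illustrative):

          import boto3

          def remove_groups_and_inline_policies(username):
              """Remove group memberships and inline policies so that delete_user can succeed."""
              iam_client = boto3.client('iam', aws_access_key_id=access_key,
                                        aws_secret_access_key=secret_key)
              # Remove the user from every group they belong to
              for group in iam_client.list_groups_for_user(UserName=username)['Groups']:
                  iam_client.remove_user_from_group(GroupName=group['GroupName'], UserName=username)
                  print(f"Removed {username} from group {group['GroupName']}.")
              # Delete any inline policies attached directly to the user
              for policy_name in iam_client.list_user_policies(UserName=username)['PolicyNames']:
                  iam_client.delete_user_policy(UserName=username, PolicyName=policy_name)
                  print(f"Deleted inline policy {policy_name} from {username}.")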

          import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the IAM and STS clients iam = boto3.client('iam',aws_access_key_id=access_key,aws_secret_access_key=secret_key) sts = boto3.client('sts',aws_access_key_id=access_key,aws_secret_access_key=secret_key) def delete_iam_user(username=None): """ Delete an IAM user and its associated resources. Parameters: - username (str, optional): The name of the IAM user to delete. """ # Step 0: Preliminary check if a username is provided if not username: print("Error: Username is required to delete an IAM user.") return # Step 1: Check if the user exists try: iam.get_user(UserName=username) except iam.exceptions.NoSuchEntityException: print(f"User {username} does not exist.") return except Exception as e: print(f"Error fetching details for IAM user {username}: {e}") return # Step 2: Delete access keys associated with the user try: # Fetching all the access keys associated with the user access_keys = iam.list_access_keys(UserName=username) # Iterate through each access key and delete them for key_metadata in access_keys['AccessKeyMetadata']: iam.delete_access_key(UserName=username, AccessKeyId=key_metadata['AccessKeyId']) print(f"Deleted access key {key_metadata['AccessKeyId']} for user {username}.") except Exception as e: print(f"Error deleting access keys for user {username}: {e}") # Step 3: Delete login profile for the user try: # Deleting the console access (login profile) of the user iam.delete_login_profile(UserName=username) print(f"Login profile for user {username} deleted successfully.") except iam.exceptions.NoSuchEntityException: print(f"No login profile found for user {username}.") except Exception as e: print(f"Error deleting login profile for user {username}: {e}") # Step 4: Detach all policies associated with the user # Using a paginator to handle users with a large number of attached policies paginator = iam.get_paginator('list_attached_user_policies') for page in paginator.paginate(UserName=username): for policy in page['AttachedPolicies']: try: # Detaching each policy from the user iam.detach_user_policy(UserName=username, PolicyArn=policy['PolicyArn']) print(f"Detached policy {policy['PolicyName']} from user {username}.") except Exception as e: print(f"Error detaching policy {policy['PolicyName']} from user {username}: {e}") # Step 5: Delete the IAM user try: # Deleting the user from AWS IAM iam.delete_user(UserName=username) print(f"IAM user {username} deleted successfully.") except Exception as e: print(f"Error deleting IAM user {username}: {e}") # Step 6: Post-deletion verification try: # Checking if the user still exists response = iam.get_user(UserName=username) print(f"User {username} still exists!") except iam.exceptions.NoSuchEntityException: print(f"Verified that user {username} has been deleted successfully.") # Fetching the identity of the caller for audit/tracking purposes caller_identity = sts.get_caller_identity() print(f"User {username} deleted by: {caller_identity['Arn']}") except Exception as e: print(f"Error verifying the deletion of IAM user {username}: {e}") ''' Specify the username of the IAM user you wish to delete user_to_delete initialized in input parameters ''' user_to_delete = locals().get('user_to_delete', '') or '' if not user_to_delete: print("Please provide a valid user name.") else: delete_iam_user(user_to_delete)
          copied
          3.16.2.1