agent: |
End-to-End AWS Cost Tracking and Management
This runbook focuses on the end-to-end management of AWS costs, starting from the initial setup of CUR reports in S3 buckets to detailed tracking and analysis of these reports for effective cost control and insights.
- 1PdWtOhSZKNoLcrMfaEJdVerify whether a CUR report has been set up with the proper prerequisites
1
Verify whether a CUR report has been set up with the proper prerequisites
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task checks for the existence of a specified S3 bucket and exports the names of CUR reports and base paths. If these do not exist, it automates the configuration of CUR reports, including S3 bucket creation, policy updates for cost data delivery, and CUR setup.
inputsoutputsimport boto3 from botocore.exceptions import ClientError, NoCredentialsError, EndpointConnectionError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize the STS client sts_client = boto3.client('sts', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") create_bucket_action = True report_name = 'My-CUR-report-Athena' BASE_PATH = f"{report_name}/{report_name}/" # BASE_PATH generation in the format AWS saves these reports in the S3 buckets try: # Get the caller identity from STS caller_identity = sts_client.get_caller_identity() account_number = caller_identity.get('Account') # Initialize the S3 client with the account number s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key) BUCKET_NAME = f"dagknows-cur-logging-bucket-athena-{account_number}" try: # Attempt to get information about the specified bucket to check if it exists if (s3.head_bucket(Bucket=BUCKET_NAME)): print(f"Bucket {BUCKET_NAME} exists.") context.skip_sub_tasks = True except ClientError as e: error_code = e.response['Error']['Code'] if error_code == '404' and create_bucket_action: # Bucket doesn't exist and create_bucket_action is True print(f"Bucket {BUCKET_NAME} does not exist. Creating and configuring the bucket in the next task...") report_name = 'My-CUR-report-Athena' BASE_PATH = f"{report_name}/{report_name}/" # BASE_PATH generation in the format AWS saves these reports in the S3 buckets elif error_code == '403': print(f"Access to bucket {BUCKET_NAME} is forbidden.") context.skip_sub_tasks = True context.proceed = False else: print(f"Error when checking bucket: {e}") context.skip_sub_tasks = True context.proceed = False except NoCredentialsError: print("No AWS credentials found. Please configure your AWS credentials.") context.skip_sub_tasks = True context.proceed = False except EndpointConnectionError: print("Unable to connect to the S3 endpoint. Check your internet connection or AWS configuration.") context.skip_sub_tasks = True context.proceed = False except Exception as ex: print(f"An unexpected error occurred: {ex}") context.skip_sub_tasks = True context.proceed = False # Note: The bucket name should comply with AWS bucket naming rules and be globally unique.copied1- 1.1D1slhAQVok2mP0ZuqJ04End to End Configuration of an AWS Cost And Usage Report(CUR) to a S3 Bucket
1.1
End to End Configuration of an AWS Cost And Usage Report(CUR) to a S3 Bucket
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook provides a comprehensive guide for setting up and configuring AWS Cost and Usage Reports (CUR) to be delivered to an S3 bucket. It covers the process from creating a new S3 bucket, updating its policy for CUR compatibility, to configuring the CUR settings to target the created bucket.
inputsoutputs1.1- 1.1.1q2k8ukldgYZiKQKHmL64Create a New AWS S3 Bucket
1.1.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves creating a new Amazon S3 bucket in a specified AWS region. It's the initial step in setting up a destination for storing Cost and Usage Reports.
inputsoutputsimport boto3 from botocore.exceptions import ClientError # Retrieve AWS credentials from the vault creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def create_s3_bucket(bucket_name, region): """ Creates an S3 bucket in a specified region. :param bucket_name: Name of the S3 bucket to create. :param region: Region to create the bucket in. """ s3_client = boto3.client('s3',aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: if region == 'us-east-1': #Your default region should be specified here s3_client.create_bucket(Bucket=bucket_name) else: s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': region}) print(f"S3 bucket '{bucket_name}' created in {region}.") except ClientError as e: print(f"Error creating S3 bucket: {e}") # Example usage #bucket_name = 'test-this-cur-logging-bucket-1234' # Replace with your desired bucket name #region_name = 'us-east-1' # Replace with your desired region, e.g., 'us-east-1' #print(f"bucket received from upstream task {BUCKET_NAME}") #print(f"region name received from upstream task {region_name}") create_s3_bucket(BUCKET_NAME, region_name)copied1.1.1 - 1.1.2beFXCgLXgZ2RhT96pGCRUpdate the AWS S3 Bucket Policy to Allow CUR Logging
1.1.2
Update the AWS S3 Bucket Policy to Allow CUR Logging
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.In this task, the S3 bucket's policy is updated to grant necessary permissions for AWS Cost and Usage Reports to deliver log files to the bucket, ensuring secure and compliant data storage.
inputsoutputsimport boto3 import json from botocore.exceptions import ClientError # Retrieve AWS credentials from the vault creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize STS client and get account ID sts_client = boto3.client('sts', aws_access_key_id=access_key, aws_secret_access_key=secret_key) account_id = sts_client.get_caller_identity()["Account"] def update_s3_bucket_policy_for_cur(bucket_name, account_id, region): """ Updates the S3 bucket policy to allow AWS CUR to deliver log files. :param bucket_name: Name of the S3 bucket. :param account_id: AWS account ID. :param region: AWS region. """ policy = { "Version": "2008-10-17", "Id": "Policy1335892530063", "Statement": [ { "Sid": "Stmt1335892150622", "Effect": "Allow", "Principal": { "Service": "billingreports.amazonaws.com" }, "Action": [ "s3:GetBucketAcl", "s3:GetBucketPolicy" ], "Resource": f"arn:aws:s3:::{bucket_name}", "Condition": { "StringEquals": { "aws:SourceAccount": account_id, "aws:SourceArn": f"arn:aws:cur:us-east-1:{account_id}:definition/*" # These endpoints here only work on us-east-1 even if the region_name is different } } }, { "Sid": "Stmt1335892526596", "Effect": "Allow", "Principal": { "Service": "billingreports.amazonaws.com" }, "Action": "s3:PutObject", "Resource": f"arn:aws:s3:::{bucket_name}/*", "Condition": { "StringEquals": { "aws:SourceAccount": account_id, "aws:SourceArn": f"arn:aws:cur:us-east-1:{account_id}:definition/*" # These endpoints here only work on us-east-1 even if the region_name is different } } } ] } s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key) try: s3_client.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(policy)) print(f"Bucket policy updated to allow CUR deliveries for '{bucket_name}'.") except ClientError as e: print(f"Error updating bucket policy: {e}") # Example usage # bucket_name = 'test-this-cur-logging-bucket-1234' # Replace with the name of your existing bucket # region_name = 'us-east-1' # Replace with your region, e.g., 'us-east-1' update_s3_bucket_policy_for_cur(BUCKET_NAME, account_id, region_name)copied1.1.2 - 1.1.3QihhZKCozaisYMq4aa9FConfigure AWS Cost And Usage Report to a S3 Bucket
1.1.3
Configure AWS Cost And Usage Report to a S3 Bucket
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves configuring AWS Cost and Usage Reports (CUR) to direct the reports to the newly created and configured S3 bucket, finalizing the setup for report generation and storage.
inputsoutputsimport boto3 from botocore.exceptions import ClientError # Retrieve AWS credentials from the vault creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def configure_cur_report(bucket_name, report_name, region_name): """ Configures AWS Cost and Usage Report to be delivered to an S3 bucket with Parquet format for Athena. :param bucket_name: Name of the S3 bucket for report delivery. :param report_name: Name of the report. :param region_name: AWS region where the S3 bucket is located. """ cur_client = boto3.client('cur', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') report_definition = { 'ReportName': report_name, 'TimeUnit': 'HOURLY', 'Format': 'Parquet', 'Compression': 'Parquet', 'S3Bucket': bucket_name, 'S3Prefix': f"{report_name}/{report_name}/date-range/", 'S3Region': region_name, 'AdditionalSchemaElements': ['RESOURCES'], 'ReportVersioning': 'OVERWRITE_REPORT', # Updated to OVERWRITE_REPORT 'AdditionalArtifacts': ['ATHENA'], # Enable integration for Athena 'RefreshClosedReports': True } try: response = cur_client.put_report_definition(ReportDefinition=report_definition) print(f"CUR report '{report_name}' configured for delivery to '{bucket_name}'.") except ClientError as e: print(f"Error configuring CUR report: {e}") # Example usage #bucket_name = 'dagknows-cur-logging-bucket-athena-test-188379622596' #report_name = 'My-CUR-report-Athena-Test-1234' #region_name = 'us-east-1' # Replace with your region, e.g., 'us-east-1' configure_cur_report(BUCKET_NAME, report_name, region_name)copied1.1.3
- 1.2D3Ak6K6gZMMWVI1v4OgNCUR Integration using AWS Athena, S3, Glue
1.2
CUR Integration using AWS Athena, S3, Glue
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves setting up an integration to analyze AWS Cost and Usage Report (CUR) data using AWS Athena, storing the data in S3, and organizing it with AWS Glue Crawler.
inputsoutputs1.2- 1.2.1HzshumZwX8irum3pnXEeCreate CloudFormation Stack for Athena CUR Setup
1.2.1
Create CloudFormation Stack for Athena CUR Setup
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Develop a CloudFormation stack to automate the deployment of necessary resources for querying CUR data with Athena, ensuring all components are correctly configured and linked.
inputsoutputsimport boto3 from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] bucket_name = BUCKET_NAME # Parameters #bucket_name = 'dagknows-cur-logging-bucket-athena-test-188379622596' #report_name = 'My-CUR-report-Athena-Test-1234' stack_name = 'MyCURReportAthenaStack' #region_name = 'us-east-1' def create_cloudformation_stack(stack_name, template_body, region='us-east-1'): """ Create a CloudFormation stack in AWS using the provided template. :param stack_name: The name of the CloudFormation stack to be created. :param template_body: String containing the YAML formatted CloudFormation template. :param region: AWS region where the stack will be created. Default is 'us-east-1'. :return: None """ cf_client = boto3.client('cloudformation', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) try: print(f"Initiating stack creation: {stack_name}") response = cf_client.create_stack( StackName=stack_name, TemplateBody=template_body, Capabilities=['CAPABILITY_NAMED_IAM'], OnFailure='DELETE', TimeoutInMinutes=10, EnableTerminationProtection=False ) print(f"Stack creation initiated successfully, stack ID: {response['StackId']}") except cf_client.exceptions.LimitExceededException: print("Error: You have exceeded your AWS CloudFormation limits.") except cf_client.exceptions.AlreadyExistsException: print("Error: A stack with the same name already exists.") except cf_client.exceptions.TokenAlreadyExistsException: print("Error: A client request token already exists.") except cf_client.exceptions.InsufficientCapabilitiesException: print("Error: Insufficient capabilities to execute this operation. Check required permissions.") except ClientError as e: print(f"AWS ClientError: {e.response['Error']['Message']}") except Exception as e: print(f"An unexpected error occurred: {e}") def load_template_from_s3(bucket_name, report_name): """ Load a CloudFormation template from an S3 bucket. :param bucket_name: The name of the S3 bucket. :param report_name: The base directory name of the CUR report. :return: String of the file's contents or None if there's an error. """ s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key) key = f"{report_name}/{report_name}/date-range/{report_name}/crawler-cfn.yml" # If there is a s3 key error then check this step for crawler.yml S3 URI try: response = s3_client.get_object(Bucket=bucket_name, Key=key) print("Template fetched successfully from S3.") return response['Body'].read().decode('utf-8') except NoCredentialsError: print("Error: No AWS credentials were provided.") except PartialCredentialsError: print("Error: Incomplete AWS credentials were provided.") except ClientError as e: print(f"Error accessing S3: {e.response['Error']['Message']}") except Exception as e: print(f"An unexpected error occurred while loading the template from S3: {e}") return None def stack_exists(stack_name, region): """ Check if a CloudFormation stack exists. :param stack_name: The name of the CloudFormation stack. :param region: AWS region where the stack is located. :return: Boolean indicating if the stack exists. """ cf_client = boto3.client('cloudformation', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) try: response = cf_client.describe_stacks(StackName=stack_name) return len(response['Stacks']) > 0 except cf_client.exceptions.ClientError as e: if 'does not exist' in str(e): return False else: print(f"Error checking if stack exists: {e}") return True def main(): print("Loading the CloudFormation template from S3...") template_body = load_template_from_s3(bucket_name, report_name) if template_body: print("Template loaded successfully.") if stack_exists(stack_name, region_name): print(f"CloudFormation stack '{stack_name}' already exists. Skipping creation.") else: create_cloudformation_stack(stack_name, template_body, region_name) else: print("Failed to load template. Exiting.") main()copied1.2.1 - 1.2.2Am2eJdv5VJFGEgA7YLmqVerify whether the CloudFormation stack has been created
1.2.2
Verify whether the CloudFormation stack has been created
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Check if the specified CloudFormation stack is already present to avoid redundancy, ensuring that resources are managed efficiently without duplication.
inputsoutputsimport boto3 import time from botocore.exceptions import ClientError, EndpointConnectionError, NoCredentialsError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def check_stack_creation_status(stack_name, region='us-east-1'): """ Polls the CloudFormation stack status until it is in a completed state or failed. :param stack_name: The name of the CloudFormation stack to check. :param region: AWS region where the stack is located. Default is 'us-east-1'. :return: None """ try: cf_client = boto3.client('cloudformation', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) except NoCredentialsError: print("Error: No valid AWS credentials found.") return except EndpointConnectionError: print(f"Error: Unable to connect to the CloudFormation service endpoint in {region}.") return while True: try: response = cf_client.describe_stacks(StackName=stack_name) stack_status = response['Stacks'][0]['StackStatus'] print(f"Current status of the stack '{stack_name}': {stack_status}") if stack_status == 'CREATE_COMPLETE': print(f"Stack {stack_name} has been created successfully.") break elif 'FAILED' in stack_status or 'ROLLBACK' in stack_status or stack_status == 'DELETE_COMPLETE': print(f"Stack {stack_name} creation failed with status: {stack_status}") break except ClientError as e: if e.response['Error']['Code'] == 'ValidationError': print("Error: The specified stack name does not exist.") else: print(f"Failed to get stack status: {e.response['Error']['Message']}") break except Exception as e: print(f"An unexpected error occurred: {e}") break time.sleep(10) # Wait for 10 seconds before checking the stack status again #stack_name = 'MyCURReportAthenaStack' # Replace with your stack name #region_name = "us-east-1" # Specify the AWS region check_stack_creation_status(stack_name, region_name)copied1.2.2 - 1.2.3KwsIZ9b1qbdxgzhF5HPCCreate s3 bucket to store athena cur query results and skip if it exists
1.2.3
Create s3 bucket to store athena cur query results and skip if it exists
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Establish an S3 bucket dedicated to storing results from Athena queries on CUR data; this process includes a check to skip creation if the bucket already exists.
inputsoutputsimport boto3 from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_account_number(): """ Fetches the AWS account number using boto3 and STS. :return: AWS account number as a string. """ sts_client = boto3.client('sts',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name="us-east-1") try: account_id = sts_client.get_caller_identity()["Account"] print(f"AWS account number fetched: {account_id}") return account_id except ClientError as e: print(f"Error fetching AWS account number: {e}") return None def bucket_exists(bucket_name): """ Check if an S3 bucket exists. :param bucket_name: Name of the S3 bucket. :return: True if the bucket exists, False otherwise. """ s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key) try: s3_client.head_bucket(Bucket=bucket_name) return True except ClientError: return False def create_s3_bucket(bucket_name, region='us-east-1'): """ Creates an S3 bucket in a specified region, handling the 'us-east-1' special case. :param bucket_name: Name of the S3 bucket to create. :param region: Region to create the bucket in. """ s3_client = boto3.client('s3', aws_access_key_id=access_key,aws_secret_access_key=secret_key) try: create_bucket_params = { 'Bucket': bucket_name } # Add the LocationConstraint for regions other than 'us-east-1'/Account home region if region != 'us-east-1': create_bucket_params['CreateBucketConfiguration'] = { 'LocationConstraint': region } s3_client.create_bucket(**create_bucket_params) print(f"S3 bucket '{bucket_name}' created in {region}.") except ClientError as e: if e.response['Error']['Code'] in ['BucketAlreadyExists', 'BucketAlreadyOwnedByYou']: print(f"Bucket {bucket_name} already exists in {region}.") else: print(f"Error creating S3 bucket: {e.response['Error']['Message']}") except BotoCoreError as e: print(f"Unexpected error occurred while creating bucket: {e}") account_number = get_account_number() if account_number: bucket_name = f'dagknows-cur-logging-bucket-athena-query-results-{account_number}' #region_name = 'us-east-1' # Specify the AWS region # Check if the bucket exists and create if not if bucket_exists(bucket_name): print(f"Bucket {bucket_name} already exists in {region_name}. Skipping creation.") else: create_s3_bucket(bucket_name, region_name) else: print("Failed to fetch account number. Exiting.")copied1.2.3
- 2fzyIaZmrVjDoZFE9upMJCost Tracking with AWS Cost and Usage Report (CUR)
2
Cost Tracking with AWS Cost and Usage Report (CUR)
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.The AWS Cost and Usage Report (CUR) provides a comprehensive view of AWS expenses and resource utilization over a specified period. By analyzing the CUR, organizations can gain deep insights into their cloud spending patterns, identify cost drivers, and optimize their AWS resource utilization. This report allows for granular cost tracking by various dimensions, including services, regions and others enabling effective budgeting and forecasting.
inputsoutputsimport boto3 from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError # Retrieve AWS credentials from the vault creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_account_number(): """ Fetches the AWS account number using boto3 and STS. :return: AWS account number as a string. """ sts_client = boto3.client('sts', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: account_id = sts_client.get_caller_identity()["Account"] print(f"AWS account number fetched: {account_id}") return account_id except ClientError as e: print(f"Error fetching AWS account number: {e}") return None account_number = get_account_number() if account_number: bucket_name = f'dagknows-cur-logging-bucket-athena-query-results-{account_number}' print(f"This is the athena query results bucket: {bucket_name}") else: print("Failed to fetch account number. Exiting.") # bucket_name is for the athena query results bucket namecopied2- 2.1x8XXcDaX2gmFCwOkZ79BComprehensive AWS Cost Analysis Breakdown
2.1
Comprehensive AWS Cost Analysis Breakdown
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task provides a comprehensive analysis of AWS expenses through five key modules. It categorizes costs by AWS services, identifies primary cost drivers and more. The analysis charts expenses over time and breaks down costs by AWS account IDs to pinpoint major spenders aiding in efficient financial forecasting and planning.
inputsoutputs2.1- 2.1.1fSFO8HDNIY2YN1QGMiFkDaily AWS Costs using Athena
2.1.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Analyze daily AWS spending by querying the CUR data stored in S3 with Athena, providing insights into overall cost trends and spikes.
inputsoutputsimport boto3 import time from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def run_athena_query(query, database, s3_output): athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': s3_output} ) query_execution_id = response['QueryExecutionId'] print(f"Query execution started, ID: {query_execution_id}") return query_execution_id except (ClientError, BotoCoreError) as e: print(f"Failed to execute query: {e}") return None def check_query_status(athena_client, query_execution_id): while True: try: response = athena_client.get_query_execution(QueryExecutionId=query_execution_id) status = response['QueryExecution']['Status']['State'] if status == 'SUCCEEDED': print(f"Query {query_execution_id} succeeded.") return True elif status in ['FAILED', 'CANCELLED']: print(f"Query {query_execution_id} failed or was cancelled.") return False time.sleep(5) except (ClientError, BotoCoreError) as e: print(f"Error checking query status: {e}") return False def get_query_results(athena_client, query_execution_id): try: response = athena_client.get_query_results(QueryExecutionId=query_execution_id) result_data = response['ResultSet']['Rows'] header = [col['VarCharValue'] for col in result_data[0]['Data']] results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]] return header, results except (ClientError, BotoCoreError) as e: print(f"Error retrieving query results: {e}") return None, None def visualize_data(dates, costs, last_n_days): x = dates # Dates are now strings y = costs # Corresponding daily costs print("x values (dates):", x) print("y values (costs):", y) context.plot.xlabel = 'Date' context.plot.ylabel = 'Cost ($)' context.plot.title = f'Daily AWS Costs (Last {last_n_days} Days)' context.plot.add_trace(name=f'Daily AWS Costs (Last {last_n_days} Days)', xpts=x, ypts=y, tracetype="lines") # last_n_days = 7 # Passes as an input parameter query = f""" SELECT DATE(line_item_usage_start_date) AS usage_date, SUM(line_item_unblended_cost) AS total_cost FROM my_cur_report_athena WHERE line_item_usage_start_date >= DATE_ADD('day', -{last_n_days + 2}, CURRENT_DATE) AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE) GROUP BY DATE(line_item_usage_start_date) ORDER BY usage_date; """ database = 'athenacurcfn_my_c_u_r_report_athena' #s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/' #bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task prefix_path = 'dev_query_results' s3_output = f"s3://{bucket_name}/{prefix_path}/" query_execution_id = run_athena_query(query, database, s3_output) if query_execution_id: athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") if check_query_status(athena_client, query_execution_id): header, results = get_query_results(athena_client, query_execution_id) if results: dates = [row[0] for row in results] costs = [float(row[1]) for row in results] print("x values (dates):", dates) print("y values (costs):", costs) visualize_data(dates, costs, last_n_days) else: print("No results to show.") else: print("Query did not succeed. No results to show.") else: print("Query execution failed. Exiting.")copied2.1.1 - 2.1.2RG7Fx5TcHNgGVXL1VgsqDaily AWS Costs by service using Athena
2.1.2
Daily AWS Costs by service using Athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Break down daily AWS expenses by service to pinpoint where spending is occurring, using Athena to query detailed CUR data.
inputsoutputsimport boto3 import time from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def run_athena_query(query, database, s3_output): athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': s3_output} ) query_execution_id = response['QueryExecutionId'] print(f"Query execution started, ID: {query_execution_id}") return query_execution_id except (ClientError, BotoCoreError) as e: print(f"Failed to execute query: {e}") return None def check_query_status(athena_client, query_execution_id): while True: try: response = athena_client.get_query_execution(QueryExecutionId=query_execution_id) status = response['QueryExecution']['Status']['State'] if status == 'SUCCEEDED': print(f"Query {query_execution_id} succeeded.") return True elif status in ['FAILED', 'CANCELLED']: print(f"Query {query_execution_id} failed or was cancelled.") return False time.sleep(5) except (ClientError, BotoCoreError) as e: print(f"Error checking query status: {e}") return False def get_query_results(athena_client, query_execution_id): try: response = athena_client.get_query_results(QueryExecutionId=query_execution_id) result_data = response['ResultSet']['Rows'] header = [col['VarCharValue'] for col in result_data[0]['Data']] results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]] return header, results except (ClientError, BotoCoreError) as e: print(f"Error retrieving query results: {e}") return None, None def visualize_data(dates, costs_by_service, last_n_days): print("x values (dates):", dates) for service, costs in costs_by_service.items(): print(f"y values (costs) for {service}:", costs) context.plot.add_trace(name=f'{service} (Last {last_n_days} Days)', xpts=dates, ypts=costs, tracetype="lines") context.plot.xlabel = 'Date' context.plot.ylabel = 'Cost ($)' context.plot.title = f'Daily AWS Costs by service (Last {last_n_days} Days excluding last 2 days)' def main(last_n_days, top_n_services): query = f""" WITH service_costs AS ( SELECT DATE(line_item_usage_start_date) AS usage_date, line_item_product_code AS service, SUM(line_item_unblended_cost) AS daily_cost FROM my_cur_report_athena WHERE line_item_usage_start_date >= DATE_ADD('day', -{last_n_days + 2}, CURRENT_DATE) AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE) GROUP BY DATE(line_item_usage_start_date), line_item_product_code ), top_services AS ( SELECT service FROM service_costs GROUP BY service ORDER BY SUM(daily_cost) DESC LIMIT {top_n_services} ) SELECT sc.usage_date, sc.service, sc.daily_cost FROM service_costs sc JOIN top_services ts ON sc.service = ts.service ORDER BY sc.usage_date, sc.service; """ database = 'athenacurcfn_my_c_u_r_report_athena' #s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/' #bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task prefix_path = 'dev_query_results' s3_output = f"s3://{bucket_name}/{prefix_path}/" query_execution_id = run_athena_query(query, database, s3_output) if query_execution_id: athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") if check_query_status(athena_client, query_execution_id): header, results = get_query_results(athena_client, query_execution_id) if results: dates = [] costs_by_service = {} for row in results: date, service, cost = row if date not in dates: dates.append(date) if service not in costs_by_service: costs_by_service[service] = [] costs_by_service[service].append(float(cost)) visualize_data(dates, costs_by_service, last_n_days) else: print("No results to show.") else: print("Query did not succeed. No results to show.") else: print("Query execution failed. Exiting.") # last_n_days = 7 # To be passed as an input parameter # top_n_services = 5 # To be passed as an input parameter main(last_n_days, top_n_services)copied2.1.2 - 2.1.3Q6uA91jnIEnSZHF6jzFPEC2 Cost Analysis
2.1.3
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.inputsoutputs2.1.3- 2.1.3.1dBIssYjV0DdxoufdUtePDaily AWS Costs by top_n_instances using athena
2.1.3.1
Daily AWS Costs by top_n_instances using athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Focus on the highest cost-generating instances, querying CUR data with Athena to identify and analyze the top spending instances daily.
inputsoutputsimport boto3 import time from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def run_athena_query(query, database, s3_output): athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': s3_output} ) query_execution_id = response['QueryExecutionId'] print(f"Query execution started, ID: {query_execution_id}") return query_execution_id except (ClientError, BotoCoreError) as e: print(f"Failed to execute query: {e}") return None def check_query_status(athena_client, query_execution_id): while True: try: response = athena_client.get_query_execution(QueryExecutionId=query_execution_id) status = response['QueryExecution']['Status']['State'] if status == 'SUCCEEDED': print(f"Query {query_execution_id} succeeded.") return True elif status in ['FAILED', 'CANCELLED']: print(f"Query {query_execution_id} failed or was cancelled.") return False time.sleep(5) except (ClientError, BotoCoreError) as e: print(f"Error checking query status: {e}") return False def get_query_results(athena_client, query_execution_id): try: response = athena_client.get_query_results(QueryExecutionId=query_execution_id) result_data = response['ResultSet']['Rows'] header = [col['VarCharValue'] for col in result_data[0]['Data']] results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]] return header, results except (ClientError, BotoCoreError) as e: print(f"Error retrieving query results: {e}") return None, None def visualize_data(dates, costs_by_instance, last_n_days, top_n_instances): print("x values (dates):", dates) for instance, costs in costs_by_instance.items(): print(f"y values (costs) for {instance}:", costs) context.plot.add_trace(name=f'{instance} (Last {last_n_days} Days)', xpts=dates, ypts=costs, tracetype="lines") context.plot.xlabel = 'Date' context.plot.ylabel = 'Cost ($)' context.plot.title = f'Top {top_n_instances} EC2 Instances Daily Costs (Last {last_n_days} Days excluding last 2 days)' def main(last_n_days, top_n_instances): query = f""" WITH ranked_instances AS ( SELECT DATE(line_item_usage_start_date) AS usage_date, line_item_resource_id AS instance_id, line_item_product_code AS product_code, product_instance_type AS instance_type, SUM(line_item_unblended_cost) AS daily_cost, RANK() OVER (PARTITION BY DATE(line_item_usage_start_date) ORDER BY SUM(line_item_unblended_cost) DESC) AS rank FROM my_cur_report_athena WHERE line_item_product_code = 'AmazonEC2' AND line_item_usage_start_date >= DATE_ADD('day', -{last_n_days + 2}, CURRENT_DATE) AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE) AND product_instance_type IS NOT NULL AND product_instance_type != '' GROUP BY DATE(line_item_usage_start_date), line_item_resource_id, line_item_product_code, product_instance_type ) SELECT usage_date, instance_id, product_code, instance_type, daily_cost FROM ranked_instances WHERE rank <= {top_n_instances} ORDER BY usage_date, daily_cost DESC; """ database = 'athenacurcfn_my_c_u_r_report_athena' #s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/' #bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task prefix_path = 'dev_query_results' s3_output = f"s3://{bucket_name}/{prefix_path}/" query_execution_id = run_athena_query(query, database, s3_output) if query_execution_id: athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") if check_query_status(athena_client, query_execution_id): header, results = get_query_results(athena_client, query_execution_id) if results: dates = [] costs_by_instance = {} all_dates = set(row[0] for row in results) for row in results: date, instance_id, product_code, instance_type, cost = row if date not in dates: dates.append(date) instance_key = f"{instance_type} ({instance_id})" if instance_key not in costs_by_instance: costs_by_instance[instance_key] = {d: 0 for d in all_dates} costs_by_instance[instance_key][date] = float(cost) # Convert costs_by_instance to lists for plotting final_costs_by_instance = {k: [v[d] for d in dates] for k, v in costs_by_instance.items()} visualize_data(dates, final_costs_by_instance, last_n_days, top_n_instances) else: print("No results to show.") else: print("Query did not succeed. No results to show.") else: print("Query execution failed. Exiting.") # last_n_days = 7 # To be passed as an input parameter # top_n_instances = 3 # To be passed as an input parameter main(last_n_days, top_n_instances)copied2.1.3.1 - 2.1.3.2YH2n1kJrTYWgmtIQyBr8AWS EC2 Usage Analysis
2.1.3.2
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook involves collecting data on EC2 instances, retrieving CPU utilization metrics from Amazon CloudWatch, and visually plotting this data to identify underutilized or overutilized instances. This task helps in recognizing potential cost-saving opportunities by rightsizing instances, either by downsizing underutilized instances to reduce costs or upsizing overutilized instances to improve performance.
inputsoutputsregion_name = None region_name_to_search_recommendations = Nonecopied2.1.3.2- 2.1.3.2.1l2WbsJzf9MvMxNsLlFq3Get all AWS EC2 instances
2.1.3.2.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Amazon Elastic Compute Cloud (EC2) is a service offered by Amazon Web Services (AWS) that provides resizable compute capacity in the cloud. Through Boto3's EC2 client, the describe_instances() method provides detailed information about each instance, including its ID, type, launch time, and current state. This capability assists users in effectively monitoring and managing their cloud resources.
inputsoutputsimport boto3 from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_all_regions(): ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name = 'us-east-1') return [region['RegionName'] for region in ec2.describe_regions()['Regions']] def list_ec2_instances(region=None): # If no region is provided, fetch instances from all regions regions = [region] if region else list_all_regions() # Create an empty list to store instance details instance_details = [] for region in regions: # Try initializing the Boto3 EC2 client for the specific region try: ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) except (NoCredentialsError, PartialCredentialsError): print(f"Failed for {region}: No AWS credentials found or incomplete credentials provided.") continue except BotoCoreError as e: print(f"Failed for {region}: Error initializing the EC2 client due to BotoCore Error: {e}") continue except Exception as e: print(f"Failed for {region}: Unexpected error initializing the EC2 client: {e}") continue #print(f"Fetching EC2 instance details for region: {region}...") # Try to paginate through the EC2 instance responses for the specific region try: paginator = ec2_client.get_paginator('describe_instances') for page in paginator.paginate(): for reservation in page['Reservations']: for instance in reservation['Instances']: # Extract the desired attributes instance_id = instance['InstanceId'] instance_type = instance['InstanceType'] launch_time = instance['LaunchTime'] state = instance['State']['Name'] # Append the details to the list instance_details.append({ 'InstanceId': instance_id, 'InstanceType': instance_type, 'LaunchTime': launch_time, 'State': state, 'Region': region }) #print(f"Fetched all instance details for region: {region} successfully!") except ClientError as e: print(f"Failed for {region}: AWS Client Error while fetching EC2 instance details: {e}") except Exception as e: print(f"Failed for {region}: Unexpected error while fetching EC2 instance details: {e}") return instance_details def display_instance_details(data): # Initialize table with the desired structure and headers table = context.newtable() table.title = "EC2 Instance Details" table.num_cols = 5 # Number of columns according to headers table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names based on the new structure headers = ["Instance ID", "Instance Type", "Launch Time", "State", "Region"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Sort the instance data by launch time for better organization data.sort(key=lambda x: x["LaunchTime"], reverse=True) # Populate the table with instance data for row_num, instance in enumerate(data, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each instance values = [ instance["InstanceId"], instance["InstanceType"], instance["LaunchTime"].strftime('%Y-%m-%d %H:%M:%S'), # Format the datetime instance["State"], instance["Region"] ] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) # You can replace None with a specific region string like 'us-east-1' to get instances from a specific region # Hardcoded region_name for One time Execution Result region_name=None instances_list = list_ec2_instances(region_name) if instances_list: ''' print("\nEC2 Instance Details:") for instance in instances_list: print("-" * 50) # Separator line for key, value in instance.items(): print(f"{key}: {value}")''' display_instance_details(instances_list) else: print("No instances found or an error occurred.")copied2.1.3.2.1 - 2.1.3.2.2ez5YMQ9CcFXNzZMMfBE9Aggregate and Visualize Comprehensive EC2 CPU Utilization
2.1.3.2.2
Aggregate and Visualize Comprehensive EC2 CPU Utilization
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task aggregates CPU utilization data for running EC2 instances across an AWS account, computes the average CPU usage over a specified period, and plots the average to help in assessing overall resource efficiency.
inputsoutputsimport boto3 from datetime import datetime, timedelta last_n_days=30 # AWS Credentials creds = _get_creds(cred_label)['creds'] # Placeholder function to get AWS credentials access_key = creds['username'] secret_key = creds['password'] '''# Placeholder for instances_list instances_list = [ {'InstanceId': 'instance1', 'Region': 'us-east-1', 'State': 'running'}, {'InstanceId': 'instance2', 'Region': 'us-east-1', 'State': 'running'}, # Add more instances as needed ] ''' def fetch_cpu_utilization(instance_id, region, start_time, end_time): cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) metrics = cloudwatch.get_metric_statistics( Namespace='AWS/EC2', MetricName='CPUUtilization', Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}], StartTime=start_time, EndTime=end_time, Period=3600, Statistics=['Average'] ) # Calculate average CPU utilization without NumPy data_points = metrics.get('Datapoints', []) if data_points: avg_cpu = sum(dp['Average'] for dp in data_points) / len(data_points) else: avg_cpu = 0 return avg_cpu def plot_cpu_utilization(instances_list, last_n_days=7): start_time = datetime.utcnow() - timedelta(days=last_n_days) end_time = datetime.utcnow() avg_utilizations = [] for instance in instances_list: if instance['State'] == 'running': avg_cpu = fetch_cpu_utilization(instance['InstanceId'], instance['Region'], start_time, end_time) avg_utilizations.append((instance['InstanceId'], avg_cpu)) # Sort instances by average CPU utilization and select top 3 and bottom 3 avg_utilizations.sort(key=lambda x: x[1], reverse=True) top_instances = avg_utilizations[:3] bottom_instances = avg_utilizations[-3:] # Prepare data for plotting instance_ids = [x[0] for x in top_instances + bottom_instances] utilizations = [x[1] for x in top_instances + bottom_instances] # Plotting context.plot.add_trace( name="CPU Utilization", xpts=instance_ids, ypts=utilizations, tracetype='bar' ) context.plot.xlabel = 'Instance ID' context.plot.ylabel = 'Average CPU Utilization (%)' context.plot.title = f'Top & Bottom 3 EC2 Instances by CPU Utilization (Last {last_n_days} Days)' plot_cpu_utilization(instances_list, last_n_days=30)copied2.1.3.2.2 - 2.1.3.2.3FNMZWFoMC9d9KCxXZKjuPlot Average CPU Utilization for all running AWS EC2 Instances
2.1.3.2.3
Plot Average CPU Utilization for all running AWS EC2 Instances
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task entails collecting CPU usage metrics from Amazon CloudWatch, calculating the average utilization, and visualizing this data. This task aids in identifying underutilized or overutilized instances, facilitating efficient resource management and cost optimization in AWS.
inputsoutputsimport boto3 from datetime import datetime, timedelta from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError, EndpointConnectionError, DataNotFoundError last_n_days=30 # AWS credentials creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] if locals().get('instances_list') is None: instances_list = [] # Function to fetch CPU utilization for a given instance def fetch_cpu_utilization(instance_id, region, start_time, end_time): try: cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) response = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'cpuUtilization', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/EC2', 'MetricName': 'CPUUtilization', 'Dimensions': [{'Name': 'InstanceId', 'Value': instance_id}] }, 'Period': 3600, # one hour 'Stat': 'Average', }, 'ReturnData': True, }, ], StartTime=start_time, EndTime=end_time ) return response['MetricDataResults'][0]['Timestamps'], response['MetricDataResults'][0]['Values'] except Exception as e: print(f"Error getting CPU utilization for instance {instance_id}: {e}") return [], [] # Main plotting logic def plot_cpu_utilization(instances_list, lookback_days=last_n_days): end_time = datetime.utcnow() start_time = end_time - timedelta(days=lookback_days) # Filter running EC2 instances for instance in instances_list: if instance['State'] != 'running': continue timestamps, cpu_values = fetch_cpu_utilization(instance['InstanceId'], instance['Region'], start_time, end_time) # Check if data is available if timestamps: context.plot.add_trace( name=f"Instance {instance['InstanceId']}", xpts=timestamps, # x-axis points ypts=cpu_values, # y-axis points tracetype="line" ) # Set plot properties context.plot.xlabel = 'Date' context.plot.ylabel = 'Average CPU Utilization (%)' context.plot.title = f'CPU Utilization per EC2 Instance (Last {last_n_days} Days)' # Execute the plotting function plot_cpu_utilization(instances_list)copied2.1.3.2.3
- 2.1.3.3fgHYQK6ky4IDPP33V0itRightsizing Recommendations for AWS EC2 Instances using AWS Compute Optimizer
2.1.3.3
Rightsizing Recommendations for AWS EC2 Instances using AWS Compute Optimizer
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task utilizes AWS Compute Optimizer to fetch rightsizing recommendations for AWS EC2 instances, aiming to optimize instance sizes based on actual usage. It assesses whether instances are under-utilized or over-utilized, suggesting adjustments to enhance performance and reduce costs. By querying across specified or all regions, it allows for a comprehensive optimization strategy, ensuring resources are efficiently allocated and maximizes cost-effectiveness and performance across your AWS environment.
inputsoutputsregion_name_to_search_recommendations = Nonecopied2.1.3.3- 2.1.3.3.1O5FtOq5CNw0pQ0RzZs9iFetch Rightsizing Recommendations for AWS EC2 Instances
2.1.3.3.1
Fetch Rightsizing Recommendations for AWS EC2 Instances
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task retrieves AWS EC2 instance rightsizing recommendations using AWS Compute Optimizer, identifying cost-saving and performance-enhancing opportunities by analyzing usage patterns. It suggests optimal instance types or sizes, ensuring efficient resource utilization.
inputsoutputsimport json import boto3 from botocore.exceptions import BotoCoreError, ClientError from datetime import datetime creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize boto3 clients #compute_optimizer_client = boto3.client('compute-optimizer', region_name='us-west-2') pricing_client = boto3.client('pricing',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name='us-east-1') def datetime_converter(o): if isinstance(o, datetime): return o.__str__() def get_price_for_instance(instance_type, region): # Mapping AWS region to the Pricing API format region_name_map = { "us-east-1": "US East (N. Virginia)", "us-east-2": "US East (Ohio)", "us-west-1": "US West (N. California)", "us-west-2": "US West (Oregon)", "af-south-1": "Africa (Cape Town)", "ap-east-1": "Asia Pacific (Hong Kong)", "ap-south-1": "Asia Pacific (Mumbai)", "ap-northeast-3": "Asia Pacific (Osaka)", "ap-northeast-2": "Asia Pacific (Seoul)", "ap-southeast-1": "Asia Pacific (Singapore)", "ap-southeast-2": "Asia Pacific (Sydney)", "ap-northeast-1": "Asia Pacific (Tokyo)", "ca-central-1": "Canada (Central)", "eu-central-1": "EU (Frankfurt)", "eu-west-1": "EU (Ireland)", "eu-west-2": "EU (London)", "eu-south-1": "EU (Milan)", "eu-west-3": "EU (Paris)", "eu-north-1": "EU (Stockholm)", "me-south-1": "Middle East (Bahrain)", "sa-east-1": "South America (São Paulo)"} region_name = region_name_map.get(region, region) # Default to using the region code if no mapping found try: response = pricing_client.get_products( ServiceCode='AmazonEC2', Filters=[ {'Type': 'TERM_MATCH', 'Field': 'instanceType', 'Value': instance_type}, {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': region_name}, {'Type': 'TERM_MATCH', 'Field': 'preInstalledSw', 'Value': 'NA'}, {'Type': 'TERM_MATCH', 'Field': 'operatingSystem', 'Value': 'Linux'}, {'Type': 'TERM_MATCH', 'Field': 'tenancy', 'Value': 'shared'}, {'Type': 'TERM_MATCH', 'Field': 'capacitystatus', 'Value': 'Used'}, ], MaxResults=1 ) price_info = json.loads(response['PriceList'][0]) price_dimensions = next(iter(price_info['terms']['OnDemand'].values()))['priceDimensions'] price_per_unit = next(iter(price_dimensions.values()))['pricePerUnit']['USD'] return float(price_per_unit) except Exception as e: print(f"Error fetching price for {instance_type} in {region}: {e}") return None def display_instance_recommendations(recommendations): # Initialize table with the desired structure and headers table = context.newtable() table.title = "EC2 Instance Rightsizing Recommendations" table.num_cols = 10 # Adjust the number of columns to match the number of attributes you want to display table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names based on the new structure headers = [ "Instance ID", "Instance Name", "Current Instance Type", "Region", "Findings", "Recommended Instance Type", "Migration Effort", "Savings Opportunity (%)", "Estimated Monthly Savings ($)", "Price Difference ($/hr)" ] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Populate the table with data from recommendations for row_num, recommendation in enumerate(recommendations, start=1): instance_id = recommendation['instanceArn'].split('/')[-1] instance_name = recommendation.get('instanceName', 'N/A') current_instance_type = recommendation.get('currentInstanceType', 'N/A') region = recommendation['instanceArn'].split(':')[3] findings = recommendation.get('finding', 'N/A') migration_effort = "N/A" savings_opportunity_percentage = "N/A" estimated_monthly_savings_value = "N/A" price_difference = "N/A" if recommendation.get('recommendationOptions'): option = recommendation['recommendationOptions'][0] recommended_instance_type = option.get('instanceType') migration_effort = option.get('migrationEffort', 'N/A') savings_opportunity_percentage = option.get('savingsOpportunity', {}).get('savingsOpportunityPercentage', 'N/A') estimated_monthly_savings_value = option.get('savingsOpportunity', {}).get('estimatedMonthlySavings', {}).get('value', 'N/A') current_price = get_price_for_instance(current_instance_type, region) recommended_price = get_price_for_instance(recommended_instance_type, region) price_difference = "N/A" if current_price is None or recommended_price is None else f"{current_price - recommended_price:.4f}" values = [ instance_id, instance_name, current_instance_type, region, findings, recommended_instance_type, migration_effort, f"{savings_opportunity_percentage}%", f"${estimated_monthly_savings_value}", f"${price_difference}/hr" ] table.num_rows += 1 # Add a row for each recommendation for col_num, value in enumerate(values): table.setval(row_num, col_num, value) def get_ec2_rightsizing_recommendations(region_name_to_search_recommendations=None): regions_to_search = [] if region_name_to_search_recommendations: regions_to_search.append(region_name_to_search_recommendations) else: # Fetch all regions if none specified ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') all_regions_response = ec2_client.describe_regions() regions_to_search = [region['RegionName'] for region in all_regions_response['Regions']] all_recommendations = [] for region in regions_to_search: try: # Initialize compute-optimizer client with the proper region local_compute_optimizer_client = boto3.client('compute-optimizer', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) next_token = None #page_counter = 1 # To count the number of pages fetched while True: if next_token: response = local_compute_optimizer_client.get_ec2_instance_recommendations(NextToken=next_token) else: response = local_compute_optimizer_client.get_ec2_instance_recommendations() recommendations = response.get('instanceRecommendations', []) if recommendations: all_recommendations.extend(recommendations) #print(f"Fetched {len(recommendations)} recommendations for page {page_counter}.") # Pagination - Check if there's a next page of recommendations next_token = response.get('NextToken') if not next_token: break # Exit loop if there's no more data to fetch #page_counter += 1 except ClientError as error: print(f"Client error in region {region}: {error}") except BotoCoreError as error: print(f"BotoCore error in region {region}: {error}") return all_recommendations def process_recommendations(region_name_to_search_recommendations=None): # Fetch recommendations once, using the provided region or searching all regions. recommendations = get_ec2_rightsizing_recommendations(region_name_to_search_recommendations) display_instance_recommendations(recommendations) # table printing line # If no recommendations were found after searching, exit the function. if not recommendations: print("No recommendations found. Please check if the region is correct or if there are any permissions issues.") return data_for_plotting = [] # Iterate through the fetched recommendations for processing. for recommendation in recommendations: # Extract details from each recommendation as before... instance_id = recommendation['instanceArn'].split('/')[-1] instance_name = recommendation.get('instanceName', 'N/A') findings = recommendation.get('finding', 'N/A') finding_reasons = ", ".join(recommendation.get('findingReasonCodes', [])) instance_state = recommendation.get('instanceState', 'N/A') current_instance_type = recommendation.get('currentInstanceType', 'N/A') tags = json.dumps(recommendation.get('tags', []), default=datetime_converter) account_id = recommendation['instanceArn'].split(':')[4] region = recommendation['instanceArn'].split(':')[3] # Print details for each recommendation... print(f"Instance ID: {instance_id}") print(f"Instance Name: {instance_name}") print(f"Findings: {findings}") print(f"Finding Reasons: {finding_reasons}") print(f"Instance State: {instance_state}") print(f"Current Instance Type: {current_instance_type}") print(f"Tags: {tags}") print(f"Account ID: {account_id}") print(f"Region: {region}") print("-" * 50) for option in recommendation['recommendationOptions']: recommended_instance_type = option.get('instanceType') migration_effort = option.get('migrationEffort', 'N/A') savings_opportunity_percentage = option.get('savingsOpportunity', {}).get('savingsOpportunityPercentage', 'N/A') estimated_monthly_savings_value = option.get('savingsOpportunity', {}).get('estimatedMonthlySavings', {}).get('value', 'N/A') current_price = get_price_for_instance(current_instance_type, region) recommended_price = get_price_for_instance(recommended_instance_type, region) price_difference = "N/A" if current_price is None or recommended_price is None else current_price - recommended_price data_for_plotting.append({ "instance_id": instance_id, "instance_name": instance_name, "estimated_monthly_savings_value": estimated_monthly_savings_value }) print(f"\tRecommended Instance Type: {recommended_instance_type}") print(f"\tMigration Effort: {migration_effort}") print(f"\tSavings Opportunity (%): {savings_opportunity_percentage}") print(f"\tEstimated Monthly Savings: USD {estimated_monthly_savings_value}") print(f"\tCurrent Price: {current_price if current_price is not None else 'N/A'} USD per hour") print(f"\tRecommended Price: {recommended_price if recommended_price is not None else 'N/A'} USD per hour") print(f"\tPrice Difference: {price_difference} USD per hour") print("-" * 25) return data_for_plotting #region_name_to_search_recommendations = None data_for_plotting = process_recommendations(region_name_to_search_recommendations)copied2.1.3.3.1 - 2.1.3.3.2yL3S7RVMgqHLjI2r0KezPlot Savings based on AWS EC2 Rightsizing Recommendations
2.1.3.3.2
Plot Savings based on AWS EC2 Rightsizing Recommendations
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task generates a bar chart visualizing AWS EC2 rightsizing savings, with instance names on the X-axis and different recommendations distinguished by instance ID and rank in the legend.
inputsoutputs# print(json.dumps(data_for_plotting,indent=4)) # Aggregate savings values for each instance, keeping track of both instance ID and name savings_by_instance = {} for entry in data_for_plotting: instance_id = entry["instance_id"] instance_name = entry["instance_name"] # Keep instance name for labeling purposes savings_value = entry["estimated_monthly_savings_value"] # Check if the instance ID is already a key in the dictionary if instance_id not in savings_by_instance: savings_by_instance[instance_id] = {'name': instance_name, 'savings': [savings_value]} else: savings_by_instance[instance_id]['savings'].append(savings_value) # Plotting context.plot.xlabel = "Instance Name" context.plot.ylabel = "Estimated Monthly Savings ($)" context.plot.title = "Estimated Monthly Savings by Instance" # Add a trace for each instance's savings values for instance_id, info in savings_by_instance.items(): instance_name = info['name'] # Retrieve instance name for labeling savings_values = info['savings'] for i, savings_value in enumerate(savings_values): trace_name = f"({instance_id})-Rec{i+1}" context.plot.add_trace(name=trace_name, xpts=[instance_name], ypts=[savings_value], tracetype='bar')copied2.1.3.3.2
- 2.1.4gCIOraWGVXTTGxS8xauyEBS Cost Analysis
2.1.4
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.inputsoutputs2.1.4- 2.1.4.1zSjBBLqgDzGa6tgqOTkFRightsizing Recommendations for AWS EBS Volumes using AWS Compute Optimizer
2.1.4.1
Rightsizing Recommendations for AWS EBS Volumes using AWS Compute Optimizer
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook aids in enhancing storage efficiency by providing insights based on usage analysis. This service suggests optimal EBS volume configurations, including type, size, and IOPS, to align with performance needs and cost savings.
inputsoutputsregion_name_to_search_recommendations = Nonecopied2.1.4.1- 2.1.4.1.1p7esvHfNzElz5LFA2wqgFetch Rightsizing Recommendations for AWS EBS Volumes
2.1.4.1.1
Fetch Rightsizing Recommendations for AWS EBS Volumes
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves analyzing EBS volume usage to offer configuration changes for cost efficiency and performance improvement, based on historical data analysis.
inputsoutputsimport boto3 import json from datetime import datetime, timezone creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Define the pricing_client at the beginning of your script to ensure it's available globally pricing_client = boto3.client('pricing', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') def get_ebs_volume_recommendations(region_name=None): """ Fetch EBS volume recommendations from AWS Compute Optimizer for a specific region or all regions. """ if region_name: regions = [region_name] else: # Initialize a client for the EC2 service to fetch all regions ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') regions_response = ec2_client.describe_regions() regions = [region['RegionName'] for region in regions_response['Regions']] recommendations = [] for region in regions: try: # Initialize Compute Optimizer client for each region compute_optimizer_client = boto3.client('compute-optimizer', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) next_token = None while True: params = {} if next_token: params['NextToken'] = next_token response = compute_optimizer_client.get_ebs_volume_recommendations(**params) recommendations.extend(response.get('volumeRecommendations', [])) next_token = response.get('NextToken', None) if not next_token: break except Exception as e: print(f"Error fetching EBS volume recommendations for region {region}: {e}") return recommendations def get_ebs_price(volume_type, size_gb, region): # Mapping AWS region to the Pricing API format region_name_map = { "us-east-1": "US East (N. Virginia)", "us-east-2": "US East (Ohio)", "us-west-1": "US West (N. California)", "us-west-2": "US West (Oregon)", "af-south-1": "Africa (Cape Town)", "ap-east-1": "Asia Pacific (Hong Kong)", "ap-south-1": "Asia Pacific (Mumbai)", "ap-northeast-3": "Asia Pacific (Osaka)", "ap-northeast-2": "Asia Pacific (Seoul)", "ap-southeast-1": "Asia Pacific (Singapore)", "ap-southeast-2": "Asia Pacific (Sydney)", "ap-northeast-1": "Asia Pacific (Tokyo)", "ca-central-1": "Canada (Central)", "eu-central-1": "EU (Frankfurt)", "eu-west-1": "EU (Ireland)", "eu-west-2": "EU (London)", "eu-south-1": "EU (Milan)", "eu-west-3": "EU (Paris)", "eu-north-1": "EU (Stockholm)", "me-south-1": "Middle East (Bahrain)", "sa-east-1": "South America (São Paulo)"} region_name = region_name_map.get(region, region) #print(f"searching for region {region_name}") # for debugging try: price_response = pricing_client.get_products( ServiceCode='AmazonEC2', Filters=[ {'Type': 'TERM_MATCH', 'Field': 'volumeApiName', 'Value': volume_type}, # Adjusted to 'volumeApiName' {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': region_name}, {'Type': 'TERM_MATCH', 'Field': 'productFamily', 'Value': 'Storage'} ], MaxResults=1 # Increased MaxResults to ensure broader search results ) #print(price_response) # for debugging # Ensure there's at least one price listed if price_response['PriceList']: # Assuming the first price item's details are representative price_data = json.loads(price_response['PriceList'][0]) terms = price_data.get('terms', {}).get('OnDemand', {}) if terms: price_dimensions = next(iter(terms.values()))['priceDimensions'] price_per_gb = next(iter(price_dimensions.values()))['pricePerUnit']['USD'] # Calculate total price based on the volume size total_price = float(price_per_gb) * size_gb return total_price else: print("No pricing terms found.") return None except Exception as e: print(f"Error fetching price for EBS volume: {e}") return None def process_recommendations(recommendations): for recommendation in recommendations: volume_arn = recommendation['volumeArn'] region = volume_arn.split(':')[3] # for pricing api query current_configuration = recommendation['currentConfiguration'] finding = recommendation['finding'] finding_reasons_codes = recommendation.get('findingReasonCodes', []) print(f"Volume ARN: {volume_arn}") print(f"Region: {region}") print(f"Current Configuration: {json.dumps(current_configuration, indent=2)}") print(f"Finding: {finding} {' | '.join(finding_reasons_codes) if finding_reasons_codes else ''}") if 'volumeRecommendationOptions' in recommendation: for option in recommendation['volumeRecommendationOptions']: configuration = option['configuration'] performance_risk = option.get('performanceRisk', 'N/A') rank = option['rank'] volume_type = configuration['volumeType'] size_gb = configuration['volumeSize'] current_price = get_ebs_price(current_configuration['volumeType'], current_configuration['volumeSize'], region) recommended_price = get_ebs_price(volume_type, size_gb, region) print(f"\tRecommended Configuration: {json.dumps(configuration, indent=4)}") print(f"\tPerformance Risk: {performance_risk}") print(f"\tRank: {rank}") print(f"\tCurrent Price: ${current_price} per month") print(f"\tRecommended Price: ${recommended_price} per month") # Calculate and print savings if current_price and recommended_price: savings = current_price - recommended_price print(f"\tEstimated Monthly Savings: ${savings:.2f}") print("-" * 60) else: print("\tNo recommendation options provided.") print("-" * 60) # Example usage #region_name_to_search_recommendations = 'us-east-1' # Set to None for all regions recommendations = get_ebs_volume_recommendations(region_name_to_search_recommendations) if recommendations: print("Processing Recommendations") process_recommendations(recommendations) else: print("No EBS volume recommendations available.")copied2.1.4.1.1
- 2.1.5CIEgWqVasfoBbwMdGzW4RDS Cost Analysis
2.1.5
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.inputsoutputs2.1.5- 2.1.5.1muwkw9vPhOYo6qgLyRfiAggregate and Visualize Comprehensive RDS CPU Utilization
2.1.5.1
Aggregate and Visualize Comprehensive RDS CPU Utilization
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task aggregates CPU utilization data for running RDS instances across an AWS account, computes the average CPU usage over a specified period, and plots the average to help in assessing overall resource efficiency.
inputsoutputsimport boto3 from datetime import datetime, timedelta region_name=None # None when you want to run the script for all regions #last_n_days = 30 # AWS Credentials - replace with your method to retrieve AWS credentials creds = _get_creds(cred_label)['creds'] # Placeholder function access_key = creds['username'] secret_key = creds['password'] def get_aws_regions(): """Get a list of all AWS regions.""" ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') regions = ec2.describe_regions() return [region['RegionName'] for region in regions['Regions']] def fetch_rds_instances(region): """Fetch all RDS instances in a specific region.""" rds = boto3.client('rds',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) paginator = rds.get_paginator('describe_db_instances') page_iterator = paginator.paginate() rds_instances = [] for page in page_iterator: for instance in page['DBInstances']: rds_instances.append({ 'DBInstanceIdentifier': instance['DBInstanceIdentifier'], 'Region': region, }) return rds_instances def fetch_cpu_utilization(db_instance_identifier, region, start_time, end_time): """Fetch the average CPU utilization for an RDS instance.""" cloudwatch = boto3.client('cloudwatch',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) metrics = cloudwatch.get_metric_statistics( Namespace='AWS/RDS', MetricName='CPUUtilization', Dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': db_instance_identifier}], StartTime=start_time, EndTime=end_time, Period=3600, Statistics=['Average'] ) data_points = metrics.get('Datapoints', []) if data_points: avg_cpu = sum(dp['Average'] for dp in data_points) / len(data_points) else: avg_cpu = 0 return avg_cpu def plot_cpu_utilization(region_name=None, last_n_days=7): """Plot CPU utilization for RDS instances.""" start_time = datetime.utcnow() - timedelta(days=last_n_days) end_time = datetime.utcnow() regions = [region_name] if region_name else get_aws_regions() for region in regions: rds_instances = fetch_rds_instances(region) avg_utilizations = [] for instance in rds_instances: avg_cpu = fetch_cpu_utilization(instance['DBInstanceIdentifier'], region, start_time, end_time) avg_utilizations.append((instance['DBInstanceIdentifier'], avg_cpu)) avg_utilizations.sort(key=lambda x: x[1], reverse=True) top_instances = avg_utilizations[:3] bottom_instances = avg_utilizations[-3:] instance_ids = [x[0] for x in top_instances + bottom_instances] utilizations = [x[1] for x in top_instances + bottom_instances] # Plotting context.plot.add_trace( name="CPU Utilization", xpts=instance_ids, ypts=utilizations, tracetype='bar' ) context.plot.xlabel = 'Instance ID' context.plot.ylabel = 'Average CPU Utilization (%)' context.plot.title = f'Top & Bottom 3 RDS Instances by CPU Utilization (Last {last_n_days} Days)' # Example usage plot_cpu_utilization(region_name, last_n_days) # For all regions # plot_cpu_utilization(region_name='us-east-1', last_n_days=30) # For a specific regioncopied2.1.5.1 - 2.1.5.2gR6d2ayH2Q6SdugCwNmrDaily AWS RDS Costs using Athena
2.1.5.2
Daily AWS RDS Costs using Athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Specifically examine daily costs associated with AWS RDS, utilizing Athena to query CUR data for detailed spending analysis on database services.
inputsoutputsimport boto3 import time from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def run_athena_query(query, database, s3_output): athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': s3_output} ) query_execution_id = response['QueryExecutionId'] print(f"Query execution started, ID: {query_execution_id}") return query_execution_id except (ClientError, BotoCoreError) as e: print(f"Failed to execute query: {e}") return None def check_query_status(athena_client, query_execution_id): while True: try: response = athena_client.get_query_execution(QueryExecutionId=query_execution_id) status = response['QueryExecution']['Status']['State'] if status == 'SUCCEEDED': print(f"Query {query_execution_id} succeeded.") return True elif status in ['FAILED', 'CANCELLED']: print(f"Query {query_execution_id} failed or was cancelled.") return False time.sleep(5) except (ClientError, BotoCoreError) as e: print(f"Error checking query status: {e}") return False def get_query_results(athena_client, query_execution_id): try: response = athena_client.get_query_results(QueryExecutionId=query_execution_id) result_data = response['ResultSet']['Rows'] header = [col['VarCharValue'] for col in result_data[0]['Data']] results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]] return header, results except (ClientError, BotoCoreError) as e: print(f"Error retrieving query results: {e}") return None, None def visualize_data(dates, costs, last_n_days): print("x values (dates):", dates) print("y values (costs):", costs) context.plot.add_trace(name=f'Amazon RDS Costs (Last {last_n_days} Days)', xpts=dates, ypts=costs, tracetype="lines") context.plot.xlabel = 'Date' context.plot.ylabel = 'Cost ($)' context.plot.title = f'Daily Amazon RDS Costs (Last {last_n_days} Days excluding last 2 days)' # last_n_days = 7 # To be passed as an inpur parameter query = f""" SELECT DATE(line_item_usage_start_date) AS usage_date, line_item_product_code AS product_code, product_database_engine AS database_engine, SUM(line_item_unblended_cost) AS daily_cost FROM my_cur_report_athena WHERE line_item_product_code = 'AmazonRDS' AND line_item_usage_start_date >= DATE_ADD('day', -{last_n_days} - 2, CURRENT_DATE) AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE) AND product_database_engine IS NOT NULL AND product_database_engine != '' GROUP BY DATE(line_item_usage_start_date), line_item_product_code, product_database_engine ORDER BY usage_date, product_code; """ database = 'athenacurcfn_my_c_u_r_report_athena' #s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/' #bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task prefix_path = 'dev_query_results' s3_output = f"s3://{bucket_name}/{prefix_path}/" query_execution_id = run_athena_query(query, database, s3_output) if query_execution_id: athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") if check_query_status(athena_client, query_execution_id): header, results = get_query_results(athena_client, query_execution_id) if results: dates = [row[0] for row in results] costs = [float(row[3]) for row in results] visualize_data(dates, costs, last_n_days) else: print("No results to show.") else: print("Query did not succeed. No results to show.") else: print("Query execution failed. Exiting.")copied2.1.5.2
- 2.1.6KL71A6cT6b12qKVHZfrAAWS Costs per Linked Account using Athena
2.1.6
AWS Costs per Linked Account using Athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Segment and analyze costs for each linked account under a consolidated billing setup using Athena to query CUR data, providing clarity on spending distribution across accounts.
inputsoutputsimport boto3 import time from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def run_athena_query(query, database, s3_output): athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': s3_output} ) query_execution_id = response['QueryExecutionId'] print(f"Query execution started, ID: {query_execution_id}") return query_execution_id except (ClientError, BotoCoreError) as e: print(f"Failed to execute query: {e}") return None def check_query_status(athena_client, query_execution_id): while True: try: response = athena_client.get_query_execution(QueryExecutionId=query_execution_id) status = response['QueryExecution']['Status']['State'] if status == 'SUCCEEDED': print(f"Query {query_execution_id} succeeded.") return True elif status in ['FAILED', 'CANCELLED']: print(f"Query {query_execution_id} failed or was cancelled.") return False time.sleep(5) except (ClientError, BotoCoreError) as e: print(f"Error checking query status: {e}") return False def get_query_results(athena_client, query_execution_id): try: response = athena_client.get_query_results(QueryExecutionId=query_execution_id) result_data = response['ResultSet']['Rows'] header = [col['VarCharValue'] for col in result_data[0]['Data']] results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]] return header, results except (ClientError, BotoCoreError) as e: print(f"Error retrieving query results: {e}") return None, None def visualize_data(accounts, costs): if len(accounts) == 1: print("Only one account found. Skipping chart.") else: print("x values (accounts):", accounts) print("y values (costs):", costs) context.plot.xlabel = 'AWS Account ID' context.plot.ylabel = 'Cost ($)' context.plot.title = 'Cost by AWS Account ID' context.plot.add_trace(name="Cost by AWS Account ID", xpts=accounts, ypts=costs, tracetype="pie") print("Analysis complete.") # last_n_days = 7 # To be passed as an input parameter query = f""" SELECT line_item_usage_account_id AS account_id, SUM(line_item_unblended_cost) AS total_cost FROM my_cur_report_athena WHERE line_item_usage_start_date >= DATE_ADD('day', -{last_n_days} - 2, CURRENT_DATE) AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE) GROUP BY line_item_usage_account_id ORDER BY total_cost DESC; """ database = 'athenacurcfn_my_c_u_r_report_athena' #s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/' #bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task prefix_path = 'dev_query_results' s3_output = f"s3://{bucket_name}/{prefix_path}/" query_execution_id = run_athena_query(query, database, s3_output) if query_execution_id: athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") if check_query_status(athena_client, query_execution_id): header, results = get_query_results(athena_client, query_execution_id) if results: accounts = [row[0] for row in results] costs = [float(row[1]) for row in results] visualize_data(accounts, costs) else: print("No results to show.") else: print("Query did not succeed. No results to show.") else: print("Query execution failed. Exiting.")copied2.1.6
- 3eCzWMDbXJVxpBaMPyUtgUnused Underutilized Unattached Orphaned AWS Resources
3
Unused Underutilized Unattached Orphaned AWS Resources
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_aws_regions(): # Create an EC2 client ec2_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name='us-east-1') # Call describe_regions to get a list of all regions regions = ec2_client.describe_regions() # Extract region names region_names = [region['RegionName'] for region in regions['Regions']] return region_names regions = get_aws_regions() ''' print("Available AWS Regions:") for region in regions: print(region) ''' #context.skip_sub_tasks=Truecopied3- 3.1DG2ColIiKle2QRVbOwgXDelete Unused AWS Route53 Health Checks
3.1
Delete Unused AWS Route53 Health Checks
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.AWS Route53 is a scalable and highly available DNS service that connects user requests to infrastructure running in AWS and outside. One of its features is health checks, which monitor the health of your resources. Over time, as resources are added or removed, or configurations change, some health checks may no longer be associated with any active resources, leading to unnecessary costs and potential confusion. This runbook identifies and removes these orphaned health checks which helps in optimizing costs, reducing clutter, and ensuring that only relevant health checks are active in your AWS account
inputsoutputs3.1- 3.1.1Lu9BiCTB0klqNB47oBmBGet All AWS Route53 Health Checks
3.1.1
Get All AWS Route53 Health Checks
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task retrieves a list of all health checks that have been configured in Amazon's Route53 service. AWS Route53 is a scalable and highly available domain name system (DNS) web service. A health check in Route53 monitors the health and performance of your web applications, web servers, and other resources. By fetching all health checks, users can review, manage, or diagnose the operational status and configuration of their resources, ensuring that the routing policies are working as expected. This can be especially useful for maintaining high availability and redundancy in distributed systems or for troubleshooting issues related to DNS routing.
inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize boto3 client for Amazon Route53 route53 = boto3.client('route53',aws_access_key_id=access_key,aws_secret_access_key=secret_key) def get_all_healthchecks(): """ Retrieve all health checks from Route53. Returns: - list: List of health check IDs. """ healthchecks = [] try: # Using paginator to handle potential pagination of results paginator = route53.get_paginator('list_health_checks') for page in paginator.paginate(): for healthcheck in page['HealthChecks']: healthchecks.append(healthcheck['Id']) except route53.exceptions.Route53ServiceError as e: print(f"Route53 service error fetching health checks: {e}") except Exception as e: print(f"Error fetching health checks: {e}") finally: return healthchecks #Main Block print("Fetching all health checks...") all_healthchecks = get_all_healthchecks() print(f"Found {len(all_healthchecks)} health checks.") for hc in all_healthchecks: print(hc)copied3.1.1 - 3.1.2bvt1ze2f6WpFy2jZKwGoFilter Out Unused Route53 Health Checks
3.1.2
Filter Out Unused Route53 Health Checks
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.AWS Route53, Amazon's DNS service, offers health checks to monitor and report the availability of specific resources. Over time, with changes in configurations, deployments, or scaling activities, some of these health checks might become redundant, as they are no longer associated with active resources. Filtering out these unused health checks is an essential maintenance activity. By doing so, users can identify and potentially remove extraneous checks, helping streamline the management of their DNS configurations, optimize costs, and maintain a cleaner, more efficient environment.
inputsoutputsimport boto3 import sys creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize boto3 client for Amazon Route53 route53 = boto3.client('route53',aws_access_key_id=access_key,aws_secret_access_key=secret_key) def get_all_resource_record_sets(hosted_zone_id): """ Retrieve all resource record sets for a hosted zone. Returns: - list: List of resource record sets. """ records = [] try: # Using paginator to handle potential pagination of results paginator = route53.get_paginator('list_resource_record_sets') for page in paginator.paginate(HostedZoneId=hosted_zone_id): records.extend(page['ResourceRecordSets']) except route53.exceptions.NoSuchHostedZone as e: print(f"Specified hosted zone {hosted_zone_id} does not exist: {e}") except Exception as e: print(f"Error fetching resource record sets for hosted zone {hosted_zone_id}: {e}") return records # Here, unused health check is a health check which is not associated to any resource record in the hosted zones def filter_unused_healthchecks(hosted_zones, all_healthchecks_s): """ Filter out health checks that are in use. Parameters: - hosted_zones (list): List of hosted zones. - all_healthchecks (list): List of all health checks. Returns: - list: List of unused health check IDs. """ # Initialize an empty set to store health checks that are in use used_healthchecks = set() # Iterate through each hosted zone for hosted_zone in hosted_zones: try: # Fetch resource record sets for the current hosted zone for record in get_all_resource_record_sets(hosted_zone['Id']): # If a health check is associated with the record, add it to the set of used health checks if 'HealthCheckId' in record: used_healthchecks.add(record['HealthCheckId']) except Exception as e: print(f"Error processing hosted zone {hosted_zone['Id']}: {e}") # Return the set of health checks that are not in use return list(set(all_healthchecks_s) - used_healthchecks) # Main block # Fetch all hosted zones print("Fetching all hosted zones...") try: hosted_zones = route53.list_hosted_zones()['HostedZones'] print(f"Found {len(hosted_zones)} hosted zones.") except Exception as e: print(f"Error fetching hosted zones: {e}") #sys.exit(1) # Exit the script with an error code # all_healthchecks = [] #for testing otherwise initialized passed down from parent task # all_healthchecks passed down from parent task if all_healthchecks: unused_healthchecks = filter_unused_healthchecks(hosted_zones, all_healthchecks) # Ensure that unused_healthchecks is a list, even if empty unused_healthchecks = unused_healthchecks if unused_healthchecks else [] # Print the unused health checks if unused_healthchecks: print("Unused health checks found:") for hc in unused_healthchecks: print(hc) else: print("No unused health checks found.") else: print("Zero Route 53 Health checks were found") context.skip_sub_tasks = Truecopied3.1.2- 3.1.2.1HkGl0qCI6WjCuOrTcsE7Delete AWS Route53 Health Checks
3.1.2.1
Delete AWS Route53 Health Checks
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.AWS Route53 is Amazon's DNS web service, and it provides health checks to monitor the health of resources and applications. Over time, as configurations change or resources are decommissioned, certain health checks might no longer be relevant or needed. Deleting these unnecessary Route53 health checks helps in decluttering the AWS environment, reducing potential costs, and simplifying management. It's essential to periodically review and delete any health checks that are no longer in use to maintain an optimized and streamlined AWS setup.
inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize boto3 client for Amazon Route53 route53 = boto3.client('route53',aws_access_key_id=access_key,aws_secret_access_key=secret_key) def delete_healthcheck(healthcheck_id): """ Delete a specific health check. Parameters: - healthcheck_id (str): The ID of the health check to delete. """ try: route53.delete_health_check(HealthCheckId=healthcheck_id) print(f"Successfully deleted health check: {healthcheck_id}") except route53.exceptions.NoSuchHealthCheck: print(f"Health check {healthcheck_id} does not exist.") except route53.exceptions.HealthCheckInUse: print(f"Health check {healthcheck_id} is still in use and cannot be deleted.") except Exception as e: print(f"Error deleting health check {healthcheck_id}: {e}") def process_health_checks(unused_healthchecks_list): """ Process and delete the provided health checks. Parameters: - unused_healthchecks_list (list): List of health check IDs to delete. """ # Ensure that unused_healthchecks_list is a list, even if empty unused_healthchecks_list = unused_healthchecks_list if unused_healthchecks_list else [] if unused_healthchecks_list: # Delete each unused health check print("Deleting unused health checks...") for healthcheck_id in unused_healthchecks_list: delete_healthcheck(healthcheck_id) else: print("No unused health checks...") # Main Block ''' # List of unused health checks to delete. # This should be updated based on the output from the previous script. # Example list type -> unused_healthchecks = ['d7d64110-9aa9-4cb2-a63b-9f33d96dd2d2'] # Replace with actual IDs if using the task in a standalone manner and not taking any inputs from parent task ''' # If the unused_healthchecks variable is not defined (e.g., it's not passed from a parent task), initialize it as an empty list. try: unused_healthchecks except NameError: unused_healthchecks = [] # Process (delete) the unused health checks process_health_checks(unused_healthchecks)copied3.1.2.1
- 3.2TW5d0nV9SFA7DErlfOuXClean up unused/unattached AWS Elastic IP addresses
3.2
Clean up unused/unattached AWS Elastic IP addresses
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook is designed to identify unattached Elastic IPs within all AWS regions and release them. Elastic IPs that are not associated with any instances can accumulate over time and result in unnecessary costs. By using this runbook, you can efficiently release these unattached Elastic IPs, optimizing your resources and reducing expenses
inputsoutputs3.2- 3.2.1odg1fP1uHodsc3u5SKavFilter out all unused/unattached AWS Elastic IP addresses
3.2.1
Filter out all unused/unattached AWS Elastic IP addresses
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This step involves searching through regions to identify Elastic IPs that are not currently associated with any instances. By iteratively querying each region's EC2 service, the script collects details about unattached Elastic IPs, including their public IP addresses, allocation IDs, and regions.
inputsoutputsimport boto3 import botocore.exceptions creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def find_unattached_elastic_ips(regions): unattached_ips = [] # Loop through each region for region in regions: try: # Fetch the list of Elastic IPs for the region ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) response = ec2_client.describe_addresses() except botocore.exceptions.BotoCoreError as e: print(f"Error fetching addresses in region {region}: {e}") continue # Check each Elastic IP for an association for eip in response.get('Addresses', []): if 'AssociationId' not in eip: # Add unattached Elastic IPs to the list unattached_ips.append({'public_ip': eip.get('PublicIp', ''), 'allocation_id': eip.get('AllocationId', ''), 'region': region}) return unattached_ips # List of regions to search for unattached Elastic IPs #regions = ['us-east-1'] # Add your desired regions here # Find unattached Elastic IPs unattached_ips = find_unattached_elastic_ips(regions) if len(unattached_ips) == 0: print("No unattached Elastic IPs found.") else: print("Unattached Elastic IPs:") # Print details for each unattached Elastic IP for ip_info in unattached_ips: print(f"Public IP: {ip_info['public_ip']}, Allocation ID: {ip_info['allocation_id']}, Region: {ip_info['region']}") context.skip_sub_tasks = Truecopied3.2.1- 3.2.1.1c6CFzJzAaTihnk9SROZcRelease unused/unattached AWS Elastic IP addresses
3.2.1.1
Release unused/unattached AWS Elastic IP addresses
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Once the unattached Elastic IPs are identified, this task releases them from the AWS environment. By using the collected allocation IDs and regions, the script communicates with the EC2 service to release each unattached Elastic IP. This process ensures that these IPs are no longer reserved, contributing to cost savings and resource optimization.
inputsoutputsimport boto3 import botocore.exceptions creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Function to release an Elastic IP def release_elastic_ip(ec2_client, allocation_id, region): ec2 = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) try: # Attempt to release the Elastic IP response = ec2.release_address(AllocationId=allocation_id) return f"Released Elastic IP {allocation_id} in region {region}" except botocore.exceptions.BotoCoreError as e: return f"Error releasing Elastic IP {allocation_id} in region {region}: {e}" # List of regions to search for unattached Elastic IPs #regions = ["us-east-1"] # Add your desired regions here if len(unattached_ips) == 0: print("No unattached Elastic IPs found.") else: print("Unattached Elastic IPs:") # Print details for each unattached Elastic IP for ip_info in unattached_ips: print(f"Public IP: {ip_info['public_ip']}, Allocation ID: {ip_info['allocation_id']}, Region: {ip_info['region']}") # Release unattached Elastic IPs for ip_info in unattached_ips: response = release_elastic_ip(boto3.client('ec2'), ip_info['allocation_id'], ip_info['region']) print(response) context.proceed = Falsecopied3.2.1.1 - 3.2.1.2oqRPhDo262rV0rQ4Q0BLList and release all Elastic IPs in all regions
3.2.1.2
List and release all Elastic IPs in all regions
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task is a combination of listing all Elastic IPs and releasing them if they are not associated with any instance all the while looping through all the regions.
Note:- Even though the script is a one time stop for finding unattached Elastic IPs in all regions it takes relatively higher time to complete the process than the script focused on specified regions as it loops through all the regions.
inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Create an EC2 client instance ec2 = boto3.client("ec2",aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name='us-east-1') # Dictionary to store unused Elastic IPs with their allocation IDs and regions unused_ips = {} #Loop through all regions for region in ec2.describe_regions()["Regions"]: region_name = region["RegionName"] try: # Create an EC2 client for the specific region ec2conn = boto3.client("ec2",aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region_name) # Retrieve all addresses (Elastic IPs) addresses = ec2conn.describe_addresses( Filters=[{"Name": "domain", "Values": ["vpc"]}] )["Addresses"] # Iterate through each address for address in addresses: # Check if the address is not associated with any instance if ( "AssociationId" not in address and address["AllocationId"] not in unused_ips ): # Store the unused Elastic IP's allocation ID and region unused_ips[address["AllocationId"]] = region_name # Release the unused Elastic IP ec2conn.release_address(AllocationId=address["AllocationId"]) print( f"Deleted unused Elastic IP {address['PublicIp']} in region {region_name}" ) except Exception as e: # Handle cases where there's no access to a specific region print(f"No access to region {region_name}: {e}") # Print the summary of deleted unused Elastic IPs print(f"Found and deleted {len(unused_ips)} unused Elastic IPs across all regions:") print(unused_ips)copied3.2.1.2
- 3.3jNXSdJMO1ya9ezdcxAEDDelete Underutilized AWS RDS Instances
3.3
Delete Underutilized AWS RDS Instances
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook involves identifying Amazon RDS instances that consistently exhibit low CPU usage over a specific time frame and then safely removing them. By leveraging Amazon CloudWatch metrics, organizations can pinpoint underutilized RDS instances, set a CPU utilization threshold, and analyze instances that fall below this mark. Before initiating deletion, it's crucial to disable any active 'Deletion Protection' and to create a final snapshot as a backup measure. This proactive approach not only ensures cost efficiency by eliminating unnecessary expenses but also optimizes resource management within AWS.
inputsoutputs3.3- 3.3.1P4qaH1250SIEww4JwKp6List All AWS RDS Instances
3.3.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves enumerating and displaying all AWS RDS (Amazon Relational Database Service) instances within an AWS account. This task is essential for management and auditing purposes, providing a clear view of all RDS instances. During this process, the script communicates with AWS services to retrieve information about each RDS instance, including their identifiers, status, and any other relevant details. This information is crucial for administrators to understand their AWS infrastructure's state, aiding in further actions like modification, deletion, or analysis of the RDS instances.
inputsoutputsimport boto3 from botocore.exceptions import BotoCoreError, ClientError reg=None # Runs for specific region if provided otherwise runs for all regions if None/nothing is provided. creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_all_rds_instances(region=None): try: if region: regions = [region] else: # If region is None, list instances in all available regions ec2 = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']] all_instances = [] for region in regions: #print(f"Listing RDS instances in region {region}:") client = boto3.client('rds', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) try: db_instances = client.describe_db_instances() region_instances = [instance['DBInstanceIdentifier'] for instance in db_instances['DBInstances']] if region_instances: print(f"Found {len(region_instances)} RDS instances in region {region}:") for instance in region_instances: print(instance) all_instances.append({"region": region, "instance": instance}) else: #print(f"No RDS instances found in region {region}.") p=1 # dummy line to end the conditional block properly except ClientError as e: print(f"Client error in region {region}: {e}") except BotoCoreError as e: print(f"BotoCoreError in region {region}: {e}") except Exception as e: print(f"Unexpected error in region {region}: {e}") return all_instances except Exception as e: print(f"Unexpected error: {e}") instances = list_all_rds_instances(reg) # reg is initialized in input parameters. Runs for specific region if provided otherwise runs for all regions if None/nothing is provided. #print(instances)copied3.3.1 - 3.3.2lugRWP8IXb7X8fduzJ02Filter out AWS RDS Instances with Low CPU Utilization
3.3.2
Filter out AWS RDS Instances with Low CPU Utilization
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task identifies Amazon RDS instances that are underperforming or underutilized in terms of CPU usage. By utilizing Amazon CloudWatch metrics, users can monitor and assess the CPU performance of their RDS instances over a specified period. By setting a CPU utilization threshold, they can filter out instances that consistently operate below this limit, indicating potential over-provisioning or underuse. Highlighting these low-utilization instances aids organizations in optimizing their AWS resource allocation, ensuring cost efficiency and facilitating informed decisions about scaling or decommissioning certain database resources.
inputsoutputsimport boto3 from datetime import datetime, timedelta from botocore.exceptions import BotoCoreError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Constants # Define the threshold for CPU utilization(Defined in input parameters). # Instances below this will be considered low utilization. LOW_CPU_THRESHOLD=20 # Hardcoded for One time Filter Result LOOKBACK_PERIOD_HOURS=24 # Hardcoded for One time Filter Result LOOKBACK_PERIOD = 3600 * int(LOOKBACK_PERIOD_HOURS) # Define the period to check. Here, it's set to 24 hours. def get_low_cpu_rds_instances(instances): low_cpu_instances = [] # List to store the IDs of RDS instances with low CPU utilization # Group instances by region instances_by_region = {} for instance_info in instances: region = instance_info['region'] if region not in instances_by_region: instances_by_region[region] = [] instances_by_region[region].append(instance_info['instance']) # Check each region for region, instance_list in instances_by_region.items(): cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) # Loop through each RDS instance for instance_id in instance_list: try: end_time = datetime.utcnow() start_time = end_time - timedelta(seconds=LOOKBACK_PERIOD) metrics = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'cpuUtilization', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/RDS', 'MetricName': 'CPUUtilization', 'Dimensions': [{ 'Name': 'DBInstanceIdentifier', 'Value': instance_id }] }, 'Period': LOOKBACK_PERIOD, 'Stat': 'Average' }, 'ReturnData': True, }, ], StartTime=start_time, EndTime=end_time ) if metrics['MetricDataResults'][0]['Values']: cpu_utilization = metrics['MetricDataResults'][0]['Values'][0] if cpu_utilization < int(LOW_CPU_THRESHOLD): low_cpu_instances.append({ 'InstanceID': instance_id, 'AverageCPU': cpu_utilization, 'Region': region }) except Exception as e: print(f"Error fetching CloudWatch metrics for RDS instance {instance_id} in {region}: {e}") return low_cpu_instances def display_low_cpu_instances(data): # Initialize table with the desired structure and headers table = context.newtable() table.title = "Low Usage RDS Instances Overview" table.num_cols = 3 # Number of columns for Instance ID, Average CPU, and Region table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names based on the structure of low_cpu_instances data headers = ["Instance ID", "Average CPU (%)", "Region"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Populate the table with instance data for row_num, instance in enumerate(data, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each entry values = [ instance["InstanceID"], f"{instance['AverageCPU']:.2f}", # Format average CPU as a float with 2 decimal places instance["Region"] ] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) low_cpu_rds = get_low_cpu_rds_instances(instances) # Print the results if low_cpu_rds: display_low_cpu_instances(low_cpu_rds) ''' print("RDS instances with low CPU utilization:") for instance_info in low_cpu_rds: print(f"Instance ID: {instance_info['InstanceID']}, Average CPU: {instance_info['AverageCPU']}% in Region: {instance_info['Region']}")''' else: print("No RDS instances with low CPU utilization found.") context.skip_sub_tasks=Truecopied3.3.2- 3.3.2.1flcqDvNamXsOTVclwnPaDelete AWS RDS Instance
3.3.2.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task terminates the AWS RDS database and deletes all associated data. Before deletion, users often create a final snapshot to preserve the database's current state, enabling future restoration if necessary. It's essential to ensure that the "Deletion Protection" feature, designed to prevent accidental deletions, is disabled before proceeding. Once deleted, the RDS instance is no longer operational, and associated costs cease. However, any retained backups or snapshots will persist and may incur storage charges until they too are deleted.
inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_rds_instance(instance_info): """ Delete an RDS instance after taking necessary precautions like disabling deletion protection and creating a final snapshot. Parameters: - instance_info (dict): Dictionary containing InstanceID and Region. """ instance_id = instance_info['InstanceID'] region = instance_info['Region'] # Initialize the boto3 client for the Amazon Relational Database Service (RDS) in the specified region rds = boto3.client('rds', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) try: instance_details = rds.describe_db_instances(DBInstanceIdentifier=instance_id) if instance_details['DBInstances'][0].get('DeletionProtection', False): rds.modify_db_instance(DBInstanceIdentifier=instance_id, DeletionProtection=False) print(f"Deletion protection disabled for {instance_id}") except rds.exceptions.DBInstanceNotFoundFault: print(f"RDS instance {instance_id} not found.") return except Exception as e: print(f"Error modifying RDS instance {instance_id}: {e}") return try: snapshot_name = f"final-snapshot-{instance_id}" rds.create_db_snapshot(DBInstanceIdentifier=instance_id, DBSnapshotIdentifier=snapshot_name) print(f"Final snapshot creation initiated for {instance_id}") waiter = rds.get_waiter('db_snapshot_completed') waiter.wait(DBSnapshotIdentifier=snapshot_name) print(f"Final snapshot {snapshot_name} created for {instance_id}") except rds.exceptions.SnapshotQuotaExceededFault: print(f"Snapshot quota exceeded for {instance_id}.") return except rds.exceptions.DBInstanceNotFoundFault: print(f"RDS instance {instance_id} not found.") return except Exception as e: print(f"Error creating snapshot for RDS instance {instance_id}: {e}") return try: rds.delete_db_instance(DBInstanceIdentifier=instance_id, SkipFinalSnapshot=True) print(f"RDS instance {instance_id} deletion initiated") except rds.exceptions.DBInstanceNotFoundFault: print(f"RDS instance {instance_id} not found.") except Exception as e: print(f"Error deleting RDS instance {instance_id}: {e}") rds_instances_to_delete = low_cpu_rds # Make sure low_cpu_rds is a list of dictionaries with 'InstanceID' and 'Region' keys # Check if the list is empty if not rds_instances_to_delete: print("No RDS instances provided for deletion.") else: # Loop through each RDS instance in the list and call the delete function for instance_info in rds_instances_to_delete: delete_rds_instance(instance_info)copied3.3.2.1
- 3.4GbZTCmimalprzmSluMZSDelete Underutilized AWS Redshift Clusters
3.4
Delete Underutilized AWS Redshift Clusters
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook involves identifying Amazon Redshift clusters that have consistently low CPU utilization over a specified period and then deleting them to optimize resource usage and reduce costs. By monitoring the CPU metrics of Redshift clusters using Amazon CloudWatch, organizations can determine which clusters are underutilized and might be candidates for deletion or resizing. Deleting or resizing underutilized clusters ensures efficient use of resources and can lead to significant cost savings on cloud expenditures.
inputsoutputs3.4- 3.4.1On5qrCnBeQ2ebOtQXFpKGet All AWS Redshift Clusters
3.4.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This process retrieves a list of all Amazon Redshift clusters within an AWS account. Amazon Redshift is a fully managed data warehouse service in the cloud that allows users to run complex analytic queries against petabytes of structured data. By fetching all Redshift clusters, users can gain insights into the number of active clusters, their configurations, statuses, and other related metadata. This information is crucial for administrative tasks, monitoring, and optimizing costs and performance.
inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_all_redshift_clusters(region=None): all_clusters = {} ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') regions_to_check = [region] if region else [region['RegionName'] for region in ec2_client.describe_regions()['Regions']] for region in regions_to_check: # Initialize the Redshift client for the specified region redshift = boto3.client('redshift', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) clusters = [] try: # Using paginator to handle potential pagination of results paginator = redshift.get_paginator('describe_clusters') for page in paginator.paginate(): clusters.extend(page['Clusters']) if clusters: # Check if clusters list is not empty all_clusters[region] = clusters except Exception as e: print(f"Error fetching Redshift clusters in region {region}: {e}") return all_clusters # Set region to None for all regions, or specify a valid AWS region string for a specific region # Example: target_region = 'us-west-1' # Or None for all regions target_region = None # Get all Redshift clusters all_clusters = get_all_redshift_clusters(target_region) if all_clusters: print(f"Total Redshift Clusters: {sum(len(clusters) for clusters in all_clusters.values())}") for region, clusters in all_clusters.items(): print(f"In region {region}:") for cluster in clusters: print(f" - {cluster['ClusterIdentifier']}") else: print("No Redshift clusters found")copied3.4.1 - 3.4.2pSQEnVjE9egrrY2PCcg5Filter out Underutilized AWS Redshift Clusters
3.4.2
Filter out Underutilized AWS Redshift Clusters
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task pertains to identifying Amazon Redshift clusters that exhibit consistently low CPU utilization over a predefined time span. By leveraging Amazon CloudWatch metrics, organizations can detect underutilized Redshift clusters. Recognizing such clusters provides valuable insights, allowing teams to make informed decisions about potential downscaling, resource reallocation, or other optimization measures to ensure efficient cloud resource usage.
inputsoutputsimport boto3 from datetime import datetime, timedelta creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Constants #LOW_CPU_THRESHOLD = 20 # Example Threshold for CPU utilization #LOOKBACK_PERIOD_HOURS = 24 # Example lookback period in hours LOOKBACK_PERIOD = 3600 * LOOKBACK_PERIOD_HOURS # Convert hours to seconds def get_low_cpu_redshift_clusters(all_clusters): """ Identify and list Redshift clusters with average CPU utilization below a defined threshold over a specific period. Parameters: - all_clusters (dict): Dictionary with region as keys and list of cluster info as values. Returns: - list: List of dictionaries containing Redshift cluster identifiers and their average CPU utilization. """ low_cpu_clusters = [] # List to store the cluster details with low CPU utilization for region, clusters in all_clusters.items(): # Initialize boto3 client for CloudWatch in the specific region cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) for cluster in clusters: cluster_id = cluster['ClusterIdentifier'] try: # Query CloudWatch to fetch the CPU utilization metric for the defined period metrics = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'cpuUtilization', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/Redshift', 'MetricName': 'CPUUtilization', 'Dimensions': [{'Name': 'ClusterIdentifier', 'Value': cluster_id}] }, 'Period': LOOKBACK_PERIOD, 'Stat': 'Average' # We're interested in the average CPU utilization }, 'ReturnData': True, }, ], StartTime=datetime.utcnow() - timedelta(seconds=LOOKBACK_PERIOD), EndTime=datetime.utcnow() ) # Check if the cluster's CPU utilization falls below the threshold if metrics['MetricDataResults'][0]['Values']: cpu_utilization = metrics['MetricDataResults'][0]['Values'][0] if cpu_utilization < LOW_CPU_THRESHOLD: low_cpu_clusters.append({ 'Region': region, 'ClusterID': cluster_id, 'AverageCPU': cpu_utilization }) except Exception as e: print(f"Error checking CPU utilization for cluster {cluster_id} in region {region}: {e}") return low_cpu_clusters # Example usage (assuming all_clusters is provided from an upstream task) # all_clusters = { # 'us-west-2': [{'ClusterIdentifier': 'cluster1'}, {'ClusterIdentifier': 'cluster2'}], # 'us-east-1': [{'ClusterIdentifier': 'cluster3'}], # } clusters_info = get_low_cpu_redshift_clusters(all_clusters) # Print the results if clusters_info: print("Redshift clusters with low CPU utilization:") for cluster in clusters_info: print(f"Region: {cluster['Region']}, Cluster ID: {cluster['ClusterID']}, Average CPU: {cluster['AverageCPU']:.2f}%") else: print("No Redshift clusters with low CPU utilization found.") context.skip_sub_tasks=Truecopied3.4.2- 3.4.2.1qDood607VHbhjFv6yYZoDelete AWS Redshift Clusters
3.4.2.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves terminating specific Amazon Redshift clusters, effectively removing them from an AWS account. Deleting a Redshift cluster permanently erases all the data within the cluster and cannot be undone. This process might be undertaken to manage costs, decommission outdated data warehouses, or perform clean-up operations. It's crucial to ensure appropriate backups (snapshots) are in place before initiating a deletion to prevent accidental data loss.
inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_redshift_cluster(cluster_id, region): """ Attempts to delete a specified Amazon Redshift cluster in a given region. Parameters: - cluster_id (str): The unique identifier of the Redshift cluster to be deleted. - region (str): The AWS region where the Redshift cluster is located. """ try: # Initialize the boto3 client for Amazon Redshift with the appropriate region redshift = boto3.client('redshift', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) # Initiate the deletion of the specified Redshift cluster. response = redshift.delete_cluster(ClusterIdentifier=cluster_id, SkipFinalClusterSnapshot=True) print(f"Redshift cluster {cluster_id} deletion initiated in region {region}.") except redshift.exceptions.ClusterNotFoundFault: print(f"Redshift cluster {cluster_id} not found in region {region}.") except redshift.exceptions.InvalidClusterStateFault: print(f"Redshift cluster {cluster_id} is in an invalid state for deletion in region {region}.") except Exception as e: print(f"Error deleting Redshift cluster {cluster_id} in region {region}: {e}") # Example usage #clusters_info = [{'Region': 'us-west-2', 'ClusterID': 'example-cluster-1'}, {'Region': 'us-east-1', 'ClusterID': 'example-cluster-2'}] clusters_to_delete = clusters_info # clusters_info passed down from parent task to delete said Redshift Clusters # Can replace clusters_info with a list of cluster_id to delete any Redshift Cluster using this task if clusters_to_delete: for cluster in clusters_to_delete: delete_redshift_cluster(cluster['ClusterID'], cluster['Region']) else: print("No Redshift Clusters provided for deletion")copied3.4.2.1
- 3.5ZteKDrH1fQUuyltMg625Delete Unused Secrets from AWS Secrets Manager
3.5
Delete Unused Secrets from AWS Secrets Manager
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook is designed to identify and remove secrets from the AWS Secrets Manager that haven't been accessed or utilized within a defined period (e.g., the past 90 days). Secrets Manager is a tool that helps manage sensitive information like API keys or database credentials. Over time, some secrets may become obsolete or unused, occupying unnecessary space and potentially incurring extra costs. This task automates the cleanup process by scanning for these dormant secrets and safely deleting them. Before executing this runbook, ensure proper AWS IAM permissions are set. Always use caution when deleting secrets to avoid unintended disruptions to applications or services.
inputsoutputs3.5- 3.5.1rzspDcFpayEOCHvPWd6cGet All Secrets from AWS Secrets Manager
3.5.1
Get All Secrets from AWS Secrets Manager
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task retrieves a list of all secrets stored in the AWS Secrets Manager for your account. AWS Secrets Manager is a service designed to safeguard sensitive information such as database credentials and API keys. By executing this task, users will obtain a comprehensive list of secret names or ARNs, aiding in audit, management, or automation processes. Note that this task will list the secrets' identifiers, but not their actual values. To fetch a specific secret's value, additional steps involving the get_secret_value method are required. Ensure you have the appropriate AWS IAM permissions before executing this task.
inputsoutputsimport boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_all_secrets(secrets_client): try: secrets = secrets_client.list_secrets() return [secret['Name'] for secret in secrets['SecretList']] except ClientError as e: error_code = e.response['Error']['Code'] if error_code == 'UnrecognizedClientException': print(f"Invalid security token or service not available in {secrets_client.meta.region_name}. Skipping.") else: print(f"ClientError {error_code} in {secrets_client.meta.region_name}: {e}") return [] except Exception as e: print(f"An unexpected error occurred in {secrets_client.meta.region_name}: {e}") return [] # Main block # Specify the region here. If None, it will loop through all available regions. #REGION = 'us-east-1' #print(f"regions received from top task {regions}") REGION=None # Hardcoded for One Time Execution Result regions = [REGION] if REGION else regions all_secrets_data = [] for region in regions: try: secrets_client = boto3.client('secretsmanager',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) all_secrets = get_all_secrets(secrets_client) if all_secrets: print(f"All Secrets in {region}:") for secret in all_secrets: print(secret) all_secrets_data.append({'region': region, 'secret': secret}) else: #print(f"No secrets found in {region}.") p=1 #Dummy line to remove bad input error except Exception as e: print(f"An unexpected error occurred while processing {region}: {e}") ''' # Print the all_secrets_data list to check the content print("\nAll Secrets Data:") for secret_data in all_secrets_data: print(f"Region: {secret_data['region']}, Secret: {secret_data['secret']}") '''copied3.5.1 - 3.5.2qqpgmSPqOwb7P6LgGYH8Filter out Unused Secrets from AWS Secrets Manager
3.5.2
Filter out Unused Secrets from AWS Secrets Manager
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task aims to pinpoint secrets within AWS Secrets Manager that haven't been accessed for a specified duration, such as the past 90 days. Over time, certain secrets may not be referenced or utilized, indicating they may no longer be needed. By identifying these inactive secrets, organizations can assess their continued relevance, streamline their secrets inventory, and enhance security by minimizing potential exposure points. Before taking any action based on the results, it's crucial to review the list and ensure no critical secrets are mistakenly categorized as "unused."
inputsoutputsimport boto3 from datetime import datetime, timedelta from botocore.exceptions import ClientError UNUSED_DAYS_THRESHOLD=60 #Harcoded For One time result creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def find_unused_secrets(secrets_client, secret_name): try: secret_details = secrets_client.describe_secret(SecretId=secret_name) last_accessed_date = secret_details.get('LastAccessedDate') # Return True if the secret is either never accessed or last accessed > 90 days ago return not last_accessed_date or (datetime.now(last_accessed_date.tzinfo) - last_accessed_date > timedelta(days=UNUSED_DAYS_THRESHOLD)) except ClientError as e: error_code = e.response['Error']['Code'] if error_code == 'UnrecognizedClientException': print(f"Invalid security token or service not available. Skipping secret: {secret_name}.") else: print(f"ClientError {error_code} for secret {secret_name}: {e}") return False except Exception as e: print(f"An unexpected error occurred for secret {secret_name}: {e}") return False # Main block # Check if all_secrets_data is defined and is not None; if not, initialize as an empty list # all_secrets_data passed down from get_all_secrets task all_secrets = all_secrets_data if 'all_secrets_data' in locals() and all_secrets_data is not None else [] ''' # Sample data from the previous task all_secrets = [ {'region': 'us-east-1', 'secret': 'test/user'}, {'region': 'us-east-1', 'secret': 'test-unused/user'}, # ... add more secrets and regions as needed ] ''' all_unused_secrets = [] for secret_data in all_secrets: region = secret_data['region'] secret_name = secret_data['secret'] try: secrets_client = boto3.client('secretsmanager',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) if find_unused_secrets(secrets_client, secret_name): all_unused_secrets.append(secret_data) print(f"Secret {secret_name} in region {region} is unused.") else: print(f"Secret {secret_name} in region {region} is active.") except Exception as e: print(f"An unexpected error occurred while processing secret {secret_name} in region {region}: {e}") # Displaying the unused secrets list print("\nAll Unused Secrets Data:") for secret_data in all_unused_secrets: print(f"Region: {secret_data['region']}, Secret: {secret_data['secret']}") context.skip_sub_tasks=Truecopied3.5.2- 3.5.2.1fOUMq7LdcxtI7gnnRTqIDelete Secret from AWS Secrets Manager
3.5.2.1
Delete Secret from AWS Secrets Manager
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task focuses on securely removing a specified secret from AWS Secrets Manager. Deleting secrets can be an essential step in managing sensitive information, especially if a secret is no longer in use or has been compromised. By executing this task, the targeted secret will be permanently erased from AWS Secrets Manager, ensuring it can't be accessed or retrieved. It's crucial to double-check the secret's relevance and backup any necessary data before deletion to prevent any unintended data loss or service disruptions.
inputsoutputsimport boto3 from botocore.exceptions import ClientError, NoCredentialsError, PartialCredentialsError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_secret(secrets_client, secret_name): try: secrets_client.delete_secret(SecretId=secret_name) print(f"Deleted secret: {secret_name}") return True except ClientError as e: # Handle specific known errors if e.response['Error']['Code'] == 'ResourceNotFoundException': print(f"Secret {secret_name} not found. Skipping.") else: print(f"Error deleting secret {secret_name}: {e}") return False # To handle any other boto3 specific errors except (NoCredentialsError, PartialCredentialsError): print("Authentication error. Please check your AWS credentials.") return False # A catch-all for other exceptions which we may not anticipate except Exception as e: print(f"An unexpected error occurred: {e}") return False # Main block # Check if all_unused_secrets is defined and is not None; if not, initialize as an empty list # all_unused_secrets passed down from get_all_secrets task all_unused_secrets = all_unused_secrets if 'all_unused_secrets' in locals() and all_unused_secrets is not None else [] ''' # Sample data for testing purposes. This will be passed from the upstream task. all_unused_secrets = [ {'region': 'us-east-1', 'secret': 'sample_secret_1'}, {'region': 'us-east-2', 'secret': 'sample_secret_2'}, # ... add more secrets and regions as needed ] # Example data ''' if all_unused_secrets: for secret_data in all_unused_secrets: region = secret_data['region'] secret_name = secret_data['secret'] try: # Initialize the secrets client for the given region secrets_client = boto3.client('secretsmanager',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) # Attempt to delete the secret delete_secret(secrets_client, secret_name) except Exception as e: print(f"An unexpected error occurred in {region}: {e}") else: print("No secrets provided. Exiting.")copied3.5.2.1
- 3.6Dawb5IdhwlUvff4cbZZ8Delete unused AWS CloudWatch Log Streams
3.6
Delete unused AWS CloudWatch Log Streams
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook automates the process of identifying and deleting unused Amazon CloudWatch Log Streams. By scanning specified log groups across designated AWS regions, it efficiently detects log streams that have been inactive for a predetermined period. Once identified, these log streams are safely removed, helping organizations maintain a clutter-free logging environment and potentially reducing associated storage costs.
inputsoutputs3.6- 3.6.1h2ozXNRKuV8VsHandh8wList all AWS CloudWatch Log Streams
3.6.1
List all AWS CloudWatch Log Streams
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task is designed to systematically retrieve and enumerate all CloudWatch log streams present in specified AWS regions. It offers a detailed snapshot of the existing log streams, enabling users to understand their logging landscape across various AWS services and applications.
inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def fetch_log_streams(client, region): log_streams_info = [] #print(f"\nFetching log streams for region: {region}...") log_groups = client.describe_log_groups() if not log_groups.get('logGroups'): #print(f"No log groups or streams found in region: {region}.") return log_streams_info for log_group in log_groups['logGroups']: log_group_name = log_group['logGroupName'] log_streams = client.describe_log_streams(logGroupName=log_group_name) for stream in log_streams.get('logStreams', []): #print(f"Region: {region}, Log Group: {log_group_name}, Log Stream: {stream['logStreamName']}") # Append the information to log_streams_info log_streams_info.append({ 'region': region, 'log_group': log_group_name, 'log_stream': stream['logStreamName'] }) return log_streams_info def list_all_log_streams(region=None): log_streams_info = [] # To store log streams information for all regions try: # Create an initial client to fetch regions if no specific region is provided ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') if region: regions = [region] else: regions = [region['RegionName'] for region in ec2_client.describe_regions()['Regions']] for specific_region in regions: client = boto3.client('logs', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=specific_region) region_log_streams = fetch_log_streams(client, specific_region) log_streams_info.extend(region_log_streams) except boto3.exceptions.Boto3Error as e: print(f"An error occurred while accessing AWS: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") return log_streams_info def display_log_streams(data): # Initialize table with the desired structure and headers table = context.newtable() table.title = "AWS Log Streams Data" table.num_cols = 3 # Number of columns for Region, Log Group, and Log Stream table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names headers = ["Region", "Log Group", "Log Stream"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Sort the log stream data by region for better readability data.sort(key=lambda x: x["region"]) # Populate the table with log stream data for row_num, entry in enumerate(data, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each log stream entry values = [entry["region"], entry["log_group"], entry["log_stream"]] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) # Main block target_region = None # hardcoded for one time result log_streams_data = list_all_log_streams(target_region) # Pass the name of region as a string to search for that specific region otherwise it runs for all regions #print("\nCompleted fetching log streams data.") # Uncomment the line below if you want to see the returned data structure # print(log_streams_data) display_log_streams(log_streams_data)copied3.6.1 - 3.6.2OK6np9mI65cVmADIdvR6Filter Unused AWS CloudWatch Log Streams
3.6.2
Filter Unused AWS CloudWatch Log Streams
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task examines CloudWatch log streams to identify those that have been inactive for a specified duration. By pinpointing these dormant streams, the task aids in maintaining a cleaner, more efficient logging environment and can subsequently assist in reducing unnecessary storage costs associated with retaining outdated logs on AWS CloudWatch.
inputsoutputsimport boto3 from datetime import datetime, timedelta creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def filter_unused_log_streams(all_log_streams, unused_days=30): unused_log_streams = [] for log_info in all_log_streams: client = boto3.client('logs', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=log_info['region']) try: log_stream = client.describe_log_streams( logGroupName=log_info['log_group'], logStreamNamePrefix=log_info['log_stream'] )['logStreams'][0] # We're using prefix, so getting the first result # Check if the log stream has a 'lastEventTimestamp' if 'lastEventTimestamp' in log_stream: last_event_date = datetime.utcfromtimestamp(log_stream['lastEventTimestamp'] / 1000) if last_event_date < datetime.utcnow() - timedelta(days=unused_days): unused_log_streams.append(log_info) except boto3.exceptions.Boto3Error as e: print(f"Error accessing log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}") except Exception as e: print(f"Unexpected error: {e}") return unused_log_streams def display_log_streams(data): # Initialize table with the desired structure and headers table = context.newtable() table.title = "Unused Log Streams Data" table.num_cols = 3 # Number of columns for Region, Log Group, and Log Stream table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names headers = ["Region", "Log Group", "Log Stream"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Sort the log stream data by region for better readability data.sort(key=lambda x: x["region"]) # Populate the table with log stream data for row_num, entry in enumerate(data, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each log stream entry values = [entry["region"], entry["log_group"], entry["log_stream"]] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) # Main block # UNUSED_DAYS = 90 # all_log_streams to be passed down from parent task # Example structure, all_log_streams = [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}] # truncated for brevity all_log_streams = log_streams_data # Passed down from parent task unused_logs = filter_unused_log_streams(all_log_streams, UNUSED_DAYS) if unused_logs: display_log_streams(unused_logs) '''print("\nFiltered unused log streams:") for log in unused_logs: print(f"Region: {log['region']}, Log Group: {log['log_group']}, Log Stream: {log['log_stream']}")''' # Uncomment the line below if you want to see the full list of unused log streams # print(unused_logs) else: print("No Unused Logs") context.skip_sub_tasks=Truecopied3.6.2- 3.6.2.1r9VELqGVT9MERUkvcBZSDelete AWS CloudWatch Log Streams
3.6.2.1
Delete AWS CloudWatch Log Streams
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task allows users to remove specified log streams from AWS CloudWatch. By executing this task, organizations can effectively manage and declutter their logging space, ensuring that only relevant and necessary logs are retained. This not only optimizes the logging environment but also helps in potentially reducing storage-associated costs on AWS.
inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_log_streams(unused_logs): """ Deletes the specified CloudWatch log streams. Args: unused_logs (list): List of dictionaries containing region, log group, and unused log stream information. Returns: list: List of dictionaries with the results of the deletion process. """ deletion_results = [] for log_info in unused_logs: client = boto3.client('logs', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=log_info['region']) try: # Delete the log stream client.delete_log_stream( logGroupName=log_info['log_group'], logStreamName=log_info['log_stream'] ) deletion_results.append({ 'status': 'success', 'region': log_info['region'], 'log_group': log_info['log_group'], 'log_stream': log_info['log_stream'], 'message': f"Successfully deleted log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}." }) except boto3.exceptions.Boto3Error as e: deletion_results.append({ 'status': 'error', 'region': log_info['region'], 'log_group': log_info['log_group'], 'log_stream': log_info['log_stream'], 'message': f"Error deleting log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}" }) except Exception as e: deletion_results.append({ 'status': 'error', 'region': log_info['region'], 'log_group': log_info['log_group'], 'log_stream': log_info['log_stream'], 'message': f"Unexpected error: {e}" }) return deletion_results # Main Block # unused_logs to be passed down from parent task # Example Structure, unused_logs = [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}] # truncated for brevity results = delete_log_streams(unused_logs) if not results: print("No log streams were deleted.") else: for result in results: print(result['message'])copied3.6.2.1
- 3.7Ijrs2GV9Jsv1tn2CLArgDelete Unused AWS NAT Gateways
3.7
Delete Unused AWS NAT Gateways
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook identifies and removes inactive NAT gateways to optimize AWS costs. By eliminating unused resources, it streamlines infrastructure management and reduces unnecessary charges.
inputsoutputs3.7- 3.7.1cm5Hso01gVw4aRSFajjPList All AWS NAT Gateways
3.7.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves using the boto3 to programmatically iterate over all AWS regions, retrieve, and list details of all Network Address Translation (NAT) gateways present in an AWS account.
inputsoutputsimport boto3 from botocore.exceptions import ( BotoCoreError, ClientError, NoCredentialsError, PartialCredentialsError, EndpointConnectionError, ) creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_nat_gateways_for_region(ec2_client, region_name): nat_gateways_with_regions = [] try: response = ec2_client.describe_nat_gateways() if response and 'NatGateways' in response and len(response['NatGateways']) > 0: for nat_gateway in response['NatGateways']: nat_gateway_info = { "NatGatewayId": nat_gateway['NatGatewayId'], "Region": region_name, "State": nat_gateway['State'] } nat_gateways_with_regions.append(nat_gateway_info) #print(nat_gateway_info) else: #print(f"No NAT Gateways found in region {region_name}.") p=1 # Dummy line except (NoCredentialsError, PartialCredentialsError, EndpointConnectionError, ClientError, BotoCoreError, Exception) as e: print(f"Error in region {region_name}: {str(e)}") return nat_gateways_with_regions def display_nat_gateways(data): # Initialize table with the desired structure and headers table = context.newtable() table.title = "NAT Gateways Information" table.num_cols = 3 # Number of columns according to headers table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names based on the new structure headers = ["NAT Gateway ID", "Region", "State"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Sort the NAT gateway data by region for better organization data.sort(key=lambda x: x["Region"]) # Populate the table with NAT gateway data for row_num, entry in enumerate(data, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each NAT gateway entry values = [entry["NatGatewayId"], entry["Region"], entry["State"]] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) #region = 'us-east-1' # You can set this to None to check all regions region = None # Hardcoded for one time result all_nat_gateways = [] if region: ec2_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) all_nat_gateways.extend(list_nat_gateways_for_region(ec2_client, region)) else: for region_name in regions: ec2_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region_name) all_nat_gateways.extend(list_nat_gateways_for_region(ec2_client, region_name)) display_nat_gateways(all_nat_gateways)copied3.7.1 - 3.7.2OeEH3WuG9zDK2uUwnhkiFilter Out Unused AWS NAT Gateways
3.7.2
Filter Out Unused AWS NAT Gateways
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task identifies AWS NAT gateways that have not transferred any data in the past week or threshold, deeming them as "unused", and filters them out for potential optimization or deletion.
inputsoutputsimport boto3 from datetime import datetime, timedelta from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] unused_days = 7 # Hardcoded for One time Result def check_unused_nat_gateways_for_region(nat_gateways_list): unused_nat_gateways = [] # Check if the list is empty or not if not nat_gateways_list: print("No NAT gateways received for processing.") return unused_nat_gateways print(f"Received {len(nat_gateways_list)} NAT gateways for processing.") for nat_gateway_info in nat_gateways_list: region_name = nat_gateway_info['Region'] nat_gateway_id = nat_gateway_info['NatGatewayId'] ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) try: response = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'm1', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/NATGateway', 'MetricName': 'BytesOutToDestination', 'Dimensions': [ { 'Name': 'NatGatewayId', 'Value': nat_gateway_info['NatGatewayId'] } ] }, 'Period': 86400 * unused_days, 'Stat': 'Sum' }, 'ReturnData': True } ], StartTime=datetime.now() - timedelta(days=unused_days), EndTime=datetime.now() ) if not response['MetricDataResults'][0]['Values']: unused_nat_gateways.append(nat_gateway_info) except (ClientError, BotoCoreError, Exception) as e: print(f"Error in region {region_name} for NAT Gateway {nat_gateway_id}: {str(e)}") # Print the total number of unused NAT gateways print(f"Out of {len(nat_gateways_list)} NAT gateways, {len(unused_nat_gateways)} are unused.") return unused_nat_gateways ''' all_nat_gateways = [ {'NatGatewayId': 'nat-0bc09626aff12105a', 'Region': 'us-east-1', 'State': 'pending'}, {'NatGatewayId': 'nat-0cee3df0c034c58f8', 'Region': 'us-east-1', 'State': 'deleted'}, {'NatGatewayId': 'nat-0b5177c47df82bc51', 'Region': 'us-east-1', 'State': 'deleted'} ] # passed down from previous task ''' unused_nat_gateways = check_unused_nat_gateways_for_region(all_nat_gateways) context.skip_sub_tasks=Truecopied3.7.2- 3.7.2.1nzNVxfke9GGgKv1Fg687Delete AWS NAT Gateways
3.7.2.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task removes specified NAT gateways in an AWS environment. This cleanup optimizes network infrastructure, enhances security, and reduces costs by eliminating unused resources.
inputsoutputsimport boto3 from botocore.exceptions import (ClientError,BotoCoreError) creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_nat_gateways(nat_gateway_list): for nat_gateway_info in nat_gateway_list: region_name = nat_gateway_info['Region'] nat_gateway_id = nat_gateway_info['NatGatewayId'] nat_gateway_state = nat_gateway_info['State'] ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region_name) if nat_gateway_state == 'available': try: ec2_client.delete_nat_gateway(NatGatewayId=nat_gateway_id) print(f"Deleted NAT Gateway ID: {nat_gateway_id} in region {region_name}") except (ClientError, BotoCoreError, Exception) as e: print(f"Error deleting NAT Gateway {nat_gateway_id} in region {region_name}: {str(e)}") elif nat_gateway_state == 'pending': print(f"NAT Gateway ID: {nat_gateway_id} in region {region_name} is still in 'pending' state and cannot be deleted.") else: print(f"NAT Gateway ID: {nat_gateway_id} in region {region_name} is in '{nat_gateway_state}' state and was not deleted.") ''' unused_nat_gateways = [{'NatGatewayId': 'nat-0bc09626aff12105a', 'Region': 'us-east-1', 'State': 'available'}, {'NatGatewayId': 'nat-0cee3df0c034c58f8', 'Region': 'us-east-1', 'State': 'deleted'}, {'NatGatewayId': 'nat-0b5177c47df82bc51', 'Region': 'us-east-1', 'State': 'deleted'}] # passed down from previous task ''' if not unused_nat_gateways: print("No NAT gateways received for deletion.") else: delete_nat_gateways(unused_nat_gateways)copied3.7.2.1
- 3.8obmKfpHs23gAzJc92vvaStopping Underutilized AWS EC2 Instances
3.8
Stopping Underutilized AWS EC2 Instances
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.In an AWS environment, EC2 instances incur charges based on their uptime. However, not all instances are actively utilized, leading to unnecessary expenses. Underutilized instances may have low CPU usage, minimal network activity, or other metrics indicating limited activity. Identifying and stopping such instances can result in significant cost savings. Tools like AWS Cost Explorer and third-party solutions can help identify these instances based on CloudWatch metrics. This runbook automates the process of monitoring and taking action on underutilized instances based on low CPU usage ensuring an optimized and cost-effective cloud environment. It's crucial, though, to ensure that stopping these instances won't disrupt essential services or applications.
inputsoutputsCPU_THRESHOLD = 20 # Hardcoded for one time result LOOKBACK_PERIOD_HOURS = 2 # Hardcoded for one time result region_name=Nonecopied3.8- 3.8.1l2WbsJzf9MvMxNsLlFq3Get all AWS EC2 instances
3.8.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Amazon Elastic Compute Cloud (EC2) is a service offered by Amazon Web Services (AWS) that provides resizable compute capacity in the cloud. Through Boto3's EC2 client, the describe_instances() method provides detailed information about each instance, including its ID, type, launch time, and current state. This capability assists users in effectively monitoring and managing their cloud resources.
inputsoutputsimport boto3 from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_all_regions(): ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name = 'us-east-1') return [region['RegionName'] for region in ec2.describe_regions()['Regions']] def list_ec2_instances(region=None): # If no region is provided, fetch instances from all regions regions = [region] if region else list_all_regions() # Create an empty list to store instance details instance_details = [] for region in regions: # Try initializing the Boto3 EC2 client for the specific region try: ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) except (NoCredentialsError, PartialCredentialsError): print(f"Failed for {region}: No AWS credentials found or incomplete credentials provided.") continue except BotoCoreError as e: print(f"Failed for {region}: Error initializing the EC2 client due to BotoCore Error: {e}") continue except Exception as e: print(f"Failed for {region}: Unexpected error initializing the EC2 client: {e}") continue #print(f"Fetching EC2 instance details for region: {region}...") # Try to paginate through the EC2 instance responses for the specific region try: paginator = ec2_client.get_paginator('describe_instances') for page in paginator.paginate(): for reservation in page['Reservations']: for instance in reservation['Instances']: # Extract the desired attributes instance_id = instance['InstanceId'] instance_type = instance['InstanceType'] launch_time = instance['LaunchTime'] state = instance['State']['Name'] # Append the details to the list instance_details.append({ 'InstanceId': instance_id, 'InstanceType': instance_type, 'LaunchTime': launch_time, 'State': state, 'Region': region }) #print(f"Fetched all instance details for region: {region} successfully!") except ClientError as e: print(f"Failed for {region}: AWS Client Error while fetching EC2 instance details: {e}") except Exception as e: print(f"Failed for {region}: Unexpected error while fetching EC2 instance details: {e}") return instance_details def display_instance_details(data): # Initialize table with the desired structure and headers table = context.newtable() table.title = "EC2 Instance Details" table.num_cols = 5 # Number of columns according to headers table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names based on the new structure headers = ["Instance ID", "Instance Type", "Launch Time", "State", "Region"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Sort the instance data by launch time for better organization data.sort(key=lambda x: x["LaunchTime"], reverse=True) # Populate the table with instance data for row_num, instance in enumerate(data, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each instance values = [ instance["InstanceId"], instance["InstanceType"], instance["LaunchTime"].strftime('%Y-%m-%d %H:%M:%S'), # Format the datetime instance["State"], instance["Region"] ] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) # You can replace None with a specific region string like 'us-east-1' to get instances from a specific region # Hardcoded region_name for One time Execution Result region_name=None instances_list = list_ec2_instances(region_name) if instances_list: ''' print("\nEC2 Instance Details:") for instance in instances_list: print("-" * 50) # Separator line for key, value in instance.items(): print(f"{key}: {value}")''' display_instance_details(instances_list) else: print("No instances found or an error occurred.")copied3.8.1 - 3.8.2HeIQP6AhU6er8R7Tc0kdAverage CPU Utilizations of all AWS EC2 Instances
3.8.2
Average CPU Utilizations of all AWS EC2 Instances
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.inputsoutputsimport boto3 from datetime import datetime, timedelta from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError, EndpointConnectionError, DataNotFoundError # AWS credentials creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] if locals().get('instances_list') is None: instances_list = [] # Function to fetch CPU utilization for a given instance def fetch_cpu_utilization(instance_id, region, start_time, end_time): try: cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) response = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'cpuUtilization', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/EC2', 'MetricName': 'CPUUtilization', 'Dimensions': [{'Name': 'InstanceId', 'Value': instance_id}] }, 'Period': 3600, # one hour 'Stat': 'Average', }, 'ReturnData': True, }, ], StartTime=start_time, EndTime=end_time ) return response['MetricDataResults'][0]['Timestamps'], response['MetricDataResults'][0]['Values'] except Exception as e: print(f"Error getting CPU utilization for instance {instance_id}: {e}") return [], [] # Main plotting logic def plot_cpu_utilization(instances_list, lookback_days=7): end_time = datetime.utcnow() start_time = end_time - timedelta(days=lookback_days) for instance in instances_list: if instance['State'] != 'running': continue timestamps, cpu_values = fetch_cpu_utilization(instance['InstanceId'], instance['Region'], start_time, end_time) # Check if data is available if timestamps: context.plot.add_trace( name=f"Instance {instance['InstanceId']}", xpts=timestamps, # x-axis points ypts=cpu_values, # y-axis points tracetype="line" ) # Set plot properties context.plot.xlabel = 'Date' context.plot.ylabel = 'Average CPU Utilization (%)' context.plot.title = f'CPU Utilization per EC2 Instance (Last {lookback_days} Days)' # Execute the plotting function plot_cpu_utilization(instances_list)copied3.8.2 - 3.8.3QdM6M5g2g8ymYYUZwKBeIdentify Idle AWS EC2 Instances
3.8.3
Identify Idle AWS EC2 Instances
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.AWS EC2 instances that are running but not actively used represent unnecessary costs. An "idle" EC2 instance typically exhibits very low metrics on parameters such as CPU utilization, network input/output, and disk read/writes. By leveraging AWS CloudWatch, users can monitor these metrics and identify instances that remain underutilized based on low CPU usage over extended periods. Once identified, these instances can either be stopped or terminated, leading to more efficient resource use and cost savings. It's important to analyze and verify the activity of these instances before taking action to ensure no critical processes are inadvertently affected.
inputsoutputsimport boto3 from datetime import datetime, timedelta from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError, EndpointConnectionError, DataNotFoundError # Constants for CPU threshold and lookback period # CPU_THRESHOLD = 5.0 # LOOKBACK_PERIOD_HOURS = 1 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] if locals().get('instances_list') is None: instances_list = [] def get_idle_instances(instances_list): idle_instances = [] end_time = datetime.utcnow() start_time = end_time - timedelta(hours=LOOKBACK_PERIOD_HOURS) for instance in instances_list: if instance['State'] != 'running': continue instance_id = instance['InstanceId'] region = instance['Region'] try: cloudwatch = boto3.client('cloudwatch',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) cpu_stats = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'cpuUtil', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/EC2', 'MetricName': 'CPUUtilization', 'Dimensions': [{'Name': 'InstanceId', 'Value': instance_id}] }, 'Period': 3600, # one hour periods 'Stat': 'Average' }, 'ReturnData': True } ], StartTime=start_time, EndTime=end_time ) avg_cpu_utilization = sum(cpu_stats['MetricDataResults'][0]['Values']) / len(cpu_stats['MetricDataResults'][0]['Values']) if cpu_stats['MetricDataResults'][0]['Values'] else 0.0 # Calculate idle hours based on the CPU threshold checks if avg_cpu_utilization < CPU_THRESHOLD: idle_hours = sum(1 for val in cpu_stats['MetricDataResults'][0]['Values'] if val < CPU_THRESHOLD) instance_info = instance.copy() instance_info['IdleHours'] = idle_hours idle_instances.append(instance_info) except Exception as e: print(f"Error processing instance {instance_id} in region {region}: {e}") return idle_instances def display_instance_details(data): table = context.newtable() table.title = "Idle EC2 Instances" table.num_cols = 6 # Updated number of columns table.num_rows = 1 table.has_header_row = True headers = ["Instance ID", "Instance Type", "Launch Time", "State", "Region", "Idle Hours"] for col_num, header in enumerate(headers): table.setval(0, col_num, header) data.sort(key=lambda x: x["LaunchTime"], reverse=True) for row_num, instance in enumerate(data, start=1): table.num_rows += 1 values = [ instance["InstanceId"], instance["InstanceType"], instance["LaunchTime"].strftime('%Y-%m-%d %H:%M:%S'), instance["State"], instance["Region"], str(instance["IdleHours"]) # Ensure the idle hours are converted to string ] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) # Main execution # Ensure to include your list_all_ec2_instances function or import it if it's in another module # instances_list = list_all_ec2_instances() Already taken from parent task idle_instances_list = get_idle_instances(instances_list) # Printing the details of idle instances if idle_instances_list: ''' print("\nIdle EC2 Instances:") for instance in idle_instances_list: print("-" * 60) # Separator line for key, value in instance.items(): print(f"{key}: {value}")''' display_instance_details(idle_instances_list) else: print("No idle instances found.") # Create a new list with only 'InstanceId' and 'Region' for each instance filtered_instances = [{'InstanceId': instance['InstanceId'], 'Region': instance['Region']} for instance in idle_instances_list] context.skip_sub_tasks=True ''' # Print the new list print("Printing instance_id and region wise instance list to check values for passing down to downstream task") for instance in filtered_instances: print(instance) '''copied3.8.3- 3.8.3.1V4iQY0Zrj0SaHEpJ5JmbStop an AWS EC2 Instance
3.8.3.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.In AWS, an EC2 instance can be in various states, including running, stopped, or terminated. Stopping an EC2 instance essentially means shutting it down, similar to turning off a computer. When an instance is stopped, it is not running, and therefore, you are not billed for instance usage. However, you are still billed for any EBS storage associated with the instance. The advantage of stopping, instead of terminating, is that you can start the instance again at any time. This capability is useful for scenarios where you want to temporarily halt operations without losing the instance configuration or data. It's essential to understand that stopping an instance will lead to the loss of the ephemeral storage content (Instance Store), but data on EBS volumes will remain intact.
inputsoutputsimport boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] if locals().get('filtered_instances') is None: filtered_instances = [] def stop_ec2_instances(instances_to_stop): # To keep track of instances successfully stopped stopped_instances = [] # To keep track of instances that failed to stop failed_instances = [] # To keep track of instances that were already stopped or in the process of stopping already_stopped_instances = [] # Iterate over each instance in the list for instance_info in instances_to_stop: instance_id = instance_info['InstanceId'] region = instance_info['Region'] # Initialize the EC2 client for the specific region ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) # Fetch the current state of the instance try: response = ec2_client.describe_instances(InstanceIds=[instance_id]) instance_state = response['Reservations'][0]['Instances'][0]['State']['Name'] if instance_state in ["stopped", "stopping"]: already_stopped_instances.append(instance_id) print(f"Instance {instance_id} in region {region} is already in '{instance_state}' state.") continue # If the instance is not already stopped or stopping, then attempt to stop it ec2_client.stop_instances(InstanceIds=[instance_id]) stopped_instances.append(instance_id) print(f"Instance {instance_id} in region {region} has been stopped.") except ClientError as e: failed_instances.append(instance_id) print(f"Error with instance {instance_id} in region {region}: {e}") # Print a summary of the actions print("\nSummary:\n") if stopped_instances: print(f"Successfully stopped {len(stopped_instances)} instances: {', '.join(stopped_instances)}") if already_stopped_instances: print(f"{len(already_stopped_instances)} instances were already stopped or stopping: {', '.join(already_stopped_instances)}") if failed_instances: print(f"Failed to stop {len(failed_instances)} instances: {', '.join(failed_instances)}") ''' # Sample list of instances to stop taken from previous task or provide these instances to use the task in a standalone manner instances_to_stop = [ {'InstanceId': 'i-01615251421b8b5da', 'Region': 'us-east-1'}, {'InstanceId': 'i-057155192c87ea310', 'Region': 'us-east-1'} # ... (other instances) ] ''' stop_ec2_instances(filtered_instances) # passed down from previous task otherwise pass instances_to_stop to function to use the task in a standalone manner.copied3.8.3.1
- 3.9MfzuZI3DQDSNNvVAMzqJDelete Unattached AWS EBS(Elastic Block Store) Volumes
3.9
Delete Unattached AWS EBS(Elastic Block Store) Volumes
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook is designed to assist in the removal of unattached Amazon EBS Volumes within an AWS region. Once the EBS volume is deleted, the volume's data is permanently removed, and the volume cannot be attached to any instance. To preserve important data before deletion, you have the option to create a snapshot of the volume, allowing for potential volume recreation in the future.
inputsoutputs3.9- 3.9.1FA8kj7OlEvs61LlVGfKzGet Unattached AWS EBS Volumes
3.9.1
Get Unattached AWS EBS Volumes
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves identifying and filtering out Amazon Elastic Block Store (EBS) volumes that are not currently attached to any Amazon EC2 instances within a specific AWS region.
inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def filter_unattached_ebs_volumes(ec2_client): """ Filters unattached EBS volumes within a specific region. Args: ec2_client (boto3.client): An EC2 client instance. Returns: list: List of unattached EBS volume IDs. """ try: response = ec2_client.describe_volumes( Filters=[{"Name": "status", "Values": ["available"]}] ) unattached_volumes = [] for volume in response["Volumes"]: unattached_volumes.append(volume["VolumeId"]) return unattached_volumes except Exception as e: print(f"Error in filtering unattached volumes: {e}") return [] #regions = ["us-east-2"] # Add your desired regions here for region in regions: # Create an EC2 client instance for the region ec2_client = boto3.client("ec2", aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) # Get the list of unattached EBS volumes in the region unattached_volumes = filter_unattached_ebs_volumes(ec2_client) if not unattached_volumes: #print(f"No unattached EBS volumes found in region {region}") p=1 # dummy line to end the conditional block properly else: print(f"Unattached EBS volumes in region {region}: {unattached_volumes}") context.skip_sub_tasks=Truecopied3.9.1- 3.9.1.1KfMElX5rwD4kmfR7c6E2Create snapshots of unattached AWS EBS volumes
3.9.1.1
Create snapshots of unattached AWS EBS volumes
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task streamlines the process of capturing point-in-time backups of EBS volumes that are not currently attached to instances. By creating snapshots, you can ensure data durability and recovery options, enabling you to safeguard valuable data and simplify data restoration if needed.
inputsoutputsimport boto3 import datetime creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def create_ebs_snapshots(ec2_client, volume_id): """ Creates a snapshot of an EBS volume. Args: ec2_client (boto3.client): An EC2 client instance. volume_id (str): The ID of the EBS volume to create a snapshot of. """ try: response = ec2_client.create_snapshot(VolumeId=volume_id) snapshot_id = response["SnapshotId"] print(f"Snapshot {snapshot_id} created for volume {volume_id} at {datetime.datetime.now()}") except Exception as e: print(f"Error in creating snapshot for volume {volume_id}: {e}") #regions = ["us-east-2"] # Add your desired regions here for region in regions: # Create an EC2 client instance for the region ec2_client = boto3.client("ec2", aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) if not unattached_volumes: print(f"No unattached EBS volumes found in region {region}") else: print(f"Unattached EBS volumes in region {region}: {unattached_volumes}") for volume_id in unattached_volumes: create_ebs_snapshots(ec2_client, volume_id) context.proceed = Falsecopied3.9.1.1 - 3.9.1.2BbtEFqxPWhfdKbIqtUJLDelete Unattached EBS Volumes
3.9.1.2
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Efficiently manage your AWS Elastic Block Store (EBS) volumes by automating the deletion of unattached volumes. This task identifies EBS volumes that are not currently attached to any instances and removes them, helping you optimize storage resources and reduce unnecessary costs while maintaining your cloud infrastructure's cleanliness.
inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_ebs_volume(ec2_client, volume_id): """ Deletes an EBS volume. Args: ec2_client (boto3.client): An EC2 client instance. volume_id (str): The ID of the EBS volume to delete. """ try: ec2_client.delete_volume(VolumeId=volume_id) print(f"Volume {volume_id} deleted.") except Exception as e: print(f"Error in deleting volume {volume_id}: {e}") #regions = ["us-east-1"] # Add your desired regions here for region in regions: # Create an EC2 client instance for the region ec2_client = boto3.client("ec2", aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) if not unattached_volumes: print(f"No unattached EBS volumes found in region {region}") else: print(f"Unattached EBS volumes in region {region}: {unattached_volumes}") for volume_id in unattached_volumes: delete_ebs_volume(ec2_client, volume_id)copied3.9.1.2
- 3.10BnKg4NGiWNuUEsD8KFerDelete low usage AWS EBS Volumes
3.10
Delete low usage AWS EBS Volumes
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook identifies Amazon Elastic Block Storage (EBS) volumes that exhibit low usage, and subsequently removes them. The process involves searching for EBS volumes that have been minimally utilized over a specified threshold period, and then performing the deletion for those volumes. This automation helps optimize storage resources by removing underutilized volumes, freeing up space and potentially reducing costs.
inputsoutputs#low_usage_threshold = 90 # Harcoded for one time resultcopied3.10- 3.10.1UBHtfFjxRLdmgrHvdLL0Filter out low usage AWS EBS volumes
3.10.1
Filter out low usage AWS EBS volumes
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task aims to identify Amazon Elastic Block Storage (EBS) volumes with minimal usage. It involves scanning through AWS resources to pinpoint EBS volumes that have been scarcely utilized over a predefined threshold period. This process can be crucial for optimizing storage resources and identifying opportunities to reduce costs, as it helps identify volumes that may no longer be necessary due to low activity levels.
inputsoutputsimport boto3 from datetime import datetime, timedelta creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Set the threshold for low usage in days #low_usage_threshold = 30 def get_all_regions(): """Retrieve all AWS regions.""" ec2 = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') return [region['RegionName'] for region in ec2.describe_regions()['Regions']] def find_low_usage_volumes(ec2_client, region): """ Find EBS volumes with low usage in a specific AWS region. Args: ec2_client (boto3.client): An EC2 client instance for a specific region. region (str): The AWS region of the volumes. Returns: list: List of dictionaries containing volume IDs and their region with low usage. """ low_usage_volumes = [] try: response = ec2_client.describe_volumes() for volume in response['Volumes']: volume_id = volume['VolumeId'] create_time = volume['CreateTime'] days_since_creation = (datetime.now() - create_time.replace(tzinfo=None)).days if days_since_creation >= low_usage_threshold: low_usage_volumes.append({ 'VolumeId': volume_id, 'Region': region, 'LowUsageDays': days_since_creation # Track how many days the volume has been in low usage }) return low_usage_volumes except Exception as e: print(f"Error in finding low usage volumes in region {region}: {e}") return [] def display_volume_usage(data): table = context.newtable() table.title = "Low Usage EBS Volumes" table.num_cols = 3 table.num_rows = 1 table.has_header_row = True headers = ["Volume ID", "Region", "Idle Days"] for col_num, header in enumerate(headers): table.setval(0, col_num, header) for region_volumes in data: for volume in region_volumes: table.num_rows += 1 values = [ volume["VolumeId"], volume["Region"], str(volume["LowUsageDays"]) # Convert idle days count to string for display ] for col_num, value in enumerate(values): table.setval(table.num_rows - 1, col_num, value) # region_name to be provided; if None, script runs for all regions # Hardcoded for one time result region_name = None # Set to a specific region, e.g., 'us-east-2', or None for all regions regions_to_process = [region_name] if region_name else get_all_regions() low_usage_volumes_info =[] for region in regions_to_process: ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) low_usage_volumes = find_low_usage_volumes(ec2_client, region) if not low_usage_volumes: #print(f"No low usage EBS volumes found in region {region}") p=1 # Dummy line to not give an empty conditional block else: ''' #print(f"Low usage EBS volumes in region {region}: {low_usage_volumes}") # Header for the output print(f"{'Volume ID':<20} {'Region':<10}") # Looping through each volume in the list for volume in low_usage_volumes: volume_id = volume['VolumeId'] region = volume['Region'] # Printing each volume's ID and region in a formatted manner print(f"{volume_id:<20} {region:<10}")''' low_usage_volumes_info.append(low_usage_volumes) display_volume_usage(low_usage_volumes_info) context.skip_sub_tasks=Truecopied3.10.1- 3.10.1.1EgBjFdEoJfI8cbObKO5fDelete Detached EBS Volumes having low usage
3.10.1.1
Delete Detached EBS Volumes having low usage
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves scanning through a list of EBS volumes and deleting them, provided they are not associated with any ec2 instances. Deleting these detached volumes that are no longer in use can help optimize storage resources and reduce unnecessary costs.
inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_detached_low_usage_volumes(volume_info): """ Delete detached low usage EBS volumes. Args: volume_info (dict): Dictionary containing the volume ID and its region. Returns: tuple: A tuple containing the count of deleted and skipped volumes. """ deleted_count, skipped_count = 0, 0 volume_id = volume_info['VolumeId'] region = volume_info['Region'] try: ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) volume = ec2_client.describe_volumes(VolumeIds=[volume_id])['Volumes'][0] if not volume['Attachments']: ec2_client.delete_volume(VolumeId=volume_id) deleted_count += 1 print(f"Deleted detached low usage EBS volume {volume_id} in region {region}") else: skipped_count += 1 print(f"Volume {volume_id} is attached to an EC2 instance. Skipping deletion in region {region}.") except Exception as e: print(f"Error in deleting volume {volume_id} in region {region}: {e}") return deleted_count, skipped_count # low_usage_volumes is a list of dictionaries received from the upstream task total_deleted, total_skipped = 0, 0 for volume_info in low_usage_volumes: deleted, skipped = delete_detached_low_usage_volumes(volume_info) total_deleted += deleted total_skipped += skipped print(f"Summary: {total_deleted} detached low usage EBS volumes were deleted.") print(f"{total_skipped} volumes were skipped (still attached).") if total_deleted == 0: print("No detached low usage EBS volumes were deleted.")copied3.10.1.1
- 3.11lQdIAo6Unx2nNndHohFfDelete AWS EBS Volumes attached to stopped EC2 instances
3.11
Delete AWS EBS Volumes attached to stopped EC2 instances
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook streamlines resource management by identifying and removing Elastic Block Store (EBS) volumes linked to stopped Amazon EC2 instances. Through a sequential process, this automation retrieves EBS volumes associated with stopped instances and subsequently detaches and deletes them. This procedure aids in resource optimization and cost efficiency within the AWS environment.
inputsoutputs3.11- 3.11.1GYMShAaObdOUSmMhS6srFilter out EBS Volumes attached to stopped EC2 instances
3.11.1
Filter out EBS Volumes attached to stopped EC2 instances
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This is a task focused on identifying Elastic Block Store (EBS) volumes that are connected to Amazon EC2 instances currently in a stopped state by examining the instance-state and corresponding volume mappings.
inputsoutputsimport boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_stopped_instance_volumes(ec2_client): """ Gets EBS volumes attached to stopped instances. Args: ec2_client (boto3.client): An EC2 client instance. Returns: dict: Dictionary with instance IDs as keys and associated volume IDs as values. """ instance_volume_map = {} try: instances = ec2_client.describe_instances(Filters=[{"Name": "instance-state-name", "Values": ["stopped"]}]) for reservation in instances["Reservations"]: for instance in reservation["Instances"]: instance_id = instance["InstanceId"] volumes = instance.get("BlockDeviceMappings", []) volume_ids = [volume["Ebs"]["VolumeId"] for volume in volumes] instance_volume_map[instance_id] = volume_ids return instance_volume_map except Exception as e: print(f"Error in getting instance volumes: {e}") return {} #regions = ["us-east-1"] # Add your desired regions here for region in regions: try: # Create an EC2 client for the specified region ec2_client = boto3.client("ec2", aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) # Get the dictionary of stopped instance volumes instance_volume_map = get_stopped_instance_volumes(ec2_client) if not instance_volume_map: #print(f"No stopped instances with attached volumes found in region {region}") p=1 # Dummy line to end conditional block correctly else: print(f"Stopped instance volumes in region {region}:\n{instance_volume_map}") except Exception as e: print(f"Error in region {region}: {e}") context.skip_sub_tasks=Truecopied3.11.1- 3.11.1.1ZN1jsWHIgLRDLexfM06TDetach and Delete AWS EBS Volumes
3.11.1.1
Detach and Delete AWS EBS Volumes
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves the removal of Elastic Block Store (EBS) volumes from their associated instances followed by the deletion of these volumes. In this particular task detachment and deletion of EBS volumes is related to stopped EC2 instances. It is executed to free up storage resources, enhance resource allocation efficiency, and optimize costs within the AWS infrastructure.
inputsoutputsimport boto3 import time creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def detach_and_delete_ebs_volumes(ec2_client, instance_volume_map): """ Detaches and deletes EBS volumes attached to instances. Args: ec2_client (boto3.client): An EC2 client instance. instance_volume_map (dict): Dictionary with instance IDs as keys and associated volume IDs as values. """ try: for instance_id, volume_ids in instance_volume_map.items(): for volume_id in volume_ids: ec2_client.detach_volume(InstanceId=instance_id, VolumeId=volume_id, Force=True) print(f"Detached EBS volume {volume_id} from instance {instance_id}") time.sleep(5) # Wait for a few seconds to ensure detachment is complete otherwise there is a VolumeInUse error try: ec2_client.delete_volume(VolumeId=volume_id) print(f"Deleted EBS volume {volume_id}") except Exception as e: print(f"Error in deleting EBS volume {volume_id}: {e}") except Exception as e: print(f"Error in detaching and deleting EBS volumes: {e}") #regions = ["us-east-1"] # Add your desired regions here for region in regions: # Create an EC2 client instance for the region ec2_client = boto3.client("ec2", aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) if not instance_volume_map: print(f"No volumes attached to stopped instances found in region {region}") else: # Detach and delete the identified EBS volumes detach_and_delete_ebs_volumes(ec2_client, instance_volume_map)copied3.11.1.1
- 3.12mgc791zWLyRgPRGzROnzFilter Out and Delete Old AWS EBS Snapshots
3.12
Filter Out and Delete Old AWS EBS Snapshots
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook identifies and removes old Amazon Elastic Block Store (EBS) snapshots. By setting a specific age threshold it scans through designated AWS regions, pinpoints snapshots that surpass the age limit, and subsequently deletes them. This operation not only ensures efficient resource utilization but also aids in minimizing storage costs, promoting a cleaner and more cost-effective cloud environment.
inputsoutputsdays_old=60 #Hardcoded for one time resultcopied3.12- 3.12.1cE2VyVuRge1jnahfJtVuFilter Out Old AWS EBS Snapshots
3.12.1
Filter Out Old AWS EBS Snapshots
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task identifies old Amazon Elastic Block Store (EBS) snapshots. By setting an age threshold, it scans across specified AWS regions, highlighting snapshots that exceed the set duration. This facilitates better management, paving the way for timely deletions and efficient storage utilization.
inputsoutputsimport boto3 from botocore.exceptions import ClientError from datetime import datetime, timedelta creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def find_old_snapshots(ec2_client, days_old, region): """ Find EBS snapshots in a specified AWS region that are older than a given number of days. Args: ec2_client (boto3.client): Boto3 EC2 client object. days_old (int): The age in days to consider an EBS snapshot as old. region (str): The AWS region to search for old snapshots. Returns: list[str]: List of old snapshot IDs. Returns None if there's an error. """ old_snapshots = [] # Initialize an empty list to store the IDs of old snapshots try: # Fetch all snapshots owned by the current AWS account snapshots = ec2_client.describe_snapshots(OwnerIds=['self'])['Snapshots'] # Calculate the cutoff date for old snapshots, removing timezone information to make it "naive" cutoff_date = datetime.now().replace(tzinfo=None) - timedelta(days=days_old) # Loop through each snapshot to check its age for snapshot in snapshots: # Remove timezone information from the snapshot's start time to make it "naive" snapshot_time_naive = snapshot['StartTime'].replace(tzinfo=None) # Compare snapshot's start time with the cutoff date if snapshot_time_naive < cutoff_date: old_snapshots.append(snapshot['SnapshotId']) # Append old snapshot IDs to the list return old_snapshots # Return the list of old snapshot IDs except ClientError as e: print(f"A ClientError occurred in region {region}: {e}") # Handle any ClientErrors return None except Exception as e: print(f"An unknown error occurred in region {region}: {e}") # Handle any general exceptions return None # List of AWS regions to check for old snapshots #regions_to_check = ['us-east-1', 'us-east-2'] #, 'us-west-2'] # Age in days to consider an EBS snapshot as old #days_old = 5 # Initialize an empty dictionary to store the snapshot IDs by region snapshots_by_region = {} # Initialize a list to store regions where no old snapshots were found regions_without_snapshots = [] # Loop through each AWS region to find old snapshots for region in regions: #print(f"Checking region {region}...") ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) # Initialize EC2 client for the region old_snapshots = find_old_snapshots(ec2_client, int(days_old), region) # Find old snapshots in the region # If old snapshots are found, add them to the dictionary if old_snapshots: snapshots_by_region[region] = old_snapshots else: regions_without_snapshots.append(region) # Print the resulting dictionary print("\nSummary of old snapshots by region:") for region, snapshot_ids in snapshots_by_region.items(): print(f"{region}: {snapshot_ids}") # Print regions without old snapshots if regions_without_snapshots: print(f"\nNo old snapshots found in the following regions: {', '.join(regions_without_snapshots)}") context.skip_sub_tasks=Truecopied3.12.1- 3.12.1.1lfKDvBDnuorGNlk0HPYZDelete AWS EBS Snapshots
3.12.1.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task removes specified Amazon Elastic Block Store (EBS) snapshots. Designed to streamline storage management, this procedure efficiently purges selected snapshots across designated AWS regions, ensuring optimal resource utilization and reducing unnecessary storage costs.
inputsoutputsimport boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Function to delete a list of EBS snapshots given their IDs def delete_snapshots(ec2_client, snapshot_ids, region): """ Delete a list of specified EBS snapshots in a given AWS region. Args: ec2_client (boto3.client): Boto3 EC2 client object. snapshot_ids (list[str]): List of EBS snapshot IDs to be deleted. region (str): The AWS region where the snapshots are located. Returns: None: This function does not return any value. """ for snapshot_id in snapshot_ids: try: # Delete the snapshot ec2_client.delete_snapshot(SnapshotId=snapshot_id) print(f"Deleted snapshot {snapshot_id} in region {region}") # Confirm deletion except ClientError as e: print(f"Could not delete snapshot {snapshot_id} in region {region}: {e}") # Handle any ClientErrors ''' #Example structure of snapshots_by_region # Dictionary mapping AWS regions to their respective old snapshots snapshots_by_region = { 'us-east-1': ['snap-04cbc2182c8f5e1ed', 'snap-0004bbdd1e7b0d35c'], 'us-west-2': [] # Just as an example, no snapshots listed for us-west-2 } ''' # Loop through each AWS region in the dictionary to delete old snapshots for region, old_snapshots in snapshots_by_region.items(): print(f"Checking region {region}...") ec2_client = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) # Initialize EC2 client for the region # Delete old snapshots if any are found for the current region if old_snapshots: print(f"Found {len(old_snapshots)} old snapshots in {region}. Deleting them...") delete_snapshots(ec2_client, old_snapshots, region) else: print(f"No old snapshots found in {region}.") # Confirm if no old snapshots are found in the current regioncopied3.12.1.1
- 3.13raeHNdASoNPLDTe3cbxDGet unhealthy resources attached to AWS ELBs(Elastic Load Balancers)
3.13
Get unhealthy resources attached to AWS ELBs(Elastic Load Balancers)
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook retrieves the list of resources, such as instances or targets, that are marked as 'unhealthy' or 'OutOfService', and are associated with AWS Elastic Load Balancers (ELB). This helps in identifying potential issues and ensuring the smooth operation and high availability of applications.
inputsoutputs3.13- 3.13.1RKCYZEpNmGgj6MW1811qGet Unhealthy instances attached to Classic Load Balancers
3.13.1
Get Unhealthy instances attached to Classic Load Balancers
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task checks for instances which are OutOfService and are associated with a Classic Load Balancer.
inputsoutputsimport boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_unhealthy_instances(regions,elb_name=None): """ Fetch instances that are in "OutOfService" state for AWS Elastic Load Balancers (ELBs). Parameters: - elb_name (str, optional): Specific name of the Elastic Load Balancer to check. Default is None, which checks all ELBs. - regions (list): List of AWS regions to check. Returns: - list: A list of dictionaries containing details of unhealthy instances. """ result = [] # Loop through each specified region to check the health of instances under ELBs for reg in regions: try: # Initialize ELB client for the specified region elb_client = boto3.client('elb', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=reg) # Get a list of all load balancers in the current region elbs = elb_client.describe_load_balancers()["LoadBalancerDescriptions"] # Loop through each ELB to check the health of its instances for elb in elbs: # If a specific elb_name is provided, then skip the ELBs that don't match the name if elb_name and elb["LoadBalancerName"] != elb_name: continue # Fetch the health status of instances attached to the current ELB res = elb_client.describe_instance_health(LoadBalancerName=elb["LoadBalancerName"]) # Check each instance's health status for instance in res['InstanceStates']: # If the instance is "OutOfService", add its details to the result list if instance['State'] == "OutOfService": data_dict = { "instance_id": instance["InstanceId"], "region": reg, "load_balancer_name": elb["LoadBalancerName"] } result.append(data_dict) # Handle specific ClientError exceptions (e.g. permission issues, request limits) except ClientError as e: print(f"ClientError in region {reg}: {e}") # Handle general exceptions except Exception as e: print(f"An error occurred in region {reg}: {e}") return result # Specify the AWS regions to check for unhealthy instances #regions_to_check = ['us-east-1', 'us-west-2'] # Fetch the list of unhealthy instances unhealthy_instances = get_unhealthy_instances(regions) # Print the details of unhealthy instances, if any if unhealthy_instances: print("Unhealthy instances detected:") for instance in unhealthy_instances: print(f"Region: {instance['region']}, LoadBalancer: {instance['load_balancer_name']}, InstanceID: {instance['instance_id']}") else: print("No unhealthy instances found.")copied3.13.1 - 3.13.2oEDiBh3NgMnMbUV0r5r0Get Unhealthy targets associated to an ALB or NLB
3.13.2
Get Unhealthy targets associated to an ALB or NLB
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task retrieves and lists targets that are marked as 'unhealthy' and linked to AWS Application Load Balancers (ALB) or Network Load Balancers (NLB). This process helps in detecting non-performing targets to maintain optimal load distribution and service availability.
inputsoutputsimport boto3 from botocore.exceptions import ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_unhealthy_targets(regions, elb_arn=None): """ Fetch targets (instances) that are in "unhealthy" state for AWS Application Load Balancers (ALBs) and Network Load Balancers (NLBs). Parameters: - elb_arn (str, optional): Specific ARN of the Elastic Load Balancer to check. Default is None, which checks all ELBs. - regions (list): List of AWS regions to check. Returns: - list: A list of dictionaries containing details of unhealthy targets. """ # Initialize an empty list to store results result = [] # Loop through each specified region to check for unhealthy targets for reg in regions: try: # Create a new client for the ELBv2 service in the specified region elbv2_client = boto3.client('elbv2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=reg) # Retrieve the list of all ALBs and NLBs in the current region elbs = elbv2_client.describe_load_balancers()["LoadBalancers"] # Loop through each Load Balancer and inspect its targets for elb in elbs: # If a specific ELB ARN is provided, skip all other load balancers if elb_arn and elb["LoadBalancerArn"] != elb_arn: continue # Get all target groups associated with the current load balancer target_groups = elbv2_client.describe_target_groups(LoadBalancerArn=elb["LoadBalancerArn"])["TargetGroups"] # Check the health status of each target within the target group for tg in target_groups: health_descriptions = elbv2_client.describe_target_health(TargetGroupArn=tg["TargetGroupArn"])["TargetHealthDescriptions"] # If a target is found to be "unhealthy", store its details in the result list for desc in health_descriptions: if desc["TargetHealth"]["State"] == "unhealthy": data_dict = { "target_id": desc["Target"]["Id"], "region": reg, "load_balancer_arn": elb["LoadBalancerArn"], "target_group_arn": tg["TargetGroupArn"] } result.append(data_dict) # Catch any AWS-related exceptions and print an error message except ClientError as e: print(f"ClientError in region {reg}: {e}") # Catch any other general exceptions and print an error message except Exception as e: print(f"An error occurred in region {reg}: {e}") return result # Specify the AWS regions to check for unhealthy targets #regions_to_check = ['us-east-1', 'us-west-2'] # Retrieve and print the details of any found unhealthy targets unhealthy_targets = get_unhealthy_targets(regions) if unhealthy_targets: print("Unhealthy targets detected:") for target in unhealthy_targets: print(f"Region: {target['region']}\nLoadBalancer ARN: {target['load_balancer_arn']}\nTargetGroup ARN: {target['target_group_arn']}\nTarget ID: {target['target_id']}\n") else: print("No unhealthy targets found.")copied3.13.2
- 3.14jn3yQRa6UfiTPCS9eTN8Delete AWS ELBs with no targets or instances
3.14
Delete AWS ELBs with no targets or instances
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook helps in identifying and removing Amazon Elastic Load Balancers (ELBs) that do not have any associated target groups or instances. ELBs play a crucial role in distributing traffic across instances, and if they are no longer serving a purpose due to the absence of targets or instances, it's recommended to remove them to optimize resources and reduce unnecessary costs. This process involves identifying such ELBs across specified AWS regions, displaying their details, and then, if applicable, deleting them to maintain an efficient and streamlined AWS environment.
inputsoutputs3.14- 3.14.1ftprxh1hQkCCSd4vzUGAFind AWS ELBs with No Targets or Instances
3.14.1
Find AWS ELBs with No Targets or Instances
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task identifies AWS Elastic Load Balancers (ELBs) that have no associated targets or instances. Such ELBs may indicate unused resources, leading to unnecessary costs. Checking and managing these can optimize AWS expenses.
inputs