Comprehensive AWS Cost Analysis Breakdown
This task provides a comprehensive analysis of AWS expenses through five key modules. It categorizes costs by AWS services, identifies primary cost drivers and more. The analysis charts expenses over time and breaks down costs by AWS account IDs to pinpoint major spenders aiding in efficient financial forecasting and planning.
- 1fSFO8HDNIY2YN1QGMiFkDaily AWS Costs using Athena
1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Analyze daily AWS spending by querying the CUR data stored in S3 with Athena, providing insights into overall cost trends and spikes.
inputsoutputsimport boto3 import time from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def run_athena_query(query, database, s3_output): athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': s3_output} ) query_execution_id = response['QueryExecutionId'] print(f"Query execution started, ID: {query_execution_id}") return query_execution_id except (ClientError, BotoCoreError) as e: print(f"Failed to execute query: {e}") return None def check_query_status(athena_client, query_execution_id): while True: try: response = athena_client.get_query_execution(QueryExecutionId=query_execution_id) status = response['QueryExecution']['Status']['State'] if status == 'SUCCEEDED': print(f"Query {query_execution_id} succeeded.") return True elif status in ['FAILED', 'CANCELLED']: print(f"Query {query_execution_id} failed or was cancelled.") return False time.sleep(5) except (ClientError, BotoCoreError) as e: print(f"Error checking query status: {e}") return False def get_query_results(athena_client, query_execution_id): try: response = athena_client.get_query_results(QueryExecutionId=query_execution_id) result_data = response['ResultSet']['Rows'] header = [col['VarCharValue'] for col in result_data[0]['Data']] results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]] return header, results except (ClientError, BotoCoreError) as e: print(f"Error retrieving query results: {e}") return None, None def visualize_data(dates, costs, last_n_days): x = dates # Dates are now strings y = costs # Corresponding daily costs print("x values (dates):", x) print("y values (costs):", y) context.plot.xlabel = 'Date' context.plot.ylabel = 'Cost ($)' context.plot.title = f'Daily AWS Costs (Last {last_n_days} Days)' context.plot.add_trace(name=f'Daily AWS Costs (Last {last_n_days} Days)', xpts=x, ypts=y, tracetype="lines") # last_n_days = 7 # Passes as an input parameter query = f""" SELECT DATE(line_item_usage_start_date) AS usage_date, SUM(line_item_unblended_cost) AS total_cost FROM my_cur_report_athena WHERE line_item_usage_start_date >= DATE_ADD('day', -{last_n_days + 2}, CURRENT_DATE) AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE) GROUP BY DATE(line_item_usage_start_date) ORDER BY usage_date; """ database = 'athenacurcfn_my_c_u_r_report_athena' #s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/' #bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task prefix_path = 'dev_query_results' s3_output = f"s3://{bucket_name}/{prefix_path}/" query_execution_id = run_athena_query(query, database, s3_output) if query_execution_id: athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") if check_query_status(athena_client, query_execution_id): header, results = get_query_results(athena_client, query_execution_id) if results: dates = [row[0] for row in results] costs = [float(row[1]) for row in results] print("x values (dates):", dates) print("y values (costs):", costs) visualize_data(dates, costs, last_n_days) else: print("No results to show.") else: print("Query did not succeed. No results to show.") else: print("Query execution failed. Exiting.")copied1 - 2RG7Fx5TcHNgGVXL1VgsqDaily AWS Costs by service using Athena
2
Daily AWS Costs by service using Athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Break down daily AWS expenses by service to pinpoint where spending is occurring, using Athena to query detailed CUR data.
inputsoutputsimport boto3 import time from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def run_athena_query(query, database, s3_output): athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': s3_output} ) query_execution_id = response['QueryExecutionId'] print(f"Query execution started, ID: {query_execution_id}") return query_execution_id except (ClientError, BotoCoreError) as e: print(f"Failed to execute query: {e}") return None def check_query_status(athena_client, query_execution_id): while True: try: response = athena_client.get_query_execution(QueryExecutionId=query_execution_id) status = response['QueryExecution']['Status']['State'] if status == 'SUCCEEDED': print(f"Query {query_execution_id} succeeded.") return True elif status in ['FAILED', 'CANCELLED']: print(f"Query {query_execution_id} failed or was cancelled.") return False time.sleep(5) except (ClientError, BotoCoreError) as e: print(f"Error checking query status: {e}") return False def get_query_results(athena_client, query_execution_id): try: response = athena_client.get_query_results(QueryExecutionId=query_execution_id) result_data = response['ResultSet']['Rows'] header = [col['VarCharValue'] for col in result_data[0]['Data']] results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]] return header, results except (ClientError, BotoCoreError) as e: print(f"Error retrieving query results: {e}") return None, None def visualize_data(dates, costs_by_service, last_n_days): print("x values (dates):", dates) for service, costs in costs_by_service.items(): print(f"y values (costs) for {service}:", costs) context.plot.add_trace(name=f'{service} (Last {last_n_days} Days)', xpts=dates, ypts=costs, tracetype="lines") context.plot.xlabel = 'Date' context.plot.ylabel = 'Cost ($)' context.plot.title = f'Daily AWS Costs by service (Last {last_n_days} Days excluding last 2 days)' def main(last_n_days, top_n_services): query = f""" WITH service_costs AS ( SELECT DATE(line_item_usage_start_date) AS usage_date, line_item_product_code AS service, SUM(line_item_unblended_cost) AS daily_cost FROM my_cur_report_athena WHERE line_item_usage_start_date >= DATE_ADD('day', -{last_n_days + 2}, CURRENT_DATE) AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE) GROUP BY DATE(line_item_usage_start_date), line_item_product_code ), top_services AS ( SELECT service FROM service_costs GROUP BY service ORDER BY SUM(daily_cost) DESC LIMIT {top_n_services} ) SELECT sc.usage_date, sc.service, sc.daily_cost FROM service_costs sc JOIN top_services ts ON sc.service = ts.service ORDER BY sc.usage_date, sc.service; """ database = 'athenacurcfn_my_c_u_r_report_athena' #s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/' #bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task prefix_path = 'dev_query_results' s3_output = f"s3://{bucket_name}/{prefix_path}/" query_execution_id = run_athena_query(query, database, s3_output) if query_execution_id: athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") if check_query_status(athena_client, query_execution_id): header, results = get_query_results(athena_client, query_execution_id) if results: dates = [] costs_by_service = {} for row in results: date, service, cost = row if date not in dates: dates.append(date) if service not in costs_by_service: costs_by_service[service] = [] costs_by_service[service].append(float(cost)) visualize_data(dates, costs_by_service, last_n_days) else: print("No results to show.") else: print("Query did not succeed. No results to show.") else: print("Query execution failed. Exiting.") # last_n_days = 7 # To be passed as an input parameter # top_n_services = 5 # To be passed as an input parameter main(last_n_days, top_n_services)copied2 - 3Q6uA91jnIEnSZHF6jzFPEC2 Cost Analysis
3
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.inputsoutputs3- 3.1dBIssYjV0DdxoufdUtePDaily AWS Costs by top_n_instances using athena
3.1
Daily AWS Costs by top_n_instances using athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Focus on the highest cost-generating instances, querying CUR data with Athena to identify and analyze the top spending instances daily.
inputsoutputsimport boto3 import time from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def run_athena_query(query, database, s3_output): athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': s3_output} ) query_execution_id = response['QueryExecutionId'] print(f"Query execution started, ID: {query_execution_id}") return query_execution_id except (ClientError, BotoCoreError) as e: print(f"Failed to execute query: {e}") return None def check_query_status(athena_client, query_execution_id): while True: try: response = athena_client.get_query_execution(QueryExecutionId=query_execution_id) status = response['QueryExecution']['Status']['State'] if status == 'SUCCEEDED': print(f"Query {query_execution_id} succeeded.") return True elif status in ['FAILED', 'CANCELLED']: print(f"Query {query_execution_id} failed or was cancelled.") return False time.sleep(5) except (ClientError, BotoCoreError) as e: print(f"Error checking query status: {e}") return False def get_query_results(athena_client, query_execution_id): try: response = athena_client.get_query_results(QueryExecutionId=query_execution_id) result_data = response['ResultSet']['Rows'] header = [col['VarCharValue'] for col in result_data[0]['Data']] results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]] return header, results except (ClientError, BotoCoreError) as e: print(f"Error retrieving query results: {e}") return None, None def visualize_data(dates, costs_by_instance, last_n_days, top_n_instances): print("x values (dates):", dates) for instance, costs in costs_by_instance.items(): print(f"y values (costs) for {instance}:", costs) context.plot.add_trace(name=f'{instance} (Last {last_n_days} Days)', xpts=dates, ypts=costs, tracetype="lines") context.plot.xlabel = 'Date' context.plot.ylabel = 'Cost ($)' context.plot.title = f'Top {top_n_instances} EC2 Instances Daily Costs (Last {last_n_days} Days excluding last 2 days)' def main(last_n_days, top_n_instances): query = f""" WITH ranked_instances AS ( SELECT DATE(line_item_usage_start_date) AS usage_date, line_item_resource_id AS instance_id, line_item_product_code AS product_code, product_instance_type AS instance_type, SUM(line_item_unblended_cost) AS daily_cost, RANK() OVER (PARTITION BY DATE(line_item_usage_start_date) ORDER BY SUM(line_item_unblended_cost) DESC) AS rank FROM my_cur_report_athena WHERE line_item_product_code = 'AmazonEC2' AND line_item_usage_start_date >= DATE_ADD('day', -{last_n_days + 2}, CURRENT_DATE) AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE) AND product_instance_type IS NOT NULL AND product_instance_type != '' GROUP BY DATE(line_item_usage_start_date), line_item_resource_id, line_item_product_code, product_instance_type ) SELECT usage_date, instance_id, product_code, instance_type, daily_cost FROM ranked_instances WHERE rank <= {top_n_instances} ORDER BY usage_date, daily_cost DESC; """ database = 'athenacurcfn_my_c_u_r_report_athena' #s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/' #bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task prefix_path = 'dev_query_results' s3_output = f"s3://{bucket_name}/{prefix_path}/" query_execution_id = run_athena_query(query, database, s3_output) if query_execution_id: athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") if check_query_status(athena_client, query_execution_id): header, results = get_query_results(athena_client, query_execution_id) if results: dates = [] costs_by_instance = {} all_dates = set(row[0] for row in results) for row in results: date, instance_id, product_code, instance_type, cost = row if date not in dates: dates.append(date) instance_key = f"{instance_type} ({instance_id})" if instance_key not in costs_by_instance: costs_by_instance[instance_key] = {d: 0 for d in all_dates} costs_by_instance[instance_key][date] = float(cost) # Convert costs_by_instance to lists for plotting final_costs_by_instance = {k: [v[d] for d in dates] for k, v in costs_by_instance.items()} visualize_data(dates, final_costs_by_instance, last_n_days, top_n_instances) else: print("No results to show.") else: print("Query did not succeed. No results to show.") else: print("Query execution failed. Exiting.") # last_n_days = 7 # To be passed as an input parameter # top_n_instances = 3 # To be passed as an input parameter main(last_n_days, top_n_instances)copied3.1 - 3.2YH2n1kJrTYWgmtIQyBr8AWS EC2 Usage Analysis
3.2
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook involves collecting data on EC2 instances, retrieving CPU utilization metrics from Amazon CloudWatch, and visually plotting this data to identify underutilized or overutilized instances. This task helps in recognizing potential cost-saving opportunities by rightsizing instances, either by downsizing underutilized instances to reduce costs or upsizing overutilized instances to improve performance.
inputsoutputsregion_name = None region_name_to_search_recommendations = Nonecopied3.2- 3.2.1l2WbsJzf9MvMxNsLlFq3Get all AWS EC2 instances
3.2.1
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Amazon Elastic Compute Cloud (EC2) is a service offered by Amazon Web Services (AWS) that provides resizable compute capacity in the cloud. Through Boto3's EC2 client, the describe_instances() method provides detailed information about each instance, including its ID, type, launch time, and current state. This capability assists users in effectively monitoring and managing their cloud resources.
inputsoutputsimport boto3 from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def list_all_regions(): ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name = 'us-east-1') return [region['RegionName'] for region in ec2.describe_regions()['Regions']] def list_ec2_instances(region=None): # If no region is provided, fetch instances from all regions regions = [region] if region else list_all_regions() # Create an empty list to store instance details instance_details = [] for region in regions: # Try initializing the Boto3 EC2 client for the specific region try: ec2_client = boto3.client('ec2', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=region) except (NoCredentialsError, PartialCredentialsError): print(f"Failed for {region}: No AWS credentials found or incomplete credentials provided.") continue except BotoCoreError as e: print(f"Failed for {region}: Error initializing the EC2 client due to BotoCore Error: {e}") continue except Exception as e: print(f"Failed for {region}: Unexpected error initializing the EC2 client: {e}") continue #print(f"Fetching EC2 instance details for region: {region}...") # Try to paginate through the EC2 instance responses for the specific region try: paginator = ec2_client.get_paginator('describe_instances') for page in paginator.paginate(): for reservation in page['Reservations']: for instance in reservation['Instances']: # Extract the desired attributes instance_id = instance['InstanceId'] instance_type = instance['InstanceType'] launch_time = instance['LaunchTime'] state = instance['State']['Name'] # Append the details to the list instance_details.append({ 'InstanceId': instance_id, 'InstanceType': instance_type, 'LaunchTime': launch_time, 'State': state, 'Region': region }) #print(f"Fetched all instance details for region: {region} successfully!") except ClientError as e: print(f"Failed for {region}: AWS Client Error while fetching EC2 instance details: {e}") except Exception as e: print(f"Failed for {region}: Unexpected error while fetching EC2 instance details: {e}") return instance_details def display_instance_details(data): # Initialize table with the desired structure and headers table = context.newtable() table.title = "EC2 Instance Details" table.num_cols = 5 # Number of columns according to headers table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names based on the new structure headers = ["Instance ID", "Instance Type", "Launch Time", "State", "Region"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Sort the instance data by launch time for better organization data.sort(key=lambda x: x["LaunchTime"], reverse=True) # Populate the table with instance data for row_num, instance in enumerate(data, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each instance values = [ instance["InstanceId"], instance["InstanceType"], instance["LaunchTime"].strftime('%Y-%m-%d %H:%M:%S'), # Format the datetime instance["State"], instance["Region"] ] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) # You can replace None with a specific region string like 'us-east-1' to get instances from a specific region # Hardcoded region_name for One time Execution Result region_name=None instances_list = list_ec2_instances(region_name) if instances_list: ''' print("\nEC2 Instance Details:") for instance in instances_list: print("-" * 50) # Separator line for key, value in instance.items(): print(f"{key}: {value}")''' display_instance_details(instances_list) else: print("No instances found or an error occurred.")copied3.2.1 - 3.2.2ez5YMQ9CcFXNzZMMfBE9Aggregate and Visualize Comprehensive EC2 CPU Utilization
3.2.2
Aggregate and Visualize Comprehensive EC2 CPU Utilization
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task aggregates CPU utilization data for running EC2 instances across an AWS account, computes the average CPU usage over a specified period, and plots the average to help in assessing overall resource efficiency.
inputsoutputsimport boto3 from datetime import datetime, timedelta last_n_days=30 # AWS Credentials creds = _get_creds(cred_label)['creds'] # Placeholder function to get AWS credentials access_key = creds['username'] secret_key = creds['password'] '''# Placeholder for instances_list instances_list = [ {'InstanceId': 'instance1', 'Region': 'us-east-1', 'State': 'running'}, {'InstanceId': 'instance2', 'Region': 'us-east-1', 'State': 'running'}, # Add more instances as needed ] ''' def fetch_cpu_utilization(instance_id, region, start_time, end_time): cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) metrics = cloudwatch.get_metric_statistics( Namespace='AWS/EC2', MetricName='CPUUtilization', Dimensions=[{'Name': 'InstanceId', 'Value': instance_id}], StartTime=start_time, EndTime=end_time, Period=3600, Statistics=['Average'] ) # Calculate average CPU utilization without NumPy data_points = metrics.get('Datapoints', []) if data_points: avg_cpu = sum(dp['Average'] for dp in data_points) / len(data_points) else: avg_cpu = 0 return avg_cpu def plot_cpu_utilization(instances_list, last_n_days=7): start_time = datetime.utcnow() - timedelta(days=last_n_days) end_time = datetime.utcnow() avg_utilizations = [] for instance in instances_list: if instance['State'] == 'running': avg_cpu = fetch_cpu_utilization(instance['InstanceId'], instance['Region'], start_time, end_time) avg_utilizations.append((instance['InstanceId'], avg_cpu)) # Sort instances by average CPU utilization and select top 3 and bottom 3 avg_utilizations.sort(key=lambda x: x[1], reverse=True) top_instances = avg_utilizations[:3] bottom_instances = avg_utilizations[-3:] # Prepare data for plotting instance_ids = [x[0] for x in top_instances + bottom_instances] utilizations = [x[1] for x in top_instances + bottom_instances] # Plotting context.plot.add_trace( name="CPU Utilization", xpts=instance_ids, ypts=utilizations, tracetype='bar' ) context.plot.xlabel = 'Instance ID' context.plot.ylabel = 'Average CPU Utilization (%)' context.plot.title = f'Top & Bottom 3 EC2 Instances by CPU Utilization (Last {last_n_days} Days)' plot_cpu_utilization(instances_list, last_n_days=30)copied3.2.2 - 3.2.3FNMZWFoMC9d9KCxXZKjuPlot Average CPU Utilization for all running AWS EC2 Instances
3.2.3
Plot Average CPU Utilization for all running AWS EC2 Instances
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task entails collecting CPU usage metrics from Amazon CloudWatch, calculating the average utilization, and visualizing this data. This task aids in identifying underutilized or overutilized instances, facilitating efficient resource management and cost optimization in AWS.
inputsoutputsimport boto3 from datetime import datetime, timedelta from botocore.exceptions import NoCredentialsError, PartialCredentialsError, BotoCoreError, ClientError, EndpointConnectionError, DataNotFoundError last_n_days=30 # AWS credentials creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] if locals().get('instances_list') is None: instances_list = [] # Function to fetch CPU utilization for a given instance def fetch_cpu_utilization(instance_id, region, start_time, end_time): try: cloudwatch = boto3.client('cloudwatch', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) response = cloudwatch.get_metric_data( MetricDataQueries=[ { 'Id': 'cpuUtilization', 'MetricStat': { 'Metric': { 'Namespace': 'AWS/EC2', 'MetricName': 'CPUUtilization', 'Dimensions': [{'Name': 'InstanceId', 'Value': instance_id}] }, 'Period': 3600, # one hour 'Stat': 'Average', }, 'ReturnData': True, }, ], StartTime=start_time, EndTime=end_time ) return response['MetricDataResults'][0]['Timestamps'], response['MetricDataResults'][0]['Values'] except Exception as e: print(f"Error getting CPU utilization for instance {instance_id}: {e}") return [], [] # Main plotting logic def plot_cpu_utilization(instances_list, lookback_days=last_n_days): end_time = datetime.utcnow() start_time = end_time - timedelta(days=lookback_days) # Filter running EC2 instances for instance in instances_list: if instance['State'] != 'running': continue timestamps, cpu_values = fetch_cpu_utilization(instance['InstanceId'], instance['Region'], start_time, end_time) # Check if data is available if timestamps: context.plot.add_trace( name=f"Instance {instance['InstanceId']}", xpts=timestamps, # x-axis points ypts=cpu_values, # y-axis points tracetype="line" ) # Set plot properties context.plot.xlabel = 'Date' context.plot.ylabel = 'Average CPU Utilization (%)' context.plot.title = f'CPU Utilization per EC2 Instance (Last {last_n_days} Days)' # Execute the plotting function plot_cpu_utilization(instances_list)copied3.2.3
- 3.3fgHYQK6ky4IDPP33V0itRightsizing Recommendations for AWS EC2 Instances using AWS Compute Optimizer
3.3
Rightsizing Recommendations for AWS EC2 Instances using AWS Compute Optimizer
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task utilizes AWS Compute Optimizer to fetch rightsizing recommendations for AWS EC2 instances, aiming to optimize instance sizes based on actual usage. It assesses whether instances are under-utilized or over-utilized, suggesting adjustments to enhance performance and reduce costs. By querying across specified or all regions, it allows for a comprehensive optimization strategy, ensuring resources are efficiently allocated and maximizes cost-effectiveness and performance across your AWS environment.
inputsoutputsregion_name_to_search_recommendations = Nonecopied3.3- 3.3.1O5FtOq5CNw0pQ0RzZs9iFetch Rightsizing Recommendations for AWS EC2 Instances
3.3.1
Fetch Rightsizing Recommendations for AWS EC2 Instances
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task retrieves AWS EC2 instance rightsizing recommendations using AWS Compute Optimizer, identifying cost-saving and performance-enhancing opportunities by analyzing usage patterns. It suggests optimal instance types or sizes, ensuring efficient resource utilization.
inputsoutputsimport json import boto3 from botocore.exceptions import BotoCoreError, ClientError from datetime import datetime creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Initialize boto3 clients #compute_optimizer_client = boto3.client('compute-optimizer', region_name='us-west-2') pricing_client = boto3.client('pricing',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name='us-east-1') def datetime_converter(o): if isinstance(o, datetime): return o.__str__() def get_price_for_instance(instance_type, region): # Mapping AWS region to the Pricing API format region_name_map = { "us-east-1": "US East (N. Virginia)", "us-east-2": "US East (Ohio)", "us-west-1": "US West (N. California)", "us-west-2": "US West (Oregon)", "af-south-1": "Africa (Cape Town)", "ap-east-1": "Asia Pacific (Hong Kong)", "ap-south-1": "Asia Pacific (Mumbai)", "ap-northeast-3": "Asia Pacific (Osaka)", "ap-northeast-2": "Asia Pacific (Seoul)", "ap-southeast-1": "Asia Pacific (Singapore)", "ap-southeast-2": "Asia Pacific (Sydney)", "ap-northeast-1": "Asia Pacific (Tokyo)", "ca-central-1": "Canada (Central)", "eu-central-1": "EU (Frankfurt)", "eu-west-1": "EU (Ireland)", "eu-west-2": "EU (London)", "eu-south-1": "EU (Milan)", "eu-west-3": "EU (Paris)", "eu-north-1": "EU (Stockholm)", "me-south-1": "Middle East (Bahrain)", "sa-east-1": "South America (São Paulo)"} region_name = region_name_map.get(region, region) # Default to using the region code if no mapping found try: response = pricing_client.get_products( ServiceCode='AmazonEC2', Filters=[ {'Type': 'TERM_MATCH', 'Field': 'instanceType', 'Value': instance_type}, {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': region_name}, {'Type': 'TERM_MATCH', 'Field': 'preInstalledSw', 'Value': 'NA'}, {'Type': 'TERM_MATCH', 'Field': 'operatingSystem', 'Value': 'Linux'}, {'Type': 'TERM_MATCH', 'Field': 'tenancy', 'Value': 'shared'}, {'Type': 'TERM_MATCH', 'Field': 'capacitystatus', 'Value': 'Used'}, ], MaxResults=1 ) price_info = json.loads(response['PriceList'][0]) price_dimensions = next(iter(price_info['terms']['OnDemand'].values()))['priceDimensions'] price_per_unit = next(iter(price_dimensions.values()))['pricePerUnit']['USD'] return float(price_per_unit) except Exception as e: print(f"Error fetching price for {instance_type} in {region}: {e}") return None def display_instance_recommendations(recommendations): # Initialize table with the desired structure and headers table = context.newtable() table.title = "EC2 Instance Rightsizing Recommendations" table.num_cols = 10 # Adjust the number of columns to match the number of attributes you want to display table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names based on the new structure headers = [ "Instance ID", "Instance Name", "Current Instance Type", "Region", "Findings", "Recommended Instance Type", "Migration Effort", "Savings Opportunity (%)", "Estimated Monthly Savings ($)", "Price Difference ($/hr)" ] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Populate the table with data from recommendations for row_num, recommendation in enumerate(recommendations, start=1): instance_id = recommendation['instanceArn'].split('/')[-1] instance_name = recommendation.get('instanceName', 'N/A') current_instance_type = recommendation.get('currentInstanceType', 'N/A') region = recommendation['instanceArn'].split(':')[3] findings = recommendation.get('finding', 'N/A') migration_effort = "N/A" savings_opportunity_percentage = "N/A" estimated_monthly_savings_value = "N/A" price_difference = "N/A" if recommendation.get('recommendationOptions'): option = recommendation['recommendationOptions'][0] recommended_instance_type = option.get('instanceType') migration_effort = option.get('migrationEffort', 'N/A') savings_opportunity_percentage = option.get('savingsOpportunity', {}).get('savingsOpportunityPercentage', 'N/A') estimated_monthly_savings_value = option.get('savingsOpportunity', {}).get('estimatedMonthlySavings', {}).get('value', 'N/A') current_price = get_price_for_instance(current_instance_type, region) recommended_price = get_price_for_instance(recommended_instance_type, region) price_difference = "N/A" if current_price is None or recommended_price is None else f"{current_price - recommended_price:.4f}" values = [ instance_id, instance_name, current_instance_type, region, findings, recommended_instance_type, migration_effort, f"{savings_opportunity_percentage}%", f"${estimated_monthly_savings_value}", f"${price_difference}/hr" ] table.num_rows += 1 # Add a row for each recommendation for col_num, value in enumerate(values): table.setval(row_num, col_num, value) def get_ec2_rightsizing_recommendations(region_name_to_search_recommendations=None): regions_to_search = [] if region_name_to_search_recommendations: regions_to_search.append(region_name_to_search_recommendations) else: # Fetch all regions if none specified ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') all_regions_response = ec2_client.describe_regions() regions_to_search = [region['RegionName'] for region in all_regions_response['Regions']] all_recommendations = [] for region in regions_to_search: try: # Initialize compute-optimizer client with the proper region local_compute_optimizer_client = boto3.client('compute-optimizer', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) next_token = None #page_counter = 1 # To count the number of pages fetched while True: if next_token: response = local_compute_optimizer_client.get_ec2_instance_recommendations(NextToken=next_token) else: response = local_compute_optimizer_client.get_ec2_instance_recommendations() recommendations = response.get('instanceRecommendations', []) if recommendations: all_recommendations.extend(recommendations) #print(f"Fetched {len(recommendations)} recommendations for page {page_counter}.") # Pagination - Check if there's a next page of recommendations next_token = response.get('NextToken') if not next_token: break # Exit loop if there's no more data to fetch #page_counter += 1 except ClientError as error: print(f"Client error in region {region}: {error}") except BotoCoreError as error: print(f"BotoCore error in region {region}: {error}") return all_recommendations def process_recommendations(region_name_to_search_recommendations=None): # Fetch recommendations once, using the provided region or searching all regions. recommendations = get_ec2_rightsizing_recommendations(region_name_to_search_recommendations) display_instance_recommendations(recommendations) # table printing line # If no recommendations were found after searching, exit the function. if not recommendations: print("No recommendations found. Please check if the region is correct or if there are any permissions issues.") return data_for_plotting = [] # Iterate through the fetched recommendations for processing. for recommendation in recommendations: # Extract details from each recommendation as before... instance_id = recommendation['instanceArn'].split('/')[-1] instance_name = recommendation.get('instanceName', 'N/A') findings = recommendation.get('finding', 'N/A') finding_reasons = ", ".join(recommendation.get('findingReasonCodes', [])) instance_state = recommendation.get('instanceState', 'N/A') current_instance_type = recommendation.get('currentInstanceType', 'N/A') tags = json.dumps(recommendation.get('tags', []), default=datetime_converter) account_id = recommendation['instanceArn'].split(':')[4] region = recommendation['instanceArn'].split(':')[3] # Print details for each recommendation... print(f"Instance ID: {instance_id}") print(f"Instance Name: {instance_name}") print(f"Findings: {findings}") print(f"Finding Reasons: {finding_reasons}") print(f"Instance State: {instance_state}") print(f"Current Instance Type: {current_instance_type}") print(f"Tags: {tags}") print(f"Account ID: {account_id}") print(f"Region: {region}") print("-" * 50) for option in recommendation['recommendationOptions']: recommended_instance_type = option.get('instanceType') migration_effort = option.get('migrationEffort', 'N/A') savings_opportunity_percentage = option.get('savingsOpportunity', {}).get('savingsOpportunityPercentage', 'N/A') estimated_monthly_savings_value = option.get('savingsOpportunity', {}).get('estimatedMonthlySavings', {}).get('value', 'N/A') current_price = get_price_for_instance(current_instance_type, region) recommended_price = get_price_for_instance(recommended_instance_type, region) price_difference = "N/A" if current_price is None or recommended_price is None else current_price - recommended_price data_for_plotting.append({ "instance_id": instance_id, "instance_name": instance_name, "estimated_monthly_savings_value": estimated_monthly_savings_value }) print(f"\tRecommended Instance Type: {recommended_instance_type}") print(f"\tMigration Effort: {migration_effort}") print(f"\tSavings Opportunity (%): {savings_opportunity_percentage}") print(f"\tEstimated Monthly Savings: USD {estimated_monthly_savings_value}") print(f"\tCurrent Price: {current_price if current_price is not None else 'N/A'} USD per hour") print(f"\tRecommended Price: {recommended_price if recommended_price is not None else 'N/A'} USD per hour") print(f"\tPrice Difference: {price_difference} USD per hour") print("-" * 25) return data_for_plotting #region_name_to_search_recommendations = None data_for_plotting = process_recommendations(region_name_to_search_recommendations)copied3.3.1 - 3.3.2yL3S7RVMgqHLjI2r0KezPlot Savings based on AWS EC2 Rightsizing Recommendations
3.3.2
Plot Savings based on AWS EC2 Rightsizing Recommendations
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task generates a bar chart visualizing AWS EC2 rightsizing savings, with instance names on the X-axis and different recommendations distinguished by instance ID and rank in the legend.
inputsoutputs# print(json.dumps(data_for_plotting,indent=4)) # Aggregate savings values for each instance, keeping track of both instance ID and name savings_by_instance = {} for entry in data_for_plotting: instance_id = entry["instance_id"] instance_name = entry["instance_name"] # Keep instance name for labeling purposes savings_value = entry["estimated_monthly_savings_value"] # Check if the instance ID is already a key in the dictionary if instance_id not in savings_by_instance: savings_by_instance[instance_id] = {'name': instance_name, 'savings': [savings_value]} else: savings_by_instance[instance_id]['savings'].append(savings_value) # Plotting context.plot.xlabel = "Instance Name" context.plot.ylabel = "Estimated Monthly Savings ($)" context.plot.title = "Estimated Monthly Savings by Instance" # Add a trace for each instance's savings values for instance_id, info in savings_by_instance.items(): instance_name = info['name'] # Retrieve instance name for labeling savings_values = info['savings'] for i, savings_value in enumerate(savings_values): trace_name = f"({instance_id})-Rec{i+1}" context.plot.add_trace(name=trace_name, xpts=[instance_name], ypts=[savings_value], tracetype='bar')copied3.3.2
- 4gCIOraWGVXTTGxS8xauyEBS Cost Analysis
4
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.inputsoutputs4- 4.1zSjBBLqgDzGa6tgqOTkFRightsizing Recommendations for AWS EBS Volumes using AWS Compute Optimizer
4.1
Rightsizing Recommendations for AWS EBS Volumes using AWS Compute Optimizer
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This runbook aids in enhancing storage efficiency by providing insights based on usage analysis. This service suggests optimal EBS volume configurations, including type, size, and IOPS, to align with performance needs and cost savings.
inputsoutputsregion_name_to_search_recommendations = Nonecopied4.1- 4.1.1p7esvHfNzElz5LFA2wqgFetch Rightsizing Recommendations for AWS EBS Volumes
4.1.1
Fetch Rightsizing Recommendations for AWS EBS Volumes
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task involves analyzing EBS volume usage to offer configuration changes for cost efficiency and performance improvement, based on historical data analysis.
inputsoutputsimport boto3 import json from datetime import datetime, timezone creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] # Define the pricing_client at the beginning of your script to ensure it's available globally pricing_client = boto3.client('pricing', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') def get_ebs_volume_recommendations(region_name=None): """ Fetch EBS volume recommendations from AWS Compute Optimizer for a specific region or all regions. """ if region_name: regions = [region_name] else: # Initialize a client for the EC2 service to fetch all regions ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') regions_response = ec2_client.describe_regions() regions = [region['RegionName'] for region in regions_response['Regions']] recommendations = [] for region in regions: try: # Initialize Compute Optimizer client for each region compute_optimizer_client = boto3.client('compute-optimizer', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) next_token = None while True: params = {} if next_token: params['NextToken'] = next_token response = compute_optimizer_client.get_ebs_volume_recommendations(**params) recommendations.extend(response.get('volumeRecommendations', [])) next_token = response.get('NextToken', None) if not next_token: break except Exception as e: print(f"Error fetching EBS volume recommendations for region {region}: {e}") return recommendations def get_ebs_price(volume_type, size_gb, region): # Mapping AWS region to the Pricing API format region_name_map = { "us-east-1": "US East (N. Virginia)", "us-east-2": "US East (Ohio)", "us-west-1": "US West (N. California)", "us-west-2": "US West (Oregon)", "af-south-1": "Africa (Cape Town)", "ap-east-1": "Asia Pacific (Hong Kong)", "ap-south-1": "Asia Pacific (Mumbai)", "ap-northeast-3": "Asia Pacific (Osaka)", "ap-northeast-2": "Asia Pacific (Seoul)", "ap-southeast-1": "Asia Pacific (Singapore)", "ap-southeast-2": "Asia Pacific (Sydney)", "ap-northeast-1": "Asia Pacific (Tokyo)", "ca-central-1": "Canada (Central)", "eu-central-1": "EU (Frankfurt)", "eu-west-1": "EU (Ireland)", "eu-west-2": "EU (London)", "eu-south-1": "EU (Milan)", "eu-west-3": "EU (Paris)", "eu-north-1": "EU (Stockholm)", "me-south-1": "Middle East (Bahrain)", "sa-east-1": "South America (São Paulo)"} region_name = region_name_map.get(region, region) #print(f"searching for region {region_name}") # for debugging try: price_response = pricing_client.get_products( ServiceCode='AmazonEC2', Filters=[ {'Type': 'TERM_MATCH', 'Field': 'volumeApiName', 'Value': volume_type}, # Adjusted to 'volumeApiName' {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': region_name}, {'Type': 'TERM_MATCH', 'Field': 'productFamily', 'Value': 'Storage'} ], MaxResults=1 # Increased MaxResults to ensure broader search results ) #print(price_response) # for debugging # Ensure there's at least one price listed if price_response['PriceList']: # Assuming the first price item's details are representative price_data = json.loads(price_response['PriceList'][0]) terms = price_data.get('terms', {}).get('OnDemand', {}) if terms: price_dimensions = next(iter(terms.values()))['priceDimensions'] price_per_gb = next(iter(price_dimensions.values()))['pricePerUnit']['USD'] # Calculate total price based on the volume size total_price = float(price_per_gb) * size_gb return total_price else: print("No pricing terms found.") return None except Exception as e: print(f"Error fetching price for EBS volume: {e}") return None def process_recommendations(recommendations): for recommendation in recommendations: volume_arn = recommendation['volumeArn'] region = volume_arn.split(':')[3] # for pricing api query current_configuration = recommendation['currentConfiguration'] finding = recommendation['finding'] finding_reasons_codes = recommendation.get('findingReasonCodes', []) print(f"Volume ARN: {volume_arn}") print(f"Region: {region}") print(f"Current Configuration: {json.dumps(current_configuration, indent=2)}") print(f"Finding: {finding} {' | '.join(finding_reasons_codes) if finding_reasons_codes else ''}") if 'volumeRecommendationOptions' in recommendation: for option in recommendation['volumeRecommendationOptions']: configuration = option['configuration'] performance_risk = option.get('performanceRisk', 'N/A') rank = option['rank'] volume_type = configuration['volumeType'] size_gb = configuration['volumeSize'] current_price = get_ebs_price(current_configuration['volumeType'], current_configuration['volumeSize'], region) recommended_price = get_ebs_price(volume_type, size_gb, region) print(f"\tRecommended Configuration: {json.dumps(configuration, indent=4)}") print(f"\tPerformance Risk: {performance_risk}") print(f"\tRank: {rank}") print(f"\tCurrent Price: ${current_price} per month") print(f"\tRecommended Price: ${recommended_price} per month") # Calculate and print savings if current_price and recommended_price: savings = current_price - recommended_price print(f"\tEstimated Monthly Savings: ${savings:.2f}") print("-" * 60) else: print("\tNo recommendation options provided.") print("-" * 60) # Example usage #region_name_to_search_recommendations = 'us-east-1' # Set to None for all regions recommendations = get_ebs_volume_recommendations(region_name_to_search_recommendations) if recommendations: print("Processing Recommendations") process_recommendations(recommendations) else: print("No EBS volume recommendations available.")copied4.1.1
- 5CIEgWqVasfoBbwMdGzW4RDS Cost Analysis
5
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.inputsoutputs5- 5.1muwkw9vPhOYo6qgLyRfiAggregate and Visualize Comprehensive RDS CPU Utilization
5.1
Aggregate and Visualize Comprehensive RDS CPU Utilization
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.This task aggregates CPU utilization data for running RDS instances across an AWS account, computes the average CPU usage over a specified period, and plots the average to help in assessing overall resource efficiency.
inputsoutputsimport boto3 from datetime import datetime, timedelta region_name=None # None when you want to run the script for all regions #last_n_days = 30 # AWS Credentials - replace with your method to retrieve AWS credentials creds = _get_creds(cred_label)['creds'] # Placeholder function access_key = creds['username'] secret_key = creds['password'] def get_aws_regions(): """Get a list of all AWS regions.""" ec2 = boto3.client('ec2',aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name='us-east-1') regions = ec2.describe_regions() return [region['RegionName'] for region in regions['Regions']] def fetch_rds_instances(region): """Fetch all RDS instances in a specific region.""" rds = boto3.client('rds',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) paginator = rds.get_paginator('describe_db_instances') page_iterator = paginator.paginate() rds_instances = [] for page in page_iterator: for instance in page['DBInstances']: rds_instances.append({ 'DBInstanceIdentifier': instance['DBInstanceIdentifier'], 'Region': region, }) return rds_instances def fetch_cpu_utilization(db_instance_identifier, region, start_time, end_time): """Fetch the average CPU utilization for an RDS instance.""" cloudwatch = boto3.client('cloudwatch',aws_access_key_id=access_key,aws_secret_access_key=secret_key, region_name=region) metrics = cloudwatch.get_metric_statistics( Namespace='AWS/RDS', MetricName='CPUUtilization', Dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': db_instance_identifier}], StartTime=start_time, EndTime=end_time, Period=3600, Statistics=['Average'] ) data_points = metrics.get('Datapoints', []) if data_points: avg_cpu = sum(dp['Average'] for dp in data_points) / len(data_points) else: avg_cpu = 0 return avg_cpu def plot_cpu_utilization(region_name=None, last_n_days=7): """Plot CPU utilization for RDS instances.""" start_time = datetime.utcnow() - timedelta(days=last_n_days) end_time = datetime.utcnow() regions = [region_name] if region_name else get_aws_regions() for region in regions: rds_instances = fetch_rds_instances(region) avg_utilizations = [] for instance in rds_instances: avg_cpu = fetch_cpu_utilization(instance['DBInstanceIdentifier'], region, start_time, end_time) avg_utilizations.append((instance['DBInstanceIdentifier'], avg_cpu)) avg_utilizations.sort(key=lambda x: x[1], reverse=True) top_instances = avg_utilizations[:3] bottom_instances = avg_utilizations[-3:] instance_ids = [x[0] for x in top_instances + bottom_instances] utilizations = [x[1] for x in top_instances + bottom_instances] # Plotting context.plot.add_trace( name="CPU Utilization", xpts=instance_ids, ypts=utilizations, tracetype='bar' ) context.plot.xlabel = 'Instance ID' context.plot.ylabel = 'Average CPU Utilization (%)' context.plot.title = f'Top & Bottom 3 RDS Instances by CPU Utilization (Last {last_n_days} Days)' # Example usage plot_cpu_utilization(region_name, last_n_days) # For all regions # plot_cpu_utilization(region_name='us-east-1', last_n_days=30) # For a specific regioncopied5.1 - 5.2gR6d2ayH2Q6SdugCwNmrDaily AWS RDS Costs using Athena
5.2
Daily AWS RDS Costs using Athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Specifically examine daily costs associated with AWS RDS, utilizing Athena to query CUR data for detailed spending analysis on database services.
inputsoutputsimport boto3 import time from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def run_athena_query(query, database, s3_output): athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': s3_output} ) query_execution_id = response['QueryExecutionId'] print(f"Query execution started, ID: {query_execution_id}") return query_execution_id except (ClientError, BotoCoreError) as e: print(f"Failed to execute query: {e}") return None def check_query_status(athena_client, query_execution_id): while True: try: response = athena_client.get_query_execution(QueryExecutionId=query_execution_id) status = response['QueryExecution']['Status']['State'] if status == 'SUCCEEDED': print(f"Query {query_execution_id} succeeded.") return True elif status in ['FAILED', 'CANCELLED']: print(f"Query {query_execution_id} failed or was cancelled.") return False time.sleep(5) except (ClientError, BotoCoreError) as e: print(f"Error checking query status: {e}") return False def get_query_results(athena_client, query_execution_id): try: response = athena_client.get_query_results(QueryExecutionId=query_execution_id) result_data = response['ResultSet']['Rows'] header = [col['VarCharValue'] for col in result_data[0]['Data']] results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]] return header, results except (ClientError, BotoCoreError) as e: print(f"Error retrieving query results: {e}") return None, None def visualize_data(dates, costs, last_n_days): print("x values (dates):", dates) print("y values (costs):", costs) context.plot.add_trace(name=f'Amazon RDS Costs (Last {last_n_days} Days)', xpts=dates, ypts=costs, tracetype="lines") context.plot.xlabel = 'Date' context.plot.ylabel = 'Cost ($)' context.plot.title = f'Daily Amazon RDS Costs (Last {last_n_days} Days excluding last 2 days)' # last_n_days = 7 # To be passed as an inpur parameter query = f""" SELECT DATE(line_item_usage_start_date) AS usage_date, line_item_product_code AS product_code, product_database_engine AS database_engine, SUM(line_item_unblended_cost) AS daily_cost FROM my_cur_report_athena WHERE line_item_product_code = 'AmazonRDS' AND line_item_usage_start_date >= DATE_ADD('day', -{last_n_days} - 2, CURRENT_DATE) AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE) AND product_database_engine IS NOT NULL AND product_database_engine != '' GROUP BY DATE(line_item_usage_start_date), line_item_product_code, product_database_engine ORDER BY usage_date, product_code; """ database = 'athenacurcfn_my_c_u_r_report_athena' #s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/' #bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task prefix_path = 'dev_query_results' s3_output = f"s3://{bucket_name}/{prefix_path}/" query_execution_id = run_athena_query(query, database, s3_output) if query_execution_id: athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") if check_query_status(athena_client, query_execution_id): header, results = get_query_results(athena_client, query_execution_id) if results: dates = [row[0] for row in results] costs = [float(row[3]) for row in results] visualize_data(dates, costs, last_n_days) else: print("No results to show.") else: print("Query did not succeed. No results to show.") else: print("Query execution failed. Exiting.")copied5.2
- 6KL71A6cT6b12qKVHZfrAAWS Costs per Linked Account using Athena
6
AWS Costs per Linked Account using Athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.Segment and analyze costs for each linked account under a consolidated billing setup using Athena to query CUR data, providing clarity on spending distribution across accounts.
inputsoutputsimport boto3 import time from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def run_athena_query(query, database, s3_output): athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': s3_output} ) query_execution_id = response['QueryExecutionId'] print(f"Query execution started, ID: {query_execution_id}") return query_execution_id except (ClientError, BotoCoreError) as e: print(f"Failed to execute query: {e}") return None def check_query_status(athena_client, query_execution_id): while True: try: response = athena_client.get_query_execution(QueryExecutionId=query_execution_id) status = response['QueryExecution']['Status']['State'] if status == 'SUCCEEDED': print(f"Query {query_execution_id} succeeded.") return True elif status in ['FAILED', 'CANCELLED']: print(f"Query {query_execution_id} failed or was cancelled.") return False time.sleep(5) except (ClientError, BotoCoreError) as e: print(f"Error checking query status: {e}") return False def get_query_results(athena_client, query_execution_id): try: response = athena_client.get_query_results(QueryExecutionId=query_execution_id) result_data = response['ResultSet']['Rows'] header = [col['VarCharValue'] for col in result_data[0]['Data']] results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]] return header, results except (ClientError, BotoCoreError) as e: print(f"Error retrieving query results: {e}") return None, None def visualize_data(accounts, costs): if len(accounts) == 1: print("Only one account found. Skipping chart.") else: print("x values (accounts):", accounts) print("y values (costs):", costs) context.plot.xlabel = 'AWS Account ID' context.plot.ylabel = 'Cost ($)' context.plot.title = 'Cost by AWS Account ID' context.plot.add_trace(name="Cost by AWS Account ID", xpts=accounts, ypts=costs, tracetype="pie") print("Analysis complete.") # last_n_days = 7 # To be passed as an input parameter query = f""" SELECT line_item_usage_account_id AS account_id, SUM(line_item_unblended_cost) AS total_cost FROM my_cur_report_athena WHERE line_item_usage_start_date >= DATE_ADD('day', -{last_n_days} - 2, CURRENT_DATE) AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE) GROUP BY line_item_usage_account_id ORDER BY total_cost DESC; """ database = 'athenacurcfn_my_c_u_r_report_athena' #s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/' #bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task prefix_path = 'dev_query_results' s3_output = f"s3://{bucket_name}/{prefix_path}/" query_execution_id = run_athena_query(query, database, s3_output) if query_execution_id: athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") if check_query_status(athena_client, query_execution_id): header, results = get_query_results(athena_client, query_execution_id) if results: accounts = [row[0] for row in results] costs = [float(row[1]) for row in results] visualize_data(accounts, costs) else: print("No results to show.") else: print("Query did not succeed. No results to show.") else: print("Query execution failed. Exiting.")copied6