Sign in
agent:

Daily AWS Costs by top_n_instances using Athena

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

Focus on the highest cost-generating instances by querying AWS Cost and Usage Report (CUR) data with Athena to identify and analyze the top-spending instances each day.

import boto3
import time
from botocore.exceptions import ClientError, BotoCoreError

# NOTE(review): `_get_creds`, `cred_label`, `context`, `bucket_name`,
# `last_n_days` and `top_n_instances` are not defined in this script;
# presumably they are injected by the hosting runtime / upstream tasks.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

# Region used for every Athena client created by this script.
ATHENA_REGION = "us-east-1"


def _make_athena_client():
    """Return an Athena client built from the module-level credentials."""
    return boto3.client(
        'athena',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=ATHENA_REGION,
    )


def run_athena_query(query, database, s3_output):
    """Start an Athena query and return its execution id.

    Args:
        query: SQL text to execute.
        database: Athena database used as the query execution context.
        s3_output: S3 URI where Athena writes the query results.

    Returns:
        The query execution id, or None if the query could not be started.
    """
    athena_client = _make_athena_client()
    try:
        response = athena_client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={'Database': database},
            ResultConfiguration={'OutputLocation': s3_output},
        )
    except (ClientError, BotoCoreError) as e:
        print(f"Failed to execute query: {e}")
        return None

    query_execution_id = response['QueryExecutionId']
    print(f"Query execution started, ID: {query_execution_id}")
    return query_execution_id


def check_query_status(athena_client, query_execution_id, poll_interval=5,
                       max_wait_seconds=None):
    """Poll Athena until the query reaches a terminal state.

    Args:
        athena_client: boto3 Athena client used for polling.
        query_execution_id: Id returned by `run_athena_query`.
        poll_interval: Seconds to sleep between polls.
        max_wait_seconds: Optional upper bound on the total wait. None
            (the default) polls until Athena reports a terminal state.

    Returns:
        True if the query succeeded; False if it failed, was cancelled,
        the status call raised, or the optional wait bound was exceeded.
    """
    deadline = None
    if max_wait_seconds is not None:
        deadline = time.monotonic() + max_wait_seconds

    while True:
        try:
            response = athena_client.get_query_execution(
                QueryExecutionId=query_execution_id
            )
        except (ClientError, BotoCoreError) as e:
            print(f"Error checking query status: {e}")
            return False

        status_info = response['QueryExecution']['Status']
        state = status_info['State']
        if state == 'SUCCEEDED':
            print(f"Query {query_execution_id} succeeded.")
            return True
        if state in ('FAILED', 'CANCELLED'):
            print(f"Query {query_execution_id} failed or was cancelled.")
            # Surface Athena's explanation so failures are diagnosable.
            reason = status_info.get('StateChangeReason')
            if reason:
                print(f"Reason: {reason}")
            return False

        if deadline is not None and time.monotonic() >= deadline:
            print(
                f"Timed out after {max_wait_seconds} seconds waiting for "
                f"query {query_execution_id}."
            )
            return False
        time.sleep(poll_interval)


def get_query_results(athena_client, query_execution_id):
    """Fetch all result rows of a finished query.

    Every result page is read, so large result sets are not truncated to
    the first page. NULL cells (which carry no 'VarCharValue' key) are
    returned as None instead of raising KeyError.

    Returns:
        (header, results): the header as a list of column names and the
        data rows as lists of strings/None. ([], []) if Athena returned
        no rows at all, (None, None) if the API call failed.
    """
    rows = []
    try:
        paginator = athena_client.get_paginator('get_query_results')
        for page in paginator.paginate(QueryExecutionId=query_execution_id):
            rows.extend(page['ResultSet']['Rows'])
    except (ClientError, BotoCoreError) as e:
        print(f"Error retrieving query results: {e}")
        return None, None

    if not rows:
        return [], []

    # The first row of the first page holds the column names.
    header = [col.get('VarCharValue') for col in rows[0]['Data']]
    results = [
        [col.get('VarCharValue') for col in row['Data']]
        for row in rows[1:]
    ]
    return header, results


def visualize_data(dates, costs_by_instance, last_n_days, top_n_instances):
    """Print the series and add one line trace per instance to `context.plot`.

    Args:
        dates: x values shared by every trace.
        costs_by_instance: Mapping of instance label -> list of costs,
            aligned with `dates`.
        last_n_days: Number of days covered, used in trace names and title.
        top_n_instances: Ranking cut-off, used in the plot title.
    """
    print("x values (dates):", dates)
    for instance, costs in costs_by_instance.items():
        print(f"y values (costs) for {instance}:", costs)
        context.plot.add_trace(
            name=f'{instance} (Last {last_n_days} Days)',
            xpts=dates,
            ypts=costs,
            tracetype="lines",
        )
    context.plot.xlabel = 'Date'
    context.plot.ylabel = 'Cost ($)'
    context.plot.title = (
        f'Top {top_n_instances} EC2 Instances Daily Costs '
        f'(Last {last_n_days} Days excluding last 2 days)'
    )


def main(last_n_days, top_n_instances):
    """Query daily costs of the top EC2 instances and plot them.

    Args:
        last_n_days: Number of days to report on; the most recent 2 days
            are excluded by the query window.
        top_n_instances: How many top-ranked instances to keep per day.

    Raises:
        ValueError / TypeError: if either argument cannot be converted
            to an integer.
    """
    # Coerce to int so string inputs work and only numbers ever reach the
    # SQL text.
    last_n_days = int(last_n_days)
    top_n_instances = int(top_n_instances)
    window_start_offset = last_n_days + 2

    query = f"""
    WITH ranked_instances AS (
        SELECT
            DATE(line_item_usage_start_date) AS usage_date,
            line_item_resource_id AS instance_id,
            line_item_product_code AS product_code,
            product_instance_type AS instance_type,
            SUM(line_item_unblended_cost) AS daily_cost,
            RANK() OVER (
                PARTITION BY DATE(line_item_usage_start_date)
                ORDER BY SUM(line_item_unblended_cost) DESC
            ) AS rank
        FROM my_cur_report_athena
        WHERE line_item_product_code = 'AmazonEC2'
            AND line_item_usage_start_date >= DATE_ADD('day', -{window_start_offset}, CURRENT_DATE)
            AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE)
            AND product_instance_type IS NOT NULL
            AND product_instance_type != ''
        GROUP BY
            DATE(line_item_usage_start_date),
            line_item_resource_id,
            line_item_product_code,
            product_instance_type
    )
    SELECT usage_date, instance_id, product_code, instance_type, daily_cost
    FROM ranked_instances
    WHERE rank <= {top_n_instances}
    ORDER BY usage_date, daily_cost DESC;
    """

    database = 'athenacurcfn_my_c_u_r_report_athena'
    # `bucket_name` is to be dynamically received from an upstream task, e.g.
    # 'dagknows-cur-logging-bucket-athena-query-results-188379622596', giving
    # 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/'.
    prefix_path = 'dev_query_results'
    s3_output = f"s3://{bucket_name}/{prefix_path}/"

    query_execution_id = run_athena_query(query, database, s3_output)
    if not query_execution_id:
        print("Query execution failed. Exiting.")
        return

    athena_client = _make_athena_client()
    if not check_query_status(athena_client, query_execution_id):
        print("Query did not succeed. No results to show.")
        return

    header, results = get_query_results(athena_client, query_execution_id)
    if not results:
        print("No results to show.")
        return

    # Unique dates in first-seen order (the query orders rows by usage_date).
    dates = list(dict.fromkeys(row[0] for row in results))

    costs_by_instance = {}
    for date, instance_id, _product_code, instance_type, cost in results:
        instance_key = f"{instance_type} ({instance_id})"
        if instance_key not in costs_by_instance:
            # Days on which the instance is not in the top N plot as 0.
            costs_by_instance[instance_key] = dict.fromkeys(dates, 0)
        # A NULL cost cell arrives as None; treat it as zero cost.
        costs_by_instance[instance_key][date] = (
            float(cost) if cost is not None else 0.0
        )

    # Convert the per-date dicts to lists aligned with `dates` for plotting.
    final_costs_by_instance = {
        key: [per_day[d] for d in dates]
        for key, per_day in costs_by_instance.items()
    }
    visualize_data(dates, final_costs_by_instance, last_n_days, top_n_instances)


# last_n_days = 7  # To be passed as an input parameter
# top_n_instances = 3  # To be passed as an input parameter
main(last_n_days, top_n_instances)
copied