Sign in
agent:

Daily AWS Costs by service using Athena

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

Break down daily AWS expenses by service to pinpoint where spending is occurring, using Athena to query detailed CUR data.

import boto3 import time from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def run_athena_query(query, database, s3_output): athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': s3_output} ) query_execution_id = response['QueryExecutionId'] print(f"Query execution started, ID: {query_execution_id}") return query_execution_id except (ClientError, BotoCoreError) as e: print(f"Failed to execute query: {e}") return None def check_query_status(athena_client, query_execution_id): while True: try: response = athena_client.get_query_execution(QueryExecutionId=query_execution_id) status = response['QueryExecution']['Status']['State'] if status == 'SUCCEEDED': print(f"Query {query_execution_id} succeeded.") return True elif status in ['FAILED', 'CANCELLED']: print(f"Query {query_execution_id} failed or was cancelled.") return False time.sleep(5) except (ClientError, BotoCoreError) as e: print(f"Error checking query status: {e}") return False def get_query_results(athena_client, query_execution_id): try: response = athena_client.get_query_results(QueryExecutionId=query_execution_id) result_data = response['ResultSet']['Rows'] header = [col['VarCharValue'] for col in result_data[0]['Data']] results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]] return header, results except (ClientError, BotoCoreError) as e: print(f"Error retrieving query results: {e}") return None, None def visualize_data(dates, costs_by_service, last_n_days): print("x values (dates):", dates) for service, costs in costs_by_service.items(): print(f"y values (costs) for {service}:", costs) context.plot.add_trace(name=f'{service} (Last {last_n_days} Days)', xpts=dates, ypts=costs, tracetype="lines") context.plot.xlabel = 'Date' context.plot.ylabel = 'Cost ($)' context.plot.title = f'Daily AWS Costs by service (Last {last_n_days} Days excluding last 2 days)' def main(last_n_days, top_n_services): query = f""" WITH service_costs AS ( SELECT DATE(line_item_usage_start_date) AS usage_date, line_item_product_code AS service, SUM(line_item_unblended_cost) AS daily_cost FROM my_cur_report_athena WHERE line_item_usage_start_date >= DATE_ADD('day', -{last_n_days + 2}, CURRENT_DATE) AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE) GROUP BY DATE(line_item_usage_start_date), line_item_product_code ), top_services AS ( SELECT service FROM service_costs GROUP BY service ORDER BY SUM(daily_cost) DESC LIMIT {top_n_services} ) SELECT sc.usage_date, sc.service, sc.daily_cost FROM service_costs sc JOIN top_services ts ON sc.service = ts.service ORDER BY sc.usage_date, sc.service; """ database = 'athenacurcfn_my_c_u_r_report_athena' #s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/' #bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task prefix_path = 'dev_query_results' s3_output = f"s3://{bucket_name}/{prefix_path}/" query_execution_id = run_athena_query(query, database, s3_output) if query_execution_id: athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") if check_query_status(athena_client, query_execution_id): header, results = get_query_results(athena_client, query_execution_id) if results: dates = [] costs_by_service = {} for row in results: date, service, cost = row if date not in dates: dates.append(date) if service not in costs_by_service: costs_by_service[service] = [] costs_by_service[service].append(float(cost)) visualize_data(dates, costs_by_service, last_n_days) else: print("No results to show.") else: print("Query did not succeed. No results to show.") else: print("Query execution failed. Exiting.") # last_n_days = 7 # To be passed as an input parameter # top_n_services = 5 # To be passed as an input parameter main(last_n_days, top_n_services)
copied