Sign in

Daily AWS Costs using Athena

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

Analyze daily AWS spending by querying the CUR data stored in S3 with Athena, providing insights into overall cost trends and spikes.

import boto3 import time from botocore.exceptions import ClientError, BotoCoreError creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def run_athena_query(query, database, s3_output): athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") try: response = athena_client.start_query_execution( QueryString=query, QueryExecutionContext={'Database': database}, ResultConfiguration={'OutputLocation': s3_output} ) query_execution_id = response['QueryExecutionId'] print(f"Query execution started, ID: {query_execution_id}") return query_execution_id except (ClientError, BotoCoreError) as e: print(f"Failed to execute query: {e}") return None def check_query_status(athena_client, query_execution_id): while True: try: response = athena_client.get_query_execution(QueryExecutionId=query_execution_id) status = response['QueryExecution']['Status']['State'] if status == 'SUCCEEDED': print(f"Query {query_execution_id} succeeded.") return True elif status in ['FAILED', 'CANCELLED']: print(f"Query {query_execution_id} failed or was cancelled.") return False time.sleep(5) except (ClientError, BotoCoreError) as e: print(f"Error checking query status: {e}") return False def get_query_results(athena_client, query_execution_id): try: response = athena_client.get_query_results(QueryExecutionId=query_execution_id) result_data = response['ResultSet']['Rows'] header = [col['VarCharValue'] for col in result_data[0]['Data']] results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]] return header, results except (ClientError, BotoCoreError) as e: print(f"Error retrieving query results: {e}") return None, None def visualize_data(dates, costs, last_n_days): x = dates # Dates are now strings y = costs # Corresponding daily costs print("x values (dates):", x) print("y values (costs):", y) context.plot.xlabel = 'Date' context.plot.ylabel = 'Cost ($)' context.plot.title = f'Daily AWS Costs (Last {last_n_days} Days)' context.plot.add_trace(name=f'Daily AWS Costs (Last {last_n_days} Days)', xpts=x, ypts=y, tracetype="lines") # last_n_days = 7 # Passes as an input parameter query = f""" SELECT DATE(line_item_usage_start_date) AS usage_date, SUM(line_item_unblended_cost) AS total_cost FROM my_cur_report_athena WHERE line_item_usage_start_date >= DATE_ADD('day', -{last_n_days + 2}, CURRENT_DATE) AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE) GROUP BY DATE(line_item_usage_start_date) ORDER BY usage_date; """ database = 'athenacurcfn_my_c_u_r_report_athena' #s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/' #bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task prefix_path = 'dev_query_results' s3_output = f"s3://{bucket_name}/{prefix_path}/" query_execution_id = run_athena_query(query, database, s3_output) if query_execution_id: athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") if check_query_status(athena_client, query_execution_id): header, results = get_query_results(athena_client, query_execution_id) if results: dates = [row[0] for row in results] costs = [float(row[1]) for row in results] print("x values (dates):", dates) print("y values (costs):", costs) visualize_data(dates, costs, last_n_days) else: print("No results to show.") else: print("Query did not succeed. No results to show.") else: print("Query execution failed. Exiting.")
copied