Sign in
agent:

AWS Costs per Linked Account using Athena

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

Segment and analyze costs for each linked account under a consolidated billing setup, using Athena to query AWS Cost and Usage Report (CUR) data, to clarify how spending is distributed across accounts.

import time

import boto3
from botocore.exceptions import BotoCoreError, ClientError

# Purpose: run an Athena query over CUR data that sums unblended cost per
# usage account, then plot the per-account totals as a pie chart.
#
# NOTE(review): `_get_creds`, `cred_label`, `context`, `last_n_days` and
# `bucket_name` are not defined in this script; they are presumably injected
# by the hosting runtime / upstream tasks -- confirm before running elsewhere.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

# Region used for every Athena client created by this script.
AWS_REGION = "us-east-1"


def _new_athena_client():
    """Build an Athena client from the module-level access/secret keys."""
    return boto3.client(
        'athena',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=AWS_REGION,
    )


def run_athena_query(query, database, s3_output, athena_client=None):
    """Start an Athena query and return its execution ID.

    Args:
        query: SQL text to execute.
        database: Athena database used as the query execution context.
        s3_output: S3 URI where Athena writes the query results.
        athena_client: Optional existing Athena client; a new one is created
            when omitted.

    Returns:
        The query execution ID, or None if the query could not be started.
    """
    if athena_client is None:
        athena_client = _new_athena_client()

    try:
        response = athena_client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={'Database': database},
            ResultConfiguration={'OutputLocation': s3_output},
        )
    except (ClientError, BotoCoreError) as e:
        print(f"Failed to execute query: {e}")
        return None

    query_execution_id = response['QueryExecutionId']
    print(f"Query execution started, ID: {query_execution_id}")
    return query_execution_id


def check_query_status(athena_client, query_execution_id, poll_interval=5):
    """Poll a query until it reaches a terminal state.

    Args:
        athena_client: Athena client used for polling.
        query_execution_id: ID returned when the query was started.
        poll_interval: Seconds to sleep between status checks.

    Returns:
        True if the query succeeded; False if it failed, was cancelled, or
        the status call itself raised a boto error.
    """
    while True:
        try:
            response = athena_client.get_query_execution(
                QueryExecutionId=query_execution_id
            )
        except (ClientError, BotoCoreError) as e:
            print(f"Error checking query status: {e}")
            return False

        status_info = response['QueryExecution']['Status']
        status = status_info['State']
        if status == 'SUCCEEDED':
            print(f"Query {query_execution_id} succeeded.")
            return True
        if status in ('FAILED', 'CANCELLED'):
            print(f"Query {query_execution_id} failed or was cancelled.")
            # Surface the service-provided explanation when there is one.
            reason = status_info.get('StateChangeReason')
            if reason:
                print(f"Reason: {reason}")
            return False

        # Any other state: wait and poll again.
        time.sleep(poll_interval)


def get_query_results(athena_client, query_execution_id):
    """Fetch all result rows of a finished query.

    All result pages are collected, not just the first one. The first row is
    treated as the header. A cell without a 'VarCharValue' key is returned as
    None (Athena appears to omit the key for NULL values -- see the
    GetQueryResults API reference).

    Args:
        athena_client: Athena client used to read the results.
        query_execution_id: ID of the completed query.

    Returns:
        A (header, rows) tuple of lists of strings/None; ([], []) when the
        result set has no rows at all; (None, None) on a boto error.
    """
    rows = []
    try:
        paginator = athena_client.get_paginator('get_query_results')
        for page in paginator.paginate(QueryExecutionId=query_execution_id):
            rows.extend(page['ResultSet']['Rows'])
    except (ClientError, BotoCoreError) as e:
        print(f"Error retrieving query results: {e}")
        return None, None

    if not rows:
        return [], []

    header = [col.get('VarCharValue') for col in rows[0]['Data']]
    results = [
        [col.get('VarCharValue') for col in row['Data']]
        for row in rows[1:]
    ]
    return header, results


def visualize_data(accounts, costs):
    """Plot cost per account as a pie chart on `context.plot`.

    When exactly one account is present the chart is skipped and a message is
    printed instead.

    Args:
        accounts: Account IDs, used as the x points / pie labels.
        costs: Cost values aligned with `accounts`.
    """
    if len(accounts) == 1:
        print("Only one account found. Skipping chart.")
    else:
        print("x values (accounts):", accounts)
        print("y values (costs):", costs)
        context.plot.xlabel = 'AWS Account ID'
        context.plot.ylabel = 'Cost ($)'
        context.plot.title = 'Cost by AWS Account ID'
        context.plot.add_trace(
            name="Cost by AWS Account ID",
            xpts=accounts,
            ypts=costs,
            tracetype="pie",
        )
        print("Analysis complete.")


# last_n_days = 7  # To be passed as an input parameter

# Compute the window offset in Python from an integer so the generated SQL is
# always a single signed literal (a raw "-{value}" would turn a negative
# input into "--", which starts a SQL comment).
window_start_offset = -(int(last_n_days) + 2)

# The window ends two days before the current date.
# NOTE(review): presumably this skips days whose CUR data is still
# incomplete -- confirm.
query = f"""
SELECT
    line_item_usage_account_id AS account_id,
    SUM(line_item_unblended_cost) AS total_cost
FROM my_cur_report_athena
WHERE line_item_usage_start_date >= DATE_ADD('day', {window_start_offset}, CURRENT_DATE)
  AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE)
GROUP BY line_item_usage_account_id
ORDER BY total_cost DESC;
"""

database = 'athenacurcfn_my_c_u_r_report_athena'
#s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/'
#bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task
prefix_path = 'dev_query_results'
s3_output = f"s3://{bucket_name}/{prefix_path}/"

# One client is shared by the start, poll and fetch steps.
athena_client = _new_athena_client()
query_execution_id = run_athena_query(query, database, s3_output, athena_client)

if query_execution_id:
    if check_query_status(athena_client, query_execution_id):
        header, results = get_query_results(athena_client, query_execution_id)
        if results:
            accounts = [row[0] for row in results]
            # A missing (NULL) total is charted as zero cost.
            costs = [
                float(row[1]) if row[1] is not None else 0.0
                for row in results
            ]
            visualize_data(accounts, costs)
        else:
            print("No results to show.")
    else:
        print("Query did not succeed. No results to show.")
else:
    print("Query execution failed. Exiting.")
copied