agent: |
KL71A6cT6b12qKVHZfrAAWS Costs per Linked Account using Athena
AWS Costs per Linked Account using Athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
Segment and analyze costs for each linked account under a consolidated billing setup using Athena to query CUR data, providing clarity on spending distribution across accounts.
inputs
outputs
import boto3
import time
from botocore.exceptions import ClientError, BotoCoreError
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']
def run_athena_query(query, database, s3_output):
athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1")
try:
response = athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={'Database': database},
ResultConfiguration={'OutputLocation': s3_output}
)
query_execution_id = response['QueryExecutionId']
print(f"Query execution started, ID: {query_execution_id}")
return query_execution_id
except (ClientError, BotoCoreError) as e:
print(f"Failed to execute query: {e}")
return None
def check_query_status(athena_client, query_execution_id):
while True:
try:
response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
status = response['QueryExecution']['Status']['State']
if status == 'SUCCEEDED':
print(f"Query {query_execution_id} succeeded.")
return True
elif status in ['FAILED', 'CANCELLED']:
print(f"Query {query_execution_id} failed or was cancelled.")
return False
time.sleep(5)
except (ClientError, BotoCoreError) as e:
print(f"Error checking query status: {e}")
return False
def get_query_results(athena_client, query_execution_id):
try:
response = athena_client.get_query_results(QueryExecutionId=query_execution_id)
result_data = response['ResultSet']['Rows']
header = [col['VarCharValue'] for col in result_data[0]['Data']]
results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]]
return header, results
except (ClientError, BotoCoreError) as e:
print(f"Error retrieving query results: {e}")
return None, None
def visualize_data(accounts, costs):
if len(accounts) == 1:
print("Only one account found. Skipping chart.")
else:
print("x values (accounts):", accounts)
print("y values (costs):", costs)
context.plot.xlabel = 'AWS Account ID'
context.plot.ylabel = 'Cost ($)'
context.plot.title = 'Cost by AWS Account ID'
context.plot.add_trace(name="Cost by AWS Account ID", xpts=accounts, ypts=costs, tracetype="pie")
print("Analysis complete.")
# last_n_days = 7 # To be passed as an input parameter
query = f"""
SELECT
line_item_usage_account_id AS account_id,
SUM(line_item_unblended_cost) AS total_cost
FROM
my_cur_report_athena
WHERE
line_item_usage_start_date >= DATE_ADD('day', -{last_n_days} - 2, CURRENT_DATE)
AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE)
GROUP BY
line_item_usage_account_id
ORDER BY
total_cost DESC;
"""
database = 'athenacurcfn_my_c_u_r_report_athena'
#s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/'
#bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task
prefix_path = 'dev_query_results'
s3_output = f"s3://{bucket_name}/{prefix_path}/"
query_execution_id = run_athena_query(query, database, s3_output)
if query_execution_id:
athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1")
if check_query_status(athena_client, query_execution_id):
header, results = get_query_results(athena_client, query_execution_id)
if results:
accounts = [row[0] for row in results]
costs = [float(row[1]) for row in results]
visualize_data(accounts, costs)
else:
print("No results to show.")
else:
print("Query did not succeed. No results to show.")
else:
print("Query execution failed. Exiting.")
copied