agent: |
RG7Fx5TcHNgGVXL1Vgsq Daily AWS Costs by service using Athena
Daily AWS Costs by service using Athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
Break down daily AWS expenses by service to pinpoint where spending is occurring, using Athena to query detailed CUR data.
inputs
outputs
import boto3
import time
from botocore.exceptions import ClientError, BotoCoreError
# Look up the AWS key pair used by every Athena client created below.
# NOTE(review): `_get_creds` and `cred_label` are not defined in this file;
# presumably they are supplied by the hosting runtime -- confirm.
creds = _get_creds(cred_label)['creds']
# The access key id is stored under 'username', the secret key under 'password'.
access_key, secret_key = creds['username'], creds['password']
def run_athena_query(query, database, s3_output):
    """Submit ``query`` to Athena and return its execution ID.

    Args:
        query: SQL text to execute.
        database: Athena database used as the query execution context.
        s3_output: S3 location where Athena writes the query results.

    Returns:
        The ``QueryExecutionId`` of the started query, or ``None`` when the
        start call raises ``ClientError`` or ``BotoCoreError``.
    """
    client = boto3.client(
        'athena',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="us-east-1",
    )
    request = {
        'QueryString': query,
        'QueryExecutionContext': {'Database': database},
        'ResultConfiguration': {'OutputLocation': s3_output},
    }
    try:
        started = client.start_query_execution(**request)
        execution_id = started['QueryExecutionId']
    except (ClientError, BotoCoreError) as err:
        print(f"Failed to execute query: {err}")
        return None
    print(f"Query execution started, ID: {execution_id}")
    return execution_id
def check_query_status(athena_client, query_execution_id, poll_interval=5, max_wait=None):
    """Poll an Athena query execution until it reaches a terminal state.

    Args:
        athena_client: Client object exposing ``get_query_execution``.
        query_execution_id: ID of the query execution to watch.
        poll_interval: Seconds to sleep between status checks. Defaults to 5,
            the interval that was previously hard-coded.
        max_wait: Optional upper bound, in seconds, on the total time spent
            polling. ``None`` (the default) keeps polling until the query
            reaches a terminal state.

    Returns:
        ``True`` when the state is ``SUCCEEDED``. ``False`` when the state is
        ``FAILED`` or ``CANCELLED``, when the status call raises
        ``ClientError``/``BotoCoreError``, or when ``max_wait`` elapses first.
    """
    started_at = time.monotonic()
    while True:
        # Only the API call is guarded, so unrelated errors are not masked.
        try:
            response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        except (ClientError, BotoCoreError) as e:
            print(f"Error checking query status: {e}")
            return False

        status_info = response['QueryExecution']['Status']
        state = status_info['State']
        if state == 'SUCCEEDED':
            print(f"Query {query_execution_id} succeeded.")
            return True
        if state in ('FAILED', 'CANCELLED'):
            print(f"Query {query_execution_id} failed or was cancelled.")
            # Surface the service-provided reason when one is present.
            reason = status_info.get('StateChangeReason')
            if reason:
                print(f"Reason: {reason}")
            return False

        # Any other state means the query is still in flight.
        if max_wait is not None and time.monotonic() - started_at >= max_wait:
            print(f"Timed out after {max_wait} seconds waiting for query {query_execution_id}.")
            return False
        time.sleep(poll_interval)
def get_query_results(athena_client, query_execution_id):
    """Fetch all rows of a finished Athena query as plain string lists.

    Follows ``NextToken`` so results spanning several pages are returned in
    full. The first row of the combined result is used as the header.

    Args:
        athena_client: Client object exposing ``get_query_results``.
        query_execution_id: ID of the query execution to read.

    Returns:
        A ``(header, results)`` tuple. ``header`` is the list of values from
        the first row and ``results`` is a list of rows, each a list of cell
        values. A cell without ``VarCharValue`` (a NULL) is returned as
        ``None``. Returns ``([], [])`` when the query produced no rows and
        ``(None, None)`` when the call raises ``ClientError`` or
        ``BotoCoreError``.
    """
    rows = []
    request = {'QueryExecutionId': query_execution_id}
    try:
        while True:
            page = athena_client.get_query_results(**request)
            rows.extend(page['ResultSet']['Rows'])
            next_token = page.get('NextToken')
            if not next_token:
                break
            request['NextToken'] = next_token
    except (ClientError, BotoCoreError) as e:
        print(f"Error retrieving query results: {e}")
        return None, None

    if not rows:
        return [], []

    def cell_values(row):
        # NULL cells carry no 'VarCharValue' key, so use .get() to avoid KeyError.
        return [cell.get('VarCharValue') for cell in row['Data']]

    header = cell_values(rows[0])
    results = [cell_values(row) for row in rows[1:]]
    return header, results
def visualize_data(dates, costs_by_service, last_n_days):
    """Print the plotted values and add one line trace per service.

    Args:
        dates: x values shared by every trace.
        costs_by_service: Mapping of service name to its list of y values.
        last_n_days: Number of days, used only in trace names and the title.

    NOTE(review): `context` is not defined in this file; presumably it is
    supplied by the hosting runtime -- confirm.
    """
    print("x values (dates):", dates)
    for service_name in costs_by_service:
        series = costs_by_service[service_name]
        print(f"y values (costs) for {service_name}:", series)
        trace_label = f'{service_name} (Last {last_n_days} Days)'
        context.plot.add_trace(name=trace_label, xpts=dates, ypts=series, tracetype="lines")
    context.plot.xlabel = 'Date'
    context.plot.ylabel = 'Cost ($)'
    context.plot.title = f'Daily AWS Costs by service (Last {last_n_days} Days excluding last 2 days)'
def _build_cost_query(last_n_days, top_n_services):
    """Return the Athena SQL for daily cost of the top-spending services."""
    # The window ends two days before today, so it starts last_n_days + 2 back.
    window_start_offset = last_n_days + 2
    return f"""
    WITH service_costs AS (
        SELECT
            DATE(line_item_usage_start_date) AS usage_date,
            line_item_product_code AS service,
            SUM(line_item_unblended_cost) AS daily_cost
        FROM
            my_cur_report_athena
        WHERE
            line_item_usage_start_date >= DATE_ADD('day', -{window_start_offset}, CURRENT_DATE)
            AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE)
        GROUP BY
            DATE(line_item_usage_start_date),
            line_item_product_code
    ),
    top_services AS (
        SELECT
            service
        FROM
            service_costs
        GROUP BY
            service
        ORDER BY
            SUM(daily_cost) DESC
        LIMIT {top_n_services}
    )
    SELECT
        sc.usage_date,
        sc.service,
        sc.daily_cost
    FROM
        service_costs sc
    JOIN
        top_services ts
    ON
        sc.service = ts.service
    ORDER BY
        sc.usage_date,
        sc.service;
    """


def _pivot_costs_by_service(rows):
    """Turn (date, service, cost) rows into date-aligned per-service series."""
    # dict.fromkeys de-duplicates while keeping first-seen (query) order.
    dates = list(dict.fromkeys(row[0] for row in rows))
    cost_by_day = {}
    for date, service, cost in rows:
        # Tolerate a NULL/empty cost cell by counting it as zero.
        cost_by_day.setdefault(service, {})[date] = float(cost) if cost else 0.0
    # A service with no row for a date gets 0.0 there, so every series has
    # exactly one value per entry in `dates`.
    costs_by_service = {
        service: [per_day.get(date, 0.0) for date in dates]
        for service, per_day in cost_by_day.items()
    }
    return dates, costs_by_service


def main(last_n_days, top_n_services):
    """Query daily costs for the top services via Athena and plot them.

    Runs the cost query, waits for it to finish, reads the rows and passes
    date-aligned per-service cost series to ``visualize_data``. Prints a
    message and returns early when the query cannot be started, does not
    succeed, or yields no rows.

    Args:
        last_n_days: Length of the reporting window in days; the window ends
            two days before the current date. Coerced with ``int()``.
        top_n_services: Number of highest-cost services to include. Coerced
            with ``int()``.

    Raises:
        ValueError: If either argument cannot be converted to an integer.
    """
    # Both values are interpolated into SQL text; forcing them to int keeps
    # anything other than a number out of the query.
    last_n_days = int(last_n_days)
    top_n_services = int(top_n_services)

    query = _build_cost_query(last_n_days, top_n_services)
    database = 'athenacurcfn_my_c_u_r_report_athena'
    # `bucket_name` is not defined in this file; it is expected to be
    # supplied dynamically by an upstream task.
    prefix_path = 'dev_query_results'
    s3_output = f"s3://{bucket_name}/{prefix_path}/"

    query_execution_id = run_athena_query(query, database, s3_output)
    if not query_execution_id:
        print("Query execution failed. Exiting.")
        return

    athena_client = boto3.client(
        'athena',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="us-east-1",
    )
    if not check_query_status(athena_client, query_execution_id):
        print("Query did not succeed. No results to show.")
        return

    _header, results = get_query_results(athena_client, query_execution_id)
    if not results:
        print("No results to show.")
        return

    dates, costs_by_service = _pivot_costs_by_service(results)
    visualize_data(dates, costs_by_service, last_n_days)
# Script entry point. `last_n_days` and `top_n_services` are not assigned in
# this file; both are to be passed in as input parameters.
# Example values: last_n_days = 7, top_n_services = 5
main(last_n_days, top_n_services)
copied