fSFO8HDNIY2YN1QGMiFk Daily AWS Costs using Athena
Daily AWS Costs using Athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
Analyze daily AWS spending by querying the CUR data stored in S3 with Athena, providing insights into overall cost trends and spikes.
inputs
outputs
import boto3
import time
from botocore.exceptions import ClientError, BotoCoreError
# Look up the stored credential entry for `cred_label`. `_get_creds` and
# `cred_label` are provided by the surrounding runtime, not by this file.
creds = _get_creds(cred_label)['creds']
# The entry's 'username' / 'password' fields are later passed to boto3 as the
# AWS access key ID and secret access key respectively.
access_key, secret_key = creds['username'], creds['password']
def run_athena_query(query, database, s3_output):
    """Submit *query* to Athena and return its execution ID.

    An Athena client is created for region ``us-east-1`` using the
    module-level ``access_key`` / ``secret_key``.

    Args:
        query: SQL text to execute.
        database: Athena database used as the query execution context.
        s3_output: S3 URI where Athena writes the query results.

    Returns:
        The ``QueryExecutionId`` string, or ``None`` if starting the query
        raised a ``ClientError`` / ``BotoCoreError``.
    """
    client = boto3.client(
        'athena',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="us-east-1",
    )
    request = {
        'QueryString': query,
        'QueryExecutionContext': {'Database': database},
        'ResultConfiguration': {'OutputLocation': s3_output},
    }
    try:
        execution_id = client.start_query_execution(**request)['QueryExecutionId']
    except (ClientError, BotoCoreError) as err:
        print(f"Failed to execute query: {err}")
        return None
    print(f"Query execution started, ID: {execution_id}")
    return execution_id
def check_query_status(athena_client, query_execution_id, poll_interval=5, timeout=None):
    """Poll Athena until the query reaches a terminal state.

    Args:
        athena_client: Object exposing ``get_query_execution`` (a boto3
            Athena client).
        query_execution_id: ID returned when the query was started.
        poll_interval: Seconds to sleep between polls. Defaults to 5.
        timeout: Maximum number of seconds to keep polling, or ``None``
            (the default) to wait indefinitely.

    Returns:
        ``True`` if the query state is ``SUCCEEDED``. ``False`` if it is
        ``FAILED`` or ``CANCELLED``, if the status call raised a
        ``ClientError`` / ``BotoCoreError``, or if *timeout* elapsed first.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        # Only the API call can raise the boto errors handled here.
        try:
            response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        except (ClientError, BotoCoreError) as e:
            print(f"Error checking query status: {e}")
            return False

        status_info = response['QueryExecution']['Status']
        state = status_info['State']
        if state == 'SUCCEEDED':
            print(f"Query {query_execution_id} succeeded.")
            return True
        if state in ('FAILED', 'CANCELLED'):
            print(f"Query {query_execution_id} failed or was cancelled.")
            # Surface Athena's explanation when it provides one.
            reason = status_info.get('StateChangeReason')
            if reason:
                print(f"Reason: {reason}")
            return False

        # Any other state means the query is still in flight: keep polling
        # unless the caller's time budget is exhausted.
        if deadline is not None and time.monotonic() >= deadline:
            print(f"Timed out after {timeout} seconds waiting for query {query_execution_id}.")
            return False
        time.sleep(poll_interval)
def get_query_results(athena_client, query_execution_id):
    """Fetch every result row of a finished Athena query.

    Follows ``NextToken`` so result sets spanning several pages are returned
    in full. The first row of the result set is treated as the header.

    Args:
        athena_client: Object exposing ``get_query_results`` (a boto3
            Athena client).
        query_execution_id: ID of the query whose results are wanted.

    Returns:
        A ``(header, results)`` tuple: ``header`` is the list of column
        names and ``results`` is a list of rows, each a list of cell
        strings. Cells with no ``VarCharValue`` (NULLs) are returned as
        ``None``. Returns ``([], [])`` when the result set has no rows and
        ``(None, None)`` if a ``ClientError`` / ``BotoCoreError`` is raised.
    """
    rows = []
    request = {'QueryExecutionId': query_execution_id}
    try:
        while True:
            response = athena_client.get_query_results(**request)
            rows.extend(response['ResultSet']['Rows'])
            next_token = response.get('NextToken')
            if not next_token:
                break
            request['NextToken'] = next_token
    except (ClientError, BotoCoreError) as e:
        print(f"Error retrieving query results: {e}")
        return None, None

    if not rows:
        return [], []

    # NULL cells come back without a 'VarCharValue' key, so use .get().
    header = [col.get('VarCharValue') for col in rows[0]['Data']]
    results = [[col.get('VarCharValue') for col in row['Data']] for row in rows[1:]]
    return header, results
def visualize_data(dates, costs, last_n_days):
    """Plot daily costs as a line trace on ``context.plot``.

    Args:
        dates: X-axis values (date strings).
        costs: Y-axis values (the corresponding daily costs).
        last_n_days: Number of days covered; used in the title and trace name.
    """
    print("x values (dates):", dates)
    print("y values (costs):", costs)

    # The chart title and the trace name share the same label.
    label = f'Daily AWS Costs (Last {last_n_days} Days)'
    context.plot.xlabel = 'Date'
    context.plot.ylabel = 'Cost ($)'
    context.plot.title = label
    context.plot.add_trace(name=label, xpts=dates, ypts=costs, tracetype="lines")
# `last_n_days` arrives as an input parameter (e.g. last_n_days = 7).
# The query window covers `last_n_days` days and ends two days before today
# (presumably to skip days whose CUR data is still incomplete -- confirm).
query = f"""
SELECT
DATE(line_item_usage_start_date) AS usage_date,
SUM(line_item_unblended_cost) AS total_cost
FROM
my_cur_report_athena
WHERE
line_item_usage_start_date >= DATE_ADD('day', -{last_n_days + 2}, CURRENT_DATE)
AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE)
GROUP BY
DATE(line_item_usage_start_date)
ORDER BY
usage_date;
"""

database = 'athenacurcfn_my_c_u_r_report_athena'

# `bucket_name` is received dynamically from an upstream task; Athena writes
# its query output under s3://<bucket_name>/<prefix_path>/.
prefix_path = 'dev_query_results'
s3_output = f"s3://{bucket_name}/{prefix_path}/"

query_execution_id = run_athena_query(query, database, s3_output)

if not query_execution_id:
    print("Query execution failed. Exiting.")
else:
    athena_client = boto3.client(
        'athena',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="us-east-1",
    )
    if not check_query_status(athena_client, query_execution_id):
        print("Query did not succeed. No results to show.")
    else:
        header, results = get_query_results(athena_client, query_execution_id)
        if not results:
            print("No results to show.")
        else:
            # Column 0 is the usage date, column 1 the summed cost.
            dates = [row[0] for row in results]
            costs = [float(row[1]) for row in results]
            print("x values (dates):", dates)
            print("y values (costs):", costs)
            visualize_data(dates, costs, last_n_days)
copied