agent: |
dBIssYjV0DdxoufdUtePDaily AWS Costs by top_n_instances using athena
Daily AWS Costs by top_n_instances using athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
Focus on the highest cost-generating instances, querying CUR data with Athena to identify and analyze the top spending instances daily.
inputs
outputs
import boto3
import time
from botocore.exceptions import ClientError, BotoCoreError
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']
def run_athena_query(query, database, s3_output):
athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1")
try:
response = athena_client.start_query_execution(
QueryString=query,
QueryExecutionContext={'Database': database},
ResultConfiguration={'OutputLocation': s3_output}
)
query_execution_id = response['QueryExecutionId']
print(f"Query execution started, ID: {query_execution_id}")
return query_execution_id
except (ClientError, BotoCoreError) as e:
print(f"Failed to execute query: {e}")
return None
def check_query_status(athena_client, query_execution_id):
while True:
try:
response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
status = response['QueryExecution']['Status']['State']
if status == 'SUCCEEDED':
print(f"Query {query_execution_id} succeeded.")
return True
elif status in ['FAILED', 'CANCELLED']:
print(f"Query {query_execution_id} failed or was cancelled.")
return False
time.sleep(5)
except (ClientError, BotoCoreError) as e:
print(f"Error checking query status: {e}")
return False
def get_query_results(athena_client, query_execution_id):
try:
response = athena_client.get_query_results(QueryExecutionId=query_execution_id)
result_data = response['ResultSet']['Rows']
header = [col['VarCharValue'] for col in result_data[0]['Data']]
results = [[col['VarCharValue'] for col in row['Data']] for row in result_data[1:]]
return header, results
except (ClientError, BotoCoreError) as e:
print(f"Error retrieving query results: {e}")
return None, None
def visualize_data(dates, costs_by_instance, last_n_days, top_n_instances):
print("x values (dates):", dates)
for instance, costs in costs_by_instance.items():
print(f"y values (costs) for {instance}:", costs)
context.plot.add_trace(name=f'{instance} (Last {last_n_days} Days)', xpts=dates, ypts=costs, tracetype="lines")
context.plot.xlabel = 'Date'
context.plot.ylabel = 'Cost ($)'
context.plot.title = f'Top {top_n_instances} EC2 Instances Daily Costs (Last {last_n_days} Days excluding last 2 days)'
def main(last_n_days, top_n_instances):
query = f"""
WITH ranked_instances AS (
SELECT
DATE(line_item_usage_start_date) AS usage_date,
line_item_resource_id AS instance_id,
line_item_product_code AS product_code,
product_instance_type AS instance_type,
SUM(line_item_unblended_cost) AS daily_cost,
RANK() OVER (PARTITION BY DATE(line_item_usage_start_date) ORDER BY SUM(line_item_unblended_cost) DESC) AS rank
FROM
my_cur_report_athena
WHERE
line_item_product_code = 'AmazonEC2'
AND line_item_usage_start_date >= DATE_ADD('day', -{last_n_days + 2}, CURRENT_DATE)
AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE)
AND product_instance_type IS NOT NULL
AND product_instance_type != ''
GROUP BY
DATE(line_item_usage_start_date),
line_item_resource_id,
line_item_product_code,
product_instance_type
)
SELECT
usage_date,
instance_id,
product_code,
instance_type,
daily_cost
FROM
ranked_instances
WHERE
rank <= {top_n_instances}
ORDER BY
usage_date,
daily_cost DESC;
"""
database = 'athenacurcfn_my_c_u_r_report_athena'
#s3_output = 's3://dagknows-cur-logging-bucket-athena-query-results-188379622596/dev_query_results/'
#bucket_name = 'dagknows-cur-logging-bucket-athena-query-results-188379622596' # To be dynamically received from upstream task
prefix_path = 'dev_query_results'
s3_output = f"s3://{bucket_name}/{prefix_path}/"
query_execution_id = run_athena_query(query, database, s3_output)
if query_execution_id:
athena_client = boto3.client('athena', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1")
if check_query_status(athena_client, query_execution_id):
header, results = get_query_results(athena_client, query_execution_id)
if results:
dates = []
costs_by_instance = {}
all_dates = set(row[0] for row in results)
for row in results:
date, instance_id, product_code, instance_type, cost = row
if date not in dates:
dates.append(date)
instance_key = f"{instance_type} ({instance_id})"
if instance_key not in costs_by_instance:
costs_by_instance[instance_key] = {d: 0 for d in all_dates}
costs_by_instance[instance_key][date] = float(cost)
# Convert costs_by_instance to lists for plotting
final_costs_by_instance = {k: [v[d] for d in dates] for k, v in costs_by_instance.items()}
visualize_data(dates, final_costs_by_instance, last_n_days, top_n_instances)
else:
print("No results to show.")
else:
print("Query did not succeed. No results to show.")
else:
print("Query execution failed. Exiting.")
# last_n_days = 7 # To be passed as an input parameter
# top_n_instances = 3 # To be passed as an input parameter
main(last_n_days, top_n_instances)
copied