agent: |
gR6d2ayH2Q6SdugCwNmr Daily AWS RDS Costs using Athena
Daily AWS RDS Costs using Athena
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
Specifically examine daily costs associated with AWS RDS, utilizing Athena to query CUR data for detailed spending analysis on database services.
inputs
outputs
import boto3
import time
from botocore.exceptions import ClientError, BotoCoreError
# Look up the AWS credential pair used by every Athena client below.
# The stored 'username' field is used as the access key ID and the 'password'
# field as the secret access key.
# NOTE(review): `_get_creds` and `cred_label` are not defined in this file;
# presumably they are injected by the runtime -- confirm.
creds = _get_creds(cred_label)['creds']
access_key, secret_key = creds['username'], creds['password']
def run_athena_query(query, database, s3_output):
    """Submit *query* to Athena and return its execution ID.

    Args:
        query: SQL text to run.
        database: Athena database used as the query execution context.
        s3_output: S3 URI where Athena writes the query results.

    Returns:
        The ``QueryExecutionId`` string, or ``None`` when the submission
        raised a ``ClientError`` / ``BotoCoreError`` (the error is printed).
    """
    # The client is built from the module-level credentials and is pinned to
    # us-east-1.
    client = boto3.client(
        'athena',
        region_name="us-east-1",
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    request = {
        'QueryString': query,
        'QueryExecutionContext': {'Database': database},
        'ResultConfiguration': {'OutputLocation': s3_output},
    }
    try:
        execution_id = client.start_query_execution(**request)['QueryExecutionId']
    except (ClientError, BotoCoreError) as err:
        print(f"Failed to execute query: {err}")
        return None
    print(f"Query execution started, ID: {execution_id}")
    return execution_id
def check_query_status(athena_client, query_execution_id, poll_interval=5, max_wait_seconds=None):
    """Poll Athena until the query reaches a terminal state.

    Args:
        athena_client: Object exposing ``get_query_execution`` (an Athena
            client).
        query_execution_id: ID of the query execution to watch.
        poll_interval: Seconds to sleep between polls. Defaults to 5, the
            value that used to be hard-coded.
        max_wait_seconds: Optional upper bound on the total time spent
            polling. ``None`` (the default) keeps the previous behaviour of
            polling until the query finishes.

    Returns:
        ``True`` if the query state is ``SUCCEEDED``. ``False`` if it is
        ``FAILED`` or ``CANCELLED``, if ``max_wait_seconds`` elapsed first, or
        if the status call raised ``ClientError`` / ``BotoCoreError``.
    """
    started = time.monotonic()
    while True:
        try:
            response = athena_client.get_query_execution(QueryExecutionId=query_execution_id)
        except (ClientError, BotoCoreError) as e:
            print(f"Error checking query status: {e}")
            return False

        status_info = response['QueryExecution']['Status']
        state = status_info['State']
        if state == 'SUCCEEDED':
            print(f"Query {query_execution_id} succeeded.")
            return True
        if state in ('FAILED', 'CANCELLED'):
            print(f"Query {query_execution_id} failed or was cancelled.")
            # Surface Athena's explanation when it provides one; without it a
            # failed query is very hard to diagnose from the log.
            reason = status_info.get('StateChangeReason')
            if reason:
                print(f"Reason: {reason}")
            return False

        # Still queued/running: give up only if the caller set a deadline.
        if max_wait_seconds is not None and time.monotonic() - started >= max_wait_seconds:
            print(f"Timed out after {max_wait_seconds} seconds waiting for query {query_execution_id}.")
            return False
        time.sleep(poll_interval)
def get_query_results(athena_client, query_execution_id):
    """Fetch every result row of a finished Athena query.

    All result pages are read by following ``NextToken``, so results larger
    than one page are not truncated. The first row of the combined result is
    treated as the column header.

    Args:
        athena_client: Object exposing ``get_query_results`` (an Athena
            client).
        query_execution_id: ID of the query execution whose results to read.

    Returns:
        A ``(header, results)`` tuple: ``header`` is the list of column
        names and ``results`` is a list of rows, each a list of cell values.
        Cells that carry no ``'VarCharValue'`` key (how NULL is reported)
        come back as ``None``. Returns ``([], [])`` when the result set has
        no rows at all, and ``(None, None)`` when the call raised
        ``ClientError`` / ``BotoCoreError`` (the error is printed).
    """
    rows = []
    request = {'QueryExecutionId': query_execution_id}
    try:
        while True:
            response = athena_client.get_query_results(**request)
            rows.extend(response['ResultSet']['Rows'])
            next_token = response.get('NextToken')
            if not next_token:
                break
            request['NextToken'] = next_token
    except (ClientError, BotoCoreError) as e:
        print(f"Error retrieving query results: {e}")
        return None, None

    if not rows:
        return [], []

    # .get() instead of [] so a NULL cell becomes None rather than a KeyError.
    table = [[cell.get('VarCharValue') for cell in row['Data']] for row in rows]
    return table[0], table[1:]
def visualize_data(dates, costs, last_n_days):
    """Print the cost series and draw it as a line trace on the shared plot.

    Args:
        dates: X values for the trace.
        costs: Y values for the trace, one per entry in *dates*.
        last_n_days: Window size, used only in the trace name and the title.

    NOTE(review): `context` is not defined in this file; presumably it is a
    plotting context injected by the runtime -- confirm.
    """
    for label, series in (("x values (dates):", dates), ("y values (costs):", costs)):
        print(label, series)

    trace_name = f'Amazon RDS Costs (Last {last_n_days} Days)'
    chart_title = f'Daily Amazon RDS Costs (Last {last_n_days} Days excluding last 2 days)'

    context.plot.add_trace(name=trace_name, xpts=dates, ypts=costs, tracetype="lines")
    context.plot.xlabel = 'Date'
    context.plot.ylabel = 'Cost ($)'
    context.plot.title = chart_title
# `last_n_days` is expected to be passed in as an input parameter (e.g. 7).
# It is interpolated into the SQL text below, so coerce it to an int and reject
# non-positive values first: only a plain positive number can reach the query
# (a negative value would otherwise render as "--", which starts a SQL comment).
window_days = int(last_n_days)
if window_days < 1:
    raise ValueError(f"last_n_days must be a positive integer, got {last_n_days!r}")

# Daily RDS cost per database engine for the `window_days` days that end two
# days before today (the most recent two days are excluded by the date filter).
query = f"""
SELECT
DATE(line_item_usage_start_date) AS usage_date,
line_item_product_code AS product_code,
product_database_engine AS database_engine,
SUM(line_item_unblended_cost) AS daily_cost
FROM
my_cur_report_athena
WHERE
line_item_product_code = 'AmazonRDS'
AND line_item_usage_start_date >= DATE_ADD('day', -{window_days} - 2, CURRENT_DATE)
AND line_item_usage_start_date < DATE_ADD('day', -2, CURRENT_DATE)
AND product_database_engine IS NOT NULL
AND product_database_engine != ''
GROUP BY
DATE(line_item_usage_start_date),
line_item_product_code,
product_database_engine
ORDER BY
usage_date,
product_code;
"""
database = 'athenacurcfn_my_c_u_r_report_athena'
# `bucket_name` is expected to be received dynamically from an upstream task.
prefix_path = 'dev_query_results'
s3_output = f"s3://{bucket_name}/{prefix_path}/"

query_execution_id = run_athena_query(query, database, s3_output)
if query_execution_id:
    athena_client = boto3.client(
        'athena',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="us-east-1",
    )
    if check_query_status(athena_client, query_execution_id):
        header, results = get_query_results(athena_client, query_execution_id)

        # The query yields one row per (date, engine). Plotting those rows as
        # a single line would repeat each date on the x axis, so sum the
        # engines into one total per date. Dict insertion order keeps the
        # query's ORDER BY usage_date ordering.
        daily_totals = {}
        for row in results or []:
            usage_date, raw_cost = row[0], row[3]
            if not raw_cost:
                # Missing/empty cost cell: nothing to add for this row.
                continue
            daily_totals[usage_date] = daily_totals.get(usage_date, 0.0) + float(raw_cost)

        if daily_totals:
            dates = list(daily_totals)
            costs = list(daily_totals.values())
            visualize_data(dates, costs, window_days)
        else:
            print("No results to show.")
    else:
        print("Query did not succeed. No results to show.")
else:
    print("Query execution failed. Exiting.")
copied