vgyrSZL2h3TRAPxILN4C
Fetching AWS Cost and Usage Report from S3
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
This task loads the AWS Cost and Usage Report (CUR) from an S3 bucket. Using the boto3 SDK, it finds the most recently modified gzipped CSV in each monthly report folder, decompresses and decodes it, and combines the results into a single pandas DataFrame for analysis.
inputs
outputs
import boto3
import gzip
import pandas as pd
from io import StringIO
from datetime import datetime, timedelta
from botocore.exceptions import ParamValidationError
#BUCKET_NAME = 'dagknowscostreport'
#BASE_PATH = 'costreport/dagknowscostreport/' # Is the path from bucket root directory to the 'dated folders' which contain the CUR Reports
#FILENAME = 'dagknowscostreport-00001.csv.gz'
#last_n_days = 100
# Look up this task's AWS key pair in the credential vault. The vault entry
# keeps the access key ID under 'username' and the secret key under 'password'.
creds = _get_creds(cred_label)['creds']
access_key, secret_key = creds['username'], creds['password']

# Shared S3 client used by list_folders() and list_csv_gz_files().
s3_client = boto3.client(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
)
def list_s3_keys(bucket, prefix):
    """Return every object key in *bucket* that starts with *prefix*.

    Builds its own S3 client from the module-level key pair. It then keeps
    re-issuing ``list_objects_v2`` with the returned ``NextContinuationToken``
    until a page arrives without one, so multi-page listings are complete.
    """
    client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
    request = {'Bucket': bucket, 'Prefix': prefix}
    found = []
    while True:
        page = client.list_objects_v2(**request)
        found.extend(item['Key'] for item in page.get('Contents', []))
        token = page.get('NextContinuationToken')
        if token is None:
            # No continuation token means this was the last page.
            return found
        request['ContinuationToken'] = token
def fetch_data_from_s3(file_key):
    """Download one gzipped CSV object from ``BUCKET_NAME`` and parse it.

    Returns:
        A DataFrame built from the decompressed, UTF-8-decoded CSV text, or
        ``None`` if any step (client creation, download, decompression,
        decoding, parsing) raises. The error is printed, not re-raised.
    """
    try:
        client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
        body = client.get_object(Bucket=BUCKET_NAME, Key=file_key)['Body']
        text = gzip.decompress(body.read()).decode('utf-8')
        # low_memory=False: parse without internal chunking so each column
        # gets one consistently inferred dtype.
        return pd.read_csv(StringIO(text), low_memory=False)
    except Exception as e:
        print(f"Error fetching data from S3 for key {file_key}: {e}")
        return None
def get_end_of_last_month(date):
    """Return the last day of the month preceding *date*, at midnight.

    Accepts either a ``datetime`` or a plain ``datetime.date``:

    * ``datetime`` input -> ``datetime`` with the time fields reset to
      00:00:00.000000; ``tzinfo`` is carried over unchanged.
    * ``date`` input -> ``date`` (it has no time fields to reset).

    Example: ``datetime(2024, 3, 15, 10, 30)`` -> ``datetime(2024, 2, 29, 0, 0)``.
    """
    # Going back one day from the 1st of the current month lands on the last
    # day of the previous month, whatever its length (28/29/30/31).
    last_day = date.replace(day=1) - timedelta(days=1)
    # datetime is a subclass of date, so test for the richer type.
    if isinstance(last_day, datetime):
        return last_day.replace(hour=0, minute=0, second=0, microsecond=0)
    return last_day
def check_column_existence(df, column_name):
    """Tell whether *column_name* is among ``df.columns``.

    Prints a warning when the column is absent.

    Returns:
        ``True`` if the column exists, ``False`` otherwise.
    """
    present = column_name in df.columns
    if not present:
        print(f"Warning: Column '{column_name}' not found in the DataFrame!")
    return present
def list_folders(prefix):
    """Return the immediate "sub-folder" prefixes of *prefix* in ``BUCKET_NAME``.

    With ``Delimiter='/'`` S3 rolls deeper keys up into ``CommonPrefixes``.
    Every result page from the ``list_objects_v2`` paginator is walked.
    """
    pages = s3_client.get_paginator('list_objects_v2').paginate(
        Bucket=BUCKET_NAME, Prefix=prefix, Delimiter='/'
    )
    return [
        common['Prefix']
        for page in pages
        for common in page.get('CommonPrefixes', [])
    ]
def list_csv_gz_files(folder):
    """List every ``.csv.gz`` object under *folder* in ``BUCKET_NAME``.

    The listing uses no delimiter, so it is recursive: objects in nested
    prefixes below *folder* are included.

    Returns:
        A list of ``{'key': <object key>, 'last_modified': <LastModified>}``
        dicts, one per matching object, in S3 listing order.
    """
    # A single list_objects_v2 call returns at most 1000 keys. Walk every
    # page so large folders are not silently truncated; truncation could
    # make callers that pick the most recently modified file choose a
    # stale one.
    paginator = s3_client.get_paginator('list_objects_v2')
    return [
        {'key': obj['Key'], 'last_modified': obj['LastModified']}
        for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix=folder)
        for obj in page.get('Contents', [])
        if obj['Key'].endswith('.csv.gz')
    ]
# Disabled helper kept for reference. Wrapping the def in a triple-quoted
# string turns it into an unused string expression, so print_last_file_info
# is never defined. Its only call site (in the task driver) is commented out too.
'''
def print_last_file_info(files):
if not files:
print("No CSV GZ files found in the folder.")
return
# Sort files by last modified time
last_file = sorted(files, key=lambda x: x['last_modified'], reverse=True)[0]
print(f"Last file: {last_file['key']}, Last modified: {last_file['last_modified']}")
'''
def _latest_csv_gz_key(folder):
    """Return the key of the most recently modified ``.csv.gz`` under *folder*, or ``None``."""
    files = list_csv_gz_files(folder)
    if not files:
        return None
    # max() keeps the first of equal timestamps, matching a stable reverse sort.
    return max(files, key=lambda f: f['last_modified'])['key']


def process_data(bucket_name, base_path, last_n_days):
    """Load CUR rows from the newest report file in each month folder.

    Steps:
      1. List the folders directly under *base_path*.
      2. Pick the most recently modified ``.csv.gz`` in each folder.
      3. Download the files and concatenate them.
      4. Verify the required columns are present.
      5. Parse ``lineItem/UsageStartDate``, derive a ``day`` column, and drop
         the rows belonging to the most recent day in the data.

    Args:
        bucket_name: Accepted for interface compatibility. NOTE(review): not
            used here; the S3 helpers read the module-level ``BUCKET_NAME``.
        base_path: Prefix from the bucket root to the month folders.
        last_n_days: NOTE(review): currently has no effect; no date-window
            filter is applied. Confirm whether one was intended.

    Returns:
        ``(df, True)`` on success. Otherwise ``(empty DataFrame, False)``
        when no report could be fetched, a required column is missing, or
        any other error occurs (that error is printed).
    """
    try:
        keys = []
        for month_range in list_folders(base_path):
            key = _latest_csv_gz_key(month_range)
            if key is None:
                print(f"No CSV GZ files found in the folder {month_range}.")
            else:
                keys.append(key)

        # fetch_data_from_s3 returns None on failure. Drop those explicitly
        # instead of relying on pd.concat's silent None handling.
        frames = [frame for frame in map(fetch_data_from_s3, keys) if frame is not None]
        if not frames:
            print("No cost report data could be fetched.")
            return pd.DataFrame(), False
        df = pd.concat(frames, ignore_index=True)

        # Validate the schema before touching any column, so a missing column
        # is reported by name instead of surfacing as a bare KeyError.
        required_columns = [
            'lineItem/ProductCode',
            'lineItem/UnblendedCost',
            'lineItem/BlendedCost',
            'lineItem/UsageStartDate',
            'lineItem/UsageAccountId',
            'lineItem/NormalizedUsageAmount',
            'product/productFamily',
            'product/instanceType',
        ]
        # A list (not a generator) so every missing column gets its own warning.
        if not all([check_column_existence(df, col) for col in required_columns]):
            print("One or more required columns are missing.")
            return pd.DataFrame(), False

        df['lineItem/UsageStartDate'] = pd.to_datetime(df['lineItem/UsageStartDate'])
        df['day'] = df['lineItem/UsageStartDate'].dt.date
        # Exclude the latest day present; presumably its usage is still
        # incomplete (NOTE(review): confirm intent).
        latest_date = df['day'].max()
        df = df[df['day'] < latest_date]
        return df, True
    except Exception as e:
        print(f"ERROR: {e}")
        return pd.DataFrame(), False
# Task entry point: load the report data and signal whether later steps
# should run. process_data does its own folder/file listing, so no separate
# pre-listing pass is needed here.
df, data_fetched = process_data(BUCKET_NAME, BASE_PATH, last_n_days)
if not data_fetched:
    print("No data fetched. Exiting operation.")
    # `context` is supplied by the task runtime. Setting proceed=False
    # presumably halts downstream steps (NOTE(review): confirm).
    context.proceed = False
copied