Sign in

Fetching AWS Cost and Usage Report from S3

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

This task processes the AWS Cost and Usage Report (CUR) from an S3 bucket. Using the boto3 SDK, it retrieves a gzipped CSV, decodes it, and converts it into a Pandas DataFrame for analysis.

import gzip
from datetime import datetime, timedelta
from io import StringIO

import boto3
import pandas as pd
from botocore.exceptions import ParamValidationError

# Inputs expected to be supplied by the surrounding runbook environment
# (example values):
# BUCKET_NAME = 'dagknowscostreport'
# BASE_PATH = 'costreport/dagknowscostreport/'  # path from the bucket root to the 'dated folders' which contain the CUR reports
# FILENAME = 'dagknowscostreport-00001.csv.gz'
# last_n_days = 100

# Retrieve AWS credentials from the vault. The vault entry stores the access
# key under 'username' and the secret key under 'password'.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

# Single S3 client shared by every helper below.
s3_client = boto3.client(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
)


def list_s3_keys(bucket, prefix):
    """Return every object key in *bucket* that starts with *prefix*.

    Follows ``NextContinuationToken`` until the listing is exhausted, so the
    result is not capped at a single ``list_objects_v2`` page.
    """
    keys = []
    kwargs = {'Bucket': bucket, 'Prefix': prefix}
    while True:
        response = s3_client.list_objects_v2(**kwargs)
        keys.extend(obj['Key'] for obj in response.get('Contents', []))
        token = response.get('NextContinuationToken')
        if token is None:
            break
        kwargs['ContinuationToken'] = token
    return keys


def fetch_data_from_s3(file_key, bucket=None):
    """Download one gzipped CSV object and parse it into a DataFrame.

    Args:
        file_key: Key of the ``.csv.gz`` object to download.
        bucket: Bucket to read from; defaults to the global ``BUCKET_NAME``.

    Returns:
        The parsed DataFrame, or ``None`` if the download, decompression or
        parsing failed (the error is printed, not raised).
    """
    try:
        response = s3_client.get_object(Bucket=bucket or BUCKET_NAME, Key=file_key)
        gz_content = response['Body'].read()
        csv_content = gzip.decompress(gz_content).decode('utf-8')
        return pd.read_csv(StringIO(csv_content), low_memory=False)
    except Exception as e:
        # Deliberately best-effort: one unreadable report must not abort the
        # whole run, so report the failure and let the caller skip it.
        print(f"Error fetching data from S3 for key {file_key}: {e}")
        return None


def get_end_of_last_month(date):
    """Return midnight on the last day of the month preceding *date*."""
    last_day_of_previous_month = date.replace(day=1) - timedelta(days=1)
    return last_day_of_previous_month.replace(hour=0, minute=0, second=0, microsecond=0)


def check_column_existence(df, column_name):
    """Return True if *column_name* is a column of *df*; otherwise warn and return False."""
    if column_name not in df.columns:
        print(f"Warning: Column '{column_name}' not found in the DataFrame!")
        return False
    return True


def list_folders(prefix, bucket=None):
    """Return the immediate sub-"folder" prefixes found under *prefix*.

    Args:
        prefix: Key prefix to list under.
        bucket: Bucket to list; defaults to the global ``BUCKET_NAME``.
    """
    paginator = s3_client.get_paginator('list_objects_v2')
    folders = []
    pages = paginator.paginate(Bucket=bucket or BUCKET_NAME, Prefix=prefix, Delimiter='/')
    for page in pages:
        folders.extend(entry['Prefix'] for entry in page.get('CommonPrefixes', []))
    return folders


def list_csv_gz_files(folder, bucket=None):
    """Return all ``.csv.gz`` objects whose key starts with *folder*.

    Args:
        folder: Key prefix to list under.
        bucket: Bucket to list; defaults to the global ``BUCKET_NAME``.

    Returns:
        A list of ``{'key': ..., 'last_modified': ...}`` dicts.
    """
    # Paginate: a single list_objects_v2 call returns at most 1000 keys, which
    # could hide the most recently modified report in a large folder.
    paginator = s3_client.get_paginator('list_objects_v2')
    files = []
    for page in paginator.paginate(Bucket=bucket or BUCKET_NAME, Prefix=folder):
        files.extend(
            {'key': obj['Key'], 'last_modified': obj['LastModified']}
            for obj in page.get('Contents', [])
            if obj['Key'].endswith('.csv.gz')
        )
    return files


def process_data(bucket_name, base_path, last_n_days):
    """Load the newest report from each folder under *base_path* into one DataFrame.

    For every sub-folder of *base_path*, the most recently modified ``.csv.gz``
    object is downloaded; the resulting frames are concatenated. A ``day``
    column (date of ``lineItem/UsageStartDate``) is added and rows belonging to
    the latest day present are dropped.

    Args:
        bucket_name: Bucket holding the reports.
        base_path: Prefix under which the per-period folders live.
        last_n_days: Accepted for interface compatibility.
            NOTE(review): no date-window filtering is applied to the data;
            confirm whether rows older than ``last_n_days`` should be dropped.

    Returns:
        ``(df, True)`` on success, or ``(empty DataFrame, False)`` when no
        report could be loaded, a required column is missing, or any error
        occurred (errors are printed, not raised).
    """
    required_columns = [
        'lineItem/ProductCode',
        'lineItem/UnblendedCost',
        'lineItem/BlendedCost',
        'lineItem/UsageStartDate',
        'lineItem/UsageAccountId',
        'lineItem/NormalizedUsageAmount',
        'product/productFamily',
        'product/instanceType',
    ]

    try:
        # Pick the most recently modified .csv.gz from each folder.
        all_keys = []
        for month_range in list_folders(base_path, bucket=bucket_name):
            keys_for_month = list_csv_gz_files(month_range, bucket=bucket_name)
            if keys_for_month:
                last_file = max(keys_for_month, key=lambda f: f['last_modified'])
                all_keys.append(last_file['key'])
            else:
                print(f"No CSV GZ files found in the folder {month_range}.")

        # fetch_data_from_s3 returns None for reports it could not read;
        # drop those so only real frames are concatenated.
        fetched = (fetch_data_from_s3(key, bucket=bucket_name) for key in all_keys)
        dfs = [frame for frame in fetched if frame is not None]
        if not dfs:
            return pd.DataFrame(), False

        df = pd.concat(dfs, ignore_index=True)

        # Validate the schema before touching any column so a missing column
        # yields the explicit warning instead of a KeyError.
        # A list (not a generator) is used so every missing column is reported.
        if not all([check_column_existence(df, col) for col in required_columns]):
            print("One or more required columns are missing.")
            return pd.DataFrame(), False

        df['lineItem/UsageStartDate'] = pd.to_datetime(df['lineItem/UsageStartDate'])
        df['day'] = df['lineItem/UsageStartDate'].dt.date

        # Exclude the latest date from the DataFrame.
        latest_date = df['day'].max()
        df = df[df['day'] < latest_date]

        return df, True

    except Exception as e:
        print(f"ERROR: {e}")
        return pd.DataFrame(), False


# NOTE(review): this listing pass duplicates work done inside process_data and
# its results are not used below; it is kept only so the module-level names
# `folders` and `files` remain defined as before.
folders = list_folders(BASE_PATH)
for folder in folders:
    files = list_csv_gz_files(folder)

df, data_fetched = process_data(BUCKET_NAME, BASE_PATH, last_n_days)

if not data_fetched:
    print("No data fetched. Exiting operation.")
    # Tell the runbook engine not to continue with downstream steps.
    context.proceed = False
copied