Sign in
agent:

Filter Unused AWS CloudWatch Log Streams

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

This task examines CloudWatch log streams to identify those that have been inactive for a specified duration. By pinpointing these dormant streams, the task aids in maintaining a cleaner, more efficient logging environment and can subsequently assist in reducing unnecessary storage costs associated with retaining outdated logs on AWS CloudWatch.

import boto3
from datetime import datetime, timedelta, timezone

# `_get_creds` and `cred_label` are not defined in this script; they are
# expected to be supplied by the environment that runs this task.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def filter_unused_log_streams(all_log_streams, unused_days=30):
    """Return the log streams whose last event is older than *unused_days* days.

    Args:
        all_log_streams: Iterable of dicts with 'region', 'log_group' and
            'log_stream' keys.
        unused_days: Age threshold in days; a stream whose last event is
            older than this is reported as unused.

    Returns:
        list: The input dicts (same objects) that qualify as unused. Streams
        that cannot be found, that have no 'lastEventTimestamp', or whose
        lookup fails are reported via print (where applicable) and left out.
    """
    unused_log_streams = []
    # The cutoff does not depend on the stream, so compute it once.
    cutoff = datetime.now(timezone.utc) - timedelta(days=unused_days)
    # One client per region instead of one per log stream.
    clients = {}

    for log_info in all_log_streams:
        region = log_info['region']
        client = clients.get(region)
        if client is None:
            client = boto3.client(
                'logs',
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                region_name=region,
            )
            clients[region] = client

        try:
            candidates = client.describe_log_streams(
                logGroupName=log_info['log_group'],
                logStreamNamePrefix=log_info['log_stream'],
            )['logStreams']

            # The API filters by *prefix*, so the result may contain other
            # streams that merely start with the same name (or nothing at
            # all). Only accept the exact stream that was asked for.
            log_stream = next(
                (
                    stream
                    for stream in candidates
                    if stream.get('logStreamName') == log_info['log_stream']
                ),
                None,
            )
            if log_stream is None:
                print(
                    f"Log stream {log_info['log_stream']} not found in log group "
                    f"{log_info['log_group']} of region {log_info['region']}; skipping."
                )
                continue

            # Streams without a 'lastEventTimestamp' are skipped, as before.
            if 'lastEventTimestamp' in log_stream:
                # The timestamp is in milliseconds since the epoch.
                last_event_date = datetime.fromtimestamp(
                    log_stream['lastEventTimestamp'] / 1000, tz=timezone.utc
                )
                if last_event_date < cutoff:
                    unused_log_streams.append(log_info)

        # AWS API failures surface as ClientError, which is not a Boto3Error;
        # catch both so the message keeps the stream/group/region context.
        except (boto3.exceptions.Boto3Error, client.exceptions.ClientError) as e:
            print(f"Error accessing log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")

    return unused_log_streams


def display_log_streams(data):
    """Render *data* as a three-column table (Region, Log Group, Log Stream).

    Args:
        data: List of dicts with 'region', 'log_group' and 'log_stream' keys.
            The list is sorted in place by region, so the caller's list is
            reordered as well.
    """
    # `context` is not defined in this script; it is expected to be provided
    # by the environment that runs this task.
    table = context.newtable()
    table.title = "Unused Log Streams Data"
    table.num_cols = 3  # Region, Log Group, Log Stream
    table.num_rows = 1  # Starts with one row for headers
    table.has_header_row = True

    # Header names go in the first row.
    headers = ["Region", "Log Group", "Log Stream"]
    for col_num, header in enumerate(headers):
        table.setval(0, col_num, header)

    # Sort the log stream data by region for better readability.
    data.sort(key=lambda x: x["region"])

    # Data rows start at row 1, right below the header row.
    for row_num, entry in enumerate(data, start=1):
        table.num_rows += 1  # Add a row for each log stream entry
        values = [entry["region"], entry["log_group"], entry["log_stream"]]
        for col_num, value in enumerate(values):
            table.setval(row_num, col_num, value)


# Main block
# UNUSED_DAYS = 90
# all_log_streams is passed down from the parent task. Example structure:
# all_log_streams = [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}]  # truncated for brevity
all_log_streams = log_streams_data  # Passed down from parent task
unused_logs = filter_unused_log_streams(all_log_streams, UNUSED_DAYS)

if unused_logs:
    display_log_streams(unused_logs)
    # Plain-text alternative to the table:
    # print("\nFiltered unused log streams:")
    # for log in unused_logs:
    #     print(f"Region: {log['region']}, Log Group: {log['log_group']}, Log Stream: {log['log_stream']}")
    # Uncomment the line below if you want to see the full list of unused log streams
    # print(unused_logs)
else:
    print("No Unused Logs")
    # Nothing to act on, so tell the runner to skip the dependent sub-tasks.
    context.skip_sub_tasks = True
copied
  1. 1

    Delete AWS CloudWatch Log Streams

    There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

    This task allows users to remove specified log streams from AWS CloudWatch. By executing this task, organizations can effectively manage and declutter their logging space, ensuring that only relevant and necessary logs are retained. This not only optimizes the logging environment but also helps in potentially reducing storage-associated costs on AWS.

    import boto3

    # `_get_creds` and `cred_label` are not defined in this script; they are
    # expected to be supplied by the environment that runs this task.
    creds = _get_creds(cred_label)['creds']
    access_key = creds['username']
    secret_key = creds['password']


    def _deletion_result(log_info, status, message):
        """Build one result record for *log_info* with the given status and message."""
        return {
            'status': status,
            'region': log_info['region'],
            'log_group': log_info['log_group'],
            'log_stream': log_info['log_stream'],
            'message': message,
        }


    def delete_log_streams(unused_logs):
        """Delete the specified CloudWatch log streams.

        Args:
            unused_logs (list): List of dictionaries containing region, log
                group, and unused log stream information ('region',
                'log_group' and 'log_stream' keys).

        Returns:
            list: One dictionary per input entry with 'status' ('success' or
            'error'), 'region', 'log_group', 'log_stream' and a human-readable
            'message'. A failed deletion is recorded, not raised.
        """
        deletion_results = []
        # One client per region instead of one per log stream.
        clients = {}

        for log_info in unused_logs:
            region = log_info['region']
            client = clients.get(region)
            if client is None:
                client = boto3.client(
                    'logs',
                    aws_access_key_id=access_key,
                    aws_secret_access_key=secret_key,
                    region_name=region,
                )
                clients[region] = client

            try:
                # Delete the log stream
                client.delete_log_stream(
                    logGroupName=log_info['log_group'],
                    logStreamName=log_info['log_stream'],
                )
            # AWS API failures surface as ClientError, which is not a
            # Boto3Error; catch both so the message keeps its full context.
            except (boto3.exceptions.Boto3Error, client.exceptions.ClientError) as e:
                deletion_results.append(_deletion_result(
                    log_info,
                    'error',
                    f"Error deleting log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}",
                ))
            except Exception as e:
                deletion_results.append(_deletion_result(
                    log_info,
                    'error',
                    f"Unexpected error: {e}",
                ))
            else:
                deletion_results.append(_deletion_result(
                    log_info,
                    'success',
                    f"Successfully deleted log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}.",
                ))

        return deletion_results


    # Main Block
    # unused_logs is passed down from the parent task. Example structure:
    # unused_logs = [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}]  # truncated for brevity
    results = delete_log_streams(unused_logs)

    if not results:
        print("No log streams were deleted.")
    else:
        for result in results:
            print(result['message'])
    copied
    1