Sign in
agent:

Delete unused AWS CloudWatch Log Streams

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

This runbook automates the process of identifying and deleting unused Amazon CloudWatch Log Streams. By scanning specified log groups across designated AWS regions, it efficiently detects log streams that have been inactive for a predetermined period. Once identified, these log streams are safely removed, helping organizations maintain a clutter-free logging environment and potentially reducing associated storage costs.

  1. 1

    List all AWS CloudWatch Log Streams

    There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

    This task is designed to systematically retrieve and enumerate all CloudWatch log streams present in specified AWS regions. It offers a detailed snapshot of the existing log streams, enabling users to understand their logging landscape across various AWS services and applications.

    import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def fetch_log_streams(client, region): log_streams_info = [] #print(f"\nFetching log streams for region: {region}...") log_groups = client.describe_log_groups() if not log_groups.get('logGroups'): #print(f"No log groups or streams found in region: {region}.") return log_streams_info for log_group in log_groups['logGroups']: log_group_name = log_group['logGroupName'] log_streams = client.describe_log_streams(logGroupName=log_group_name) for stream in log_streams.get('logStreams', []): #print(f"Region: {region}, Log Group: {log_group_name}, Log Stream: {stream['logStreamName']}") # Append the information to log_streams_info log_streams_info.append({ 'region': region, 'log_group': log_group_name, 'log_stream': stream['logStreamName'] }) return log_streams_info def list_all_log_streams(region=None): log_streams_info = [] # To store log streams information for all regions try: # Create an initial client to fetch regions if no specific region is provided ec2_client = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name='us-east-1') if region: regions = [region] else: regions = [region['RegionName'] for region in ec2_client.describe_regions()['Regions']] for specific_region in regions: client = boto3.client('logs', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=specific_region) region_log_streams = fetch_log_streams(client, specific_region) log_streams_info.extend(region_log_streams) except boto3.exceptions.Boto3Error as e: print(f"An error occurred while accessing AWS: {e}") except Exception as e: print(f"An unexpected error occurred: {e}") return log_streams_info def display_log_streams(data): # Initialize table with the desired structure and headers table = context.newtable() table.title = "AWS Log Streams Data" table.num_cols = 3 # Number of columns for Region, Log Group, and Log Stream table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names headers = ["Region", "Log Group", "Log Stream"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Sort the log stream data by region for better readability data.sort(key=lambda x: x["region"]) # Populate the table with log stream data for row_num, entry in enumerate(data, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each log stream entry values = [entry["region"], entry["log_group"], entry["log_stream"]] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) # Main block target_region = None # hardcoded for one time result log_streams_data = list_all_log_streams(target_region) # Pass the name of region as a string to search for that specific region otherwise it runs for all regions #print("\nCompleted fetching log streams data.") # Uncomment the line below if you want to see the returned data structure # print(log_streams_data) display_log_streams(log_streams_data)
    copied
    1
  2. 2

    Filter Unused AWS CloudWatch Log Streams

    There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

    This task examines CloudWatch log streams to identify those that have been inactive for a specified duration. By pinpointing these dormant streams, the task aids in maintaining a cleaner, more efficient logging environment and can subsequently assist in reducing unnecessary storage costs associated with retaining outdated logs on AWS CloudWatch.

    import boto3 from datetime import datetime, timedelta creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def filter_unused_log_streams(all_log_streams, unused_days=30): unused_log_streams = [] for log_info in all_log_streams: client = boto3.client('logs', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=log_info['region']) try: log_stream = client.describe_log_streams( logGroupName=log_info['log_group'], logStreamNamePrefix=log_info['log_stream'] )['logStreams'][0] # We're using prefix, so getting the first result # Check if the log stream has a 'lastEventTimestamp' if 'lastEventTimestamp' in log_stream: last_event_date = datetime.utcfromtimestamp(log_stream['lastEventTimestamp'] / 1000) if last_event_date < datetime.utcnow() - timedelta(days=unused_days): unused_log_streams.append(log_info) except boto3.exceptions.Boto3Error as e: print(f"Error accessing log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}") except Exception as e: print(f"Unexpected error: {e}") return unused_log_streams def display_log_streams(data): # Initialize table with the desired structure and headers table = context.newtable() table.title = "Unused Log Streams Data" table.num_cols = 3 # Number of columns for Region, Log Group, and Log Stream table.num_rows = 1 # Starts with one row for headers table.has_header_row = True # Define header names headers = ["Region", "Log Group", "Log Stream"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Sort the log stream data by region for better readability data.sort(key=lambda x: x["region"]) # Populate the table with log stream data for row_num, entry in enumerate(data, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each log stream entry values = [entry["region"], entry["log_group"], entry["log_stream"]] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) # Main block # UNUSED_DAYS = 90 # all_log_streams to be passed down from parent task # Example structure, all_log_streams = [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}] # truncated for brevity all_log_streams = log_streams_data # Passed down from parent task unused_logs = filter_unused_log_streams(all_log_streams, UNUSED_DAYS) if unused_logs: display_log_streams(unused_logs) '''print("\nFiltered unused log streams:") for log in unused_logs: print(f"Region: {log['region']}, Log Group: {log['log_group']}, Log Stream: {log['log_stream']}")''' # Uncomment the line below if you want to see the full list of unused log streams # print(unused_logs) else: print("No Unused Logs") context.skip_sub_tasks=True
    copied
    2
    1. 2.1

      Delete AWS CloudWatch Log Streams

      There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

      This task allows users to remove specified log streams from AWS CloudWatch. By executing this task, organizations can effectively manage and declutter their logging space, ensuring that only relevant and necessary logs are retained. This not only optimizes the logging environment but also helps in potentially reducing storage-associated costs on AWS.

      import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_log_streams(unused_logs): """ Deletes the specified CloudWatch log streams. Args: unused_logs (list): List of dictionaries containing region, log group, and unused log stream information. Returns: list: List of dictionaries with the results of the deletion process. """ deletion_results = [] for log_info in unused_logs: client = boto3.client('logs', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=log_info['region']) try: # Delete the log stream client.delete_log_stream( logGroupName=log_info['log_group'], logStreamName=log_info['log_stream'] ) deletion_results.append({ 'status': 'success', 'region': log_info['region'], 'log_group': log_info['log_group'], 'log_stream': log_info['log_stream'], 'message': f"Successfully deleted log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}." }) except boto3.exceptions.Boto3Error as e: deletion_results.append({ 'status': 'error', 'region': log_info['region'], 'log_group': log_info['log_group'], 'log_stream': log_info['log_stream'], 'message': f"Error deleting log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}" }) except Exception as e: deletion_results.append({ 'status': 'error', 'region': log_info['region'], 'log_group': log_info['log_group'], 'log_stream': log_info['log_stream'], 'message': f"Unexpected error: {e}" }) return deletion_results # Main Block # unused_logs to be passed down from parent task # Example Structure, unused_logs = [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}] # truncated for brevity results = delete_log_streams(unused_logs) if not results: print("No log streams were deleted.") else: for result in results: print(result['message'])
      copied
      2.1