OK6np9mI65cVmADIdvR6Filter Unused AWS CloudWatch Log Streams
Filter Unused AWS CloudWatch Log Streams
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
This task examines CloudWatch log streams to identify those that have been inactive for a specified duration. By pinpointing these dormant streams, the task aids in maintaining a cleaner, more efficient logging environment and can subsequently assist in reducing unnecessary storage costs associated with retaining outdated logs on AWS CloudWatch.
inputs
outputs
import boto3
from datetime import datetime, timedelta
# Fetch the credential record registered under cred_label (via the externally
# provided _get_creds) and split it into the key pair. Its 'username' and
# 'password' fields are what the boto3 clients in this script receive as the
# AWS access key ID and secret access key.
creds = _get_creds(cred_label)['creds']
access_key, secret_key = creds['username'], creds['password']
def filter_unused_log_streams(all_log_streams, unused_days=30):
    """Return the log streams whose newest event is older than ``unused_days``.

    Args:
        all_log_streams: Iterable of dicts, each with the keys ``'region'``,
            ``'log_group'`` and ``'log_stream'``.
        unused_days: Age threshold in days. A stream is reported when its
            ``lastEventTimestamp`` lies further in the past than this.

    Returns:
        list: The entries of ``all_log_streams`` (the same dict objects, in
        input order) that are considered unused. Streams without a
        ``lastEventTimestamp``, streams that cannot be found, and streams
        whose lookup fails are left out; failures are reported via ``print``.
    """
    # Local import: the file-level import only brings in datetime/timedelta.
    from datetime import timezone

    # Computed once instead of per stream, and expressed in epoch
    # milliseconds, the unit lastEventTimestamp is compared in.
    cutoff_ms = (
        datetime.now(timezone.utc) - timedelta(days=unused_days)
    ).timestamp() * 1000

    unused_log_streams = []
    clients_by_region = {}  # one CloudWatch Logs client per region, reused

    for log_info in all_log_streams:
        region = log_info['region']
        client = clients_by_region.get(region)
        if client is None:
            client = boto3.client(
                'logs',
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                region_name=region,
            )
            clients_by_region[region] = client

        try:
            candidates = client.describe_log_streams(
                logGroupName=log_info['log_group'],
                logStreamNamePrefix=log_info['log_stream'],
            )['logStreams']

            # The query is a prefix match, so it can return nothing or
            # return other streams sharing the prefix; only accept the
            # stream whose name is exactly the one asked for.
            # NOTE(review): only the first page of results is inspected;
            # confirm that is sufficient for groups with many streams.
            log_stream = next(
                (
                    stream for stream in candidates
                    if stream.get('logStreamName') == log_info['log_stream']
                ),
                None,
            )
            if log_stream is None:
                print(f"Log stream {log_info['log_stream']} not found in log group {log_info['log_group']} of region {log_info['region']}")
                continue

            # Streams without a 'lastEventTimestamp' are never reported.
            last_event_ms = log_stream.get('lastEventTimestamp')
            if last_event_ms is not None and last_event_ms < cutoff_ms:
                unused_log_streams.append(log_info)
        except (boto3.exceptions.Boto3Error, client.exceptions.ClientError) as e:
            # Service-side API failures surface as ClientError, which is not
            # a Boto3Error subclass, so both are listed here.
            print(f"Error accessing log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")

    return unused_log_streams
def display_log_streams(data):
    """Fill a new ``context`` table with one row per log stream.

    The table gets a header row ("Region", "Log Group", "Log Stream")
    followed by the entries of ``data`` ordered by region.

    Args:
        data: Iterable of dicts with the keys ``'region'``, ``'log_group'``
            and ``'log_stream'``. It is not modified.
    """
    # Initialize the table with the desired structure and headers.
    table = context.newtable()
    table.title = "Unused Log Streams Data"
    table.num_cols = 3  # Region, Log Group, Log Stream
    table.num_rows = 1  # Starts with one row for the headers
    table.has_header_row = True

    headers = ["Region", "Log Group", "Log Stream"]
    for col_num, header in enumerate(headers):
        table.setval(0, col_num, header)

    # Order by region for readability. sorted() builds a new list, so the
    # caller's list keeps its original order.
    ordered = sorted(data, key=lambda x: x["region"])

    # Data rows start at index 1, right below the header row.
    for row_num, entry in enumerate(ordered, start=1):
        table.num_rows += 1  # Grow the table before writing into the new row
        values = [entry["region"], entry["log_group"], entry["log_stream"]]
        for col_num, value in enumerate(values):
            table.setval(row_num, col_num, value)
# Main block
# Names this script reads but does not define:
#   UNUSED_DAYS      - inactivity threshold in days, e.g. UNUSED_DAYS = 90
#   log_streams_data - passed down from the parent task
# Example structure, all_log_streams = [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}] # truncated for brevity
all_log_streams = log_streams_data  # Passed down from parent task
unused_logs = filter_unused_log_streams(all_log_streams, UNUSED_DAYS)

if unused_logs:
    display_log_streams(unused_logs)
    # For debugging, print(unused_logs) shows the full list of unused streams.
else:
    print("No Unused Logs")
    # Nothing was found, so flag the sub-tasks to be skipped.
    context.skip_sub_tasks = True
copied
- 1r9VELqGVT9MERUkvcBZSDelete AWS CloudWatch Log Streams
1
Delete AWS CloudWatch Log Streams
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again. This task allows users to remove specified log streams from AWS CloudWatch. By executing this task, organizations can effectively manage and declutter their logging space, ensuring that only relevant and necessary logs are retained. This not only optimizes the logging environment but also helps in potentially reducing storage-associated costs on AWS.
import boto3

# Credential record for cred_label; its 'username'/'password' fields are
# passed to boto3 as the AWS access key ID and secret access key.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def delete_log_streams(unused_logs):
    """
    Deletes the specified CloudWatch log streams.

    Args:
        unused_logs (list): List of dictionaries containing region, log group,
            and unused log stream information (keys 'region', 'log_group',
            'log_stream').

    Returns:
        list: One dictionary per input entry with the keys 'status'
        ('success' or 'error'), 'region', 'log_group', 'log_stream' and
        'message' describing the outcome of the deletion.
    """
    deletion_results = []
    clients_by_region = {}  # one CloudWatch Logs client per region, reused

    for log_info in unused_logs:
        region = log_info['region']
        client = clients_by_region.get(region)
        if client is None:
            client = boto3.client(
                'logs',
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                region_name=region,
            )
            clients_by_region[region] = client

        try:
            # Delete the log stream
            client.delete_log_stream(
                logGroupName=log_info['log_group'],
                logStreamName=log_info['log_stream']
            )
        except (boto3.exceptions.Boto3Error, client.exceptions.ClientError) as e:
            # Service-side API failures surface as ClientError, which is not
            # a Boto3Error subclass, so both are listed here.
            deletion_results.append({
                'status': 'error',
                'region': log_info['region'],
                'log_group': log_info['log_group'],
                'log_stream': log_info['log_stream'],
                'message': f"Error deleting log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}"
            })
        except Exception as e:
            deletion_results.append({
                'status': 'error',
                'region': log_info['region'],
                'log_group': log_info['log_group'],
                'log_stream': log_info['log_stream'],
                'message': f"Unexpected error: {e}"
            })
        else:
            deletion_results.append({
                'status': 'success',
                'region': log_info['region'],
                'log_group': log_info['log_group'],
                'log_stream': log_info['log_stream'],
                'message': f"Successfully deleted log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}."
            })

    return deletion_results


# Main Block
# unused_logs to be passed down from parent task
# Example Structure, unused_logs = [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}] # truncated for brevity
results = delete_log_streams(unused_logs)

if not results:
    print("No log streams were deleted.")
else:
    for result in results:
        print(result['message'])