Sign in
agent:

Delete AWS CloudWatch Log Streams

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

This task allows users to remove specified log streams from AWS CloudWatch. By executing this task, organizations can effectively manage and declutter their logging space, ensuring that only relevant and necessary logs are retained. This not only optimizes the logging environment but also helps in potentially reducing storage-associated costs on AWS.

import boto3 creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def delete_log_streams(unused_logs): """ Deletes the specified CloudWatch log streams. Args: unused_logs (list): List of dictionaries containing region, log group, and unused log stream information. Returns: list: List of dictionaries with the results of the deletion process. """ deletion_results = [] for log_info in unused_logs: client = boto3.client('logs', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=log_info['region']) try: # Delete the log stream client.delete_log_stream( logGroupName=log_info['log_group'], logStreamName=log_info['log_stream'] ) deletion_results.append({ 'status': 'success', 'region': log_info['region'], 'log_group': log_info['log_group'], 'log_stream': log_info['log_stream'], 'message': f"Successfully deleted log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}." }) except boto3.exceptions.Boto3Error as e: deletion_results.append({ 'status': 'error', 'region': log_info['region'], 'log_group': log_info['log_group'], 'log_stream': log_info['log_stream'], 'message': f"Error deleting log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}" }) except Exception as e: deletion_results.append({ 'status': 'error', 'region': log_info['region'], 'log_group': log_info['log_group'], 'log_stream': log_info['log_stream'], 'message': f"Unexpected error: {e}" }) return deletion_results # Main Block # unused_logs to be passed down from parent task # Example Structure, unused_logs = [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}] # truncated for brevity results = delete_log_streams(unused_logs) if not results: print("No log streams were deleted.") else: for result in results: print(result['message'])
copied