agent: |
r9VELqGVT9MERUkvcBZSDelete AWS CloudWatch Log Streams
Delete AWS CloudWatch Log Streams
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
This task allows users to remove specified log streams from AWS CloudWatch. By executing this task, organizations can effectively manage and declutter their logging space, ensuring that only relevant and necessary logs are retained. This not only optimizes the logging environment but also helps in potentially reducing storage-associated costs on AWS.
inputs
outputs
import boto3
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']
def delete_log_streams(unused_logs):
"""
Deletes the specified CloudWatch log streams.
Args:
unused_logs (list): List of dictionaries containing region, log group, and unused log stream information.
Returns:
list: List of dictionaries with the results of the deletion process.
"""
deletion_results = []
for log_info in unused_logs:
client = boto3.client('logs', aws_access_key_id=access_key,aws_secret_access_key=secret_key,region_name=log_info['region'])
try:
# Delete the log stream
client.delete_log_stream(
logGroupName=log_info['log_group'],
logStreamName=log_info['log_stream']
)
deletion_results.append({
'status': 'success',
'region': log_info['region'],
'log_group': log_info['log_group'],
'log_stream': log_info['log_stream'],
'message': f"Successfully deleted log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}."
})
except boto3.exceptions.Boto3Error as e:
deletion_results.append({
'status': 'error',
'region': log_info['region'],
'log_group': log_info['log_group'],
'log_stream': log_info['log_stream'],
'message': f"Error deleting log stream {log_info['log_stream']} in log group {log_info['log_group']} of region {log_info['region']}: {e}"
})
except Exception as e:
deletion_results.append({
'status': 'error',
'region': log_info['region'],
'log_group': log_info['log_group'],
'log_stream': log_info['log_stream'],
'message': f"Unexpected error: {e}"
})
return deletion_results
# Main Block
# unused_logs to be passed down from parent task
# Example Structure, unused_logs = [{'region': 'us-east-1', 'log_group': '/aws/apprunner/DemoHTTP/3f3b3224524f47b693b70bd6630487a6/application', 'log_stream': 'instance/265be4ab06614e0e8a70b5acb861832e'}] # truncated for brevity
results = delete_log_streams(unused_logs)
if not results:
print("No log streams were deleted.")
else:
for result in results:
print(result['message'])
copied