Manage Unused AWS CloudWatch Dashboards

This runbook involves identifying and addressing dashboards within AWS CloudWatch that have not been accessed or utilized for a predetermined period. It typically includes analyzing dashboard activity through AWS CloudTrail to detect inactivity, categorizing dashboards as unused, and then taking appropriate actions such as deletion or archiving to optimize resource management and reduce clutter and unnecessary costs.

  1.

    List All AWS CloudWatch Dashboards

    This task involves retrieving and displaying a comprehensive overview of all AWS CloudWatch dashboards available across different AWS regions. This includes details such as the dashboard names, the regions they are hosted in, and the last modified dates. The goal is to provide visibility into all existing dashboards to manage them effectively or audit their usage.

    import boto3 from datetime import datetime, timedelta # AWS credentials creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_regions_for_service(): """Retrieve all regions where a specific AWS service is available.""" ec2 = boto3.client('ec2', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name="us-east-1") regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']] return regions def get_cloudwatch_client(region_name=None): """Create a CloudWatch client for the specified region.""" return boto3.client('cloudwatch', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region_name) def list_cloudwatch_dashboards(region_name): """List all CloudWatch dashboards in a specific region.""" cloudwatch_client = get_cloudwatch_client(region_name) dashboards = [] paginator = cloudwatch_client.get_paginator('list_dashboards') try: for page in paginator.paginate(): for dashboard in page['DashboardEntries']: dashboard['Region'] = region_name # Add region information to each dashboard # Convert LastModified datetime to string if present if 'LastModified' in dashboard: dashboard['LastModified'] = dashboard['LastModified'].strftime('%Y-%m-%d %H:%M:%S') dashboards.append(dashboard) except Exception as e: print(f"An error occurred in {region_name}: {e}") return [] return dashboards def display_dashboards(dashboards): """Display all CloudWatch dashboards in a formatted table.""" # Initialize table with the desired structure and headers table = context.newtable() table.title = "AWS CloudWatch Dashboard Details" table.num_cols = 3 # Number of columns according to headers table.num_rows = 1 # Starts with one row for headers table.has_header_row = True headers = ["Dashboard Name", "Region", "Last Modified"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Populate the table with dashboard data for row_num, dashboard in enumerate(dashboards, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each dashboard values = [ dashboard['DashboardName'], dashboard['Region'], dashboard.get('LastModified', 'N/A') # Not all responses may have 'LastModified' ] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) def process_dashboards(region_name=None): if region_name: regions = [region_name] # Process only the specified region else: regions = get_regions_for_service() # Process all available regions all_dashboards = [] for region in regions: dashboards = list_cloudwatch_dashboards(region) # Fetch dashboards for each region if dashboards: all_dashboards.extend(dashboards) # Extend the list with dashboards from the current region if all_dashboards: display_dashboards(all_dashboards) # Display all found dashboards in a formatted table else: print("No CloudWatch dashboards found.") return all_dashboards # Return the list of all dashboards for use in downstream tasks # Example usage #region_name = None # Set to None for all regions, or specify like 'us-east-1' all_dashboards = process_dashboards(region_name) #print(all_dashboards)
  2.

    Filter Out Unused AWS CloudWatch Dashboards

    This task involves identifying dashboards with minimal interaction or updates over a set period. This process typically uses AWS CloudTrail to track access patterns, helping organizations identify and decommission underutilized dashboards. This optimization reduces costs and administrative effort by eliminating unnecessary monitoring tools.

    import boto3 from datetime import datetime, timedelta creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_cloudtrail_client(region_name): """Create a CloudTrail client for the specified region.""" return boto3.client('cloudtrail', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region_name) def check_dashboard_activity(dashboard, cloudtrail_client, last_accessed_days_threshold): """Check if there has been recent activity on a dashboard using CloudTrail.""" start_time = - timedelta(days=last_accessed_days_threshold) try: events = cloudtrail_client.lookup_events( LookupAttributes=[ { 'AttributeKey': 'ResourceName', 'AttributeValue': dashboard['DashboardArn'] } ], StartTime=start_time, ) return len(events['Events']) == 0 except Exception as e: print(f"Error checking activity for dashboard {dashboard['DashboardName']} in {dashboard['Region']}: {e}") return True # Assume unused if error occurs def filter_unused_dashboards(dashboards, last_accessed_days_threshold=90): """Filter dashboards to find unused ones based on activity and add days_old information.""" unused_dashboards = [] current_date = for dashboard in dashboards: cloudtrail_client = get_cloudtrail_client(dashboard['Region']) if check_dashboard_activity(dashboard, cloudtrail_client, last_accessed_days_threshold): last_modified = datetime.strptime(dashboard['LastModified'], '%Y-%m-%d %H:%M:%S') days_old = (current_date - last_modified).days dashboard['DaysOld'] = days_old # Adding days_old to the dashboard dictionary unused_dashboards.append(dashboard) return unused_dashboards def display_unused_dashboards(unused_dashboards): """Display unused dashboards in a formatted table with an additional 'Days Old' column.""" # Initialize table with the desired structure and headers table = context.newtable() table.title = "AWS CloudWatch Dashboard Details - Unused Dashboards" table.num_cols = 4 # Number of columns according to headers table.num_rows = 1 # Starts with one row for headers table.has_header_row = True headers = ["Dashboard Name", "Region", "Last Modified", "Days Old"] # Set headers in the first row for col_num, header in enumerate(headers): table.setval(0, col_num, header) # Populate the table with dashboard data for row_num, dashboard in enumerate(unused_dashboards, start=1): # Starting from the second row table.num_rows += 1 # Add a row for each dashboard values = [ dashboard['DashboardName'], dashboard['Region'], dashboard.get('LastModified', 'N/A'), # Use 'N/A' if 'LastModified' isn't available str(dashboard.get('DaysOld', 'N/A')) # Convert days old to string; use 'N/A' if not available ] for col_num, value in enumerate(values): table.setval(row_num, col_num, value) # Assuming all_dashboards is passed from upstream task #last_accessed_days_threshold = 90 # Example usage unused_dashboards = filter_unused_dashboards(all_dashboards, last_accessed_days_threshold) display_unused_dashboards(unused_dashboards) context.skip_sub_tasks = True
    2.1

      Delete AWS CloudWatch Dashboard

      This task involves removing specific AWS CloudWatch dashboards that are no longer needed or in use. This helps streamline monitoring resources and reduce clutter, ensuring that only relevant and actively used dashboards remain operational within AWS environments.

      import boto3 from datetime import datetime, timedelta creds = _get_creds(cred_label)['creds'] access_key = creds['username'] secret_key = creds['password'] def get_cloudwatch_client(region_name): """Create a CloudWatch client for the specified region.""" return boto3.client('cloudwatch', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region_name) def delete_dashboard(dashboard, cloudwatch_client): """Delete a specific CloudWatch dashboard.""" try: cloudwatch_client.delete_dashboards(DashboardNames=[dashboard['DashboardName']]) return True except Exception as e: print(f"Error deleting dashboard {dashboard['DashboardName']} in {dashboard['Region']}: {e}") return False def process_and_delete_dashboards(unused_dashboards): """Process and delete unused dashboards while displaying relevant details.""" print(f"{'Dashboard Name':<30} {'Region':<15} {'Days Old':<10} {'ARN':<60} {'Deleted':<10}") for dashboard in unused_dashboards: cloudwatch_client = get_cloudwatch_client(dashboard['Region']) deletion_success = delete_dashboard(dashboard, cloudwatch_client) deletion_status = 'Success' if deletion_success else 'Failed' print(f"{dashboard['DashboardName']:<30} {dashboard['Region']:<15} {dashboard['DaysOld']:<10} {dashboard['DashboardArn']:<60} {deletion_status:<10}") # Example usage process_and_delete_dashboards(unused_dashboards)