Identify Idle AWS EC2 Instances

AWS EC2 instances that are running but not actively used represent unnecessary costs. An "idle" EC2 instance typically shows very low values for metrics such as CPU utilization, network input/output, and disk reads/writes. By leveraging AWS CloudWatch, users can monitor these metrics and identify instances that remain underutilized, based on low CPU usage over an extended period. Once identified, these instances can be either stopped or terminated, leading to more efficient resource use and cost savings. It's important to analyze and verify the activity of these instances before taking action, to ensure no critical processes are inadvertently affected.

from datetime import datetime, timedelta, timezone

import boto3
from botocore.exceptions import (
    NoCredentialsError,
    PartialCredentialsError,
    BotoCoreError,
    ClientError,
    EndpointConnectionError,
    DataNotFoundError,
)

# Inputs this script reads but does not define (expected from the task runtime):
#   CPU_THRESHOLD         - average CPU % below which an instance counts as idle,
#                           e.g. CPU_THRESHOLD = 5.0
#   LOOKBACK_PERIOD_HOURS - hours of CloudWatch history to inspect,
#                           e.g. LOOKBACK_PERIOD_HOURS = 1
# NOTE(review): `_get_creds`, `cred_label` and `context` are also used without
# being defined here; presumably they are injected by the runtime - confirm.

creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']

# Fall back to an empty list when no parent task supplied `instances_list`.
if locals().get('instances_list') is None:
    instances_list = []


def get_idle_instances(instances_list):
    """Return the running instances whose average CPU is below CPU_THRESHOLD.

    Args:
        instances_list: iterable of dicts, each providing at least the keys
            'State', 'InstanceId' and 'Region'.

    Returns:
        A list of shallow copies of the matching instance dicts, each with an
        extra 'IdleHours' key: the number of hourly datapoints in the lookback
        window whose average CPU was below CPU_THRESHOLD.

    Instances that are not in the 'running' state are ignored. Instances for
    which CloudWatch returns no datapoints are skipped (with a message) rather
    than reported as idle: missing data is not evidence of low utilisation,
    and the result of this function is used to choose instances to stop.
    Any error while processing one instance is printed and does not abort the
    scan of the remaining instances.
    """
    idle_instances = []
    # Timezone-aware "now"; datetime.utcnow() is deprecated and returns a
    # naive value.
    end_time = datetime.now(timezone.utc)
    start_time = end_time - timedelta(hours=LOOKBACK_PERIOD_HOURS)

    # One CloudWatch client per region instead of one per instance.
    cloudwatch_clients = {}

    for instance in instances_list:
        if instance['State'] != 'running':
            continue

        instance_id = instance['InstanceId']
        region = instance['Region']

        try:
            cloudwatch = cloudwatch_clients.get(region)
            if cloudwatch is None:
                cloudwatch = boto3.client(
                    'cloudwatch',
                    aws_access_key_id=access_key,
                    aws_secret_access_key=secret_key,
                    region_name=region,
                )
                cloudwatch_clients[region] = cloudwatch

            cpu_stats = cloudwatch.get_metric_data(
                MetricDataQueries=[
                    {
                        'Id': 'cpuUtil',
                        'MetricStat': {
                            'Metric': {
                                'Namespace': 'AWS/EC2',
                                'MetricName': 'CPUUtilization',
                                'Dimensions': [{'Name': 'InstanceId', 'Value': instance_id}],
                            },
                            'Period': 3600,  # one hour periods
                            'Stat': 'Average',
                        },
                        'ReturnData': True,
                    }
                ],
                StartTime=start_time,
                EndTime=end_time,
            )
            cpu_values = cpu_stats['MetricDataResults'][0]['Values']

            if not cpu_values:
                # No datapoints: cannot tell whether the instance is idle.
                print(f"No CPU datapoints for instance {instance_id} in region {region}; skipping.")
                continue

            avg_cpu_utilization = sum(cpu_values) / len(cpu_values)

            # Calculate idle hours based on the CPU threshold checks
            if avg_cpu_utilization < CPU_THRESHOLD:
                # Each datapoint covers one hour (Period=3600), so the count
                # of below-threshold datapoints is the number of idle hours.
                idle_hours = sum(1 for val in cpu_values if val < CPU_THRESHOLD)
                instance_info = instance.copy()
                instance_info['IdleHours'] = idle_hours
                idle_instances.append(instance_info)
        except Exception as e:
            # Deliberately broad: one failing instance/region must not stop
            # the scan of the others.
            print(f"Error processing instance {instance_id} in region {region}: {e}")

    return idle_instances


def display_instance_details(data):
    """Render the idle instances as a six-column table via `context.newtable()`.

    Args:
        data: list of instance dicts with the keys 'InstanceId',
            'InstanceType', 'LaunchTime', 'State', 'Region' and 'IdleHours'.
            'LaunchTime' must support `strftime`. The list is sorted in place,
            newest launch first.
    """
    table = context.newtable()
    table.title = "Idle EC2 Instances"
    table.num_cols = 6  # Updated number of columns
    table.num_rows = 1  # header row; grows by one per instance below
    table.has_header_row = True

    headers = ["Instance ID", "Instance Type", "Launch Time", "State", "Region", "Idle Hours"]
    for col_num, header in enumerate(headers):
        table.setval(0, col_num, header)

    data.sort(key=lambda x: x["LaunchTime"], reverse=True)

    for row_num, instance in enumerate(data, start=1):
        table.num_rows += 1
        values = [
            instance["InstanceId"],
            instance["InstanceType"],
            instance["LaunchTime"].strftime('%Y-%m-%d %H:%M:%S'),
            instance["State"],
            instance["Region"],
            str(instance["IdleHours"]),  # Ensure the idle hours are converted to string
        ]
        for col_num, value in enumerate(values):
            table.setval(row_num, col_num, value)


# Main execution
# Ensure to include your list_all_ec2_instances function or import it if it's in another module
# instances_list = list_all_ec2_instances()  Already taken from parent task
idle_instances_list = get_idle_instances(instances_list)

# Show the details of idle instances
if idle_instances_list:
    display_instance_details(idle_instances_list)
else:
    print("No idle instances found.")

# Create a new list with only 'InstanceId' and 'Region' for each instance,
# for passing down to the downstream task.
filtered_instances = [
    {'InstanceId': instance['InstanceId'], 'Region': instance['Region']}
    for instance in idle_instances_list
]

# NOTE(review): presumably prevents the runtime from automatically running the
# child (stop) task - confirm against the runtime's documentation.
context.skip_sub_tasks = True
    Stop an AWS EC2 Instance

    In AWS, an EC2 instance can be in various states, including running, stopped, or terminated. Stopping an EC2 instance essentially means shutting it down, similar to turning off a computer. When an instance is stopped, it is not running, and therefore, you are not billed for instance usage. However, you are still billed for any EBS storage associated with the instance. The advantage of stopping, instead of terminating, is that you can start the instance again at any time. This capability is useful for scenarios where you want to temporarily halt operations without losing the instance configuration or data. It's essential to understand that stopping an instance will lead to the loss of the ephemeral storage content (Instance Store), but data on EBS volumes will remain intact.

    import boto3
    from botocore.exceptions import BotoCoreError, ClientError

    # NOTE(review): `_get_creds` and `cred_label` are not defined here;
    # presumably they are injected by the task runtime - confirm.
    creds = _get_creds(cred_label)['creds']
    access_key = creds['username']
    secret_key = creds['password']

    # Fall back to an empty list when no previous task supplied `filtered_instances`.
    if locals().get('filtered_instances') is None:
        filtered_instances = []


    def stop_ec2_instances(instances_to_stop):
        """Request a stop for each listed EC2 instance and print a summary.

        Args:
            instances_to_stop: iterable of dicts, each with the keys
                'InstanceId' and 'Region'.

        For every entry the current state is looked up first; instances that
        are already 'stopped' or 'stopping' are left alone. A failure on one
        instance (AWS API error, connection/credential problem, or an empty
        describe result) is recorded and printed, and processing continues
        with the next instance so the summary is always produced.
        """
        # To keep track of instances successfully stopped
        stopped_instances = []
        # To keep track of instances that failed to stop
        failed_instances = []
        # To keep track of instances that were already stopped or in the process of stopping
        already_stopped_instances = []

        # One EC2 client per region instead of one per instance.
        ec2_clients = {}

        # Iterate over each instance in the list
        for instance_info in instances_to_stop:
            instance_id = instance_info['InstanceId']
            region = instance_info['Region']

            try:
                # Client creation is inside the try so that a bad region or
                # credential problem is reported per instance instead of
                # aborting the whole batch.
                ec2_client = ec2_clients.get(region)
                if ec2_client is None:
                    ec2_client = boto3.client(
                        'ec2',
                        aws_access_key_id=access_key,
                        aws_secret_access_key=secret_key,
                        region_name=region,
                    )
                    ec2_clients[region] = ec2_client

                # Fetch the current state of the instance
                response = ec2_client.describe_instances(InstanceIds=[instance_id])
                reservations = response.get('Reservations') or []
                instances = reservations[0].get('Instances') or [] if reservations else []
                if not instances:
                    # Guard against an empty result instead of an IndexError.
                    failed_instances.append(instance_id)
                    print(f"Error with instance {instance_id} in region {region}: instance not found in describe_instances response")
                    continue

                instance_state = instances[0]['State']['Name']

                if instance_state in ["stopped", "stopping"]:
                    already_stopped_instances.append(instance_id)
                    print(f"Instance {instance_id} in region {region} is already in '{instance_state}' state.")
                    continue

                # If the instance is not already stopped or stopping, then attempt to stop it.
                # stop_instances only initiates the stop; the instance passes
                # through 'stopping' before it reaches 'stopped'.
                ec2_client.stop_instances(InstanceIds=[instance_id])
                stopped_instances.append(instance_id)
                print(f"Instance {instance_id} in region {region} has been stopped.")

            except (BotoCoreError, ClientError) as e:
                failed_instances.append(instance_id)
                print(f"Error with instance {instance_id} in region {region}: {e}")

        # Print a summary of the actions
        print("\nSummary:\n")
        if stopped_instances:
            print(f"Successfully stopped {len(stopped_instances)} instances: {', '.join(stopped_instances)}")
        if already_stopped_instances:
            print(f"{len(already_stopped_instances)} instances were already stopped or stopping: {', '.join(already_stopped_instances)}")
        if failed_instances:
            print(f"Failed to stop {len(failed_instances)} instances: {', '.join(failed_instances)}")


    # Sample list of instances to stop, for using the task in a standalone manner:
    # instances_to_stop = [
    #     {'InstanceId': 'i-01615251421b8b5da', 'Region': 'us-east-1'},
    #     {'InstanceId': 'i-057155192c87ea310', 'Region': 'us-east-1'},
    #     # ... (other instances)
    # ]

    # `filtered_instances` is passed down from the previous task; otherwise pass
    # instances_to_stop to the function to use the task in a standalone manner.
    stop_ec2_instances(filtered_instances)