agent: |
HzshumZwX8irum3pnXEeCreate CloudFormation Stack for Athena CUR Setup
Create CloudFormation Stack for Athena CUR Setup
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
Develop a CloudFormation stack to automate the deployment of necessary resources for querying CUR data with Athena, ensuring all components are correctly configured and linked.
inputs
outputs
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError, ClientError
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']
bucket_name = BUCKET_NAME
# Parameters
#bucket_name = 'dagknows-cur-logging-bucket-athena-test-188379622596'
#report_name = 'My-CUR-report-Athena-Test-1234'
stack_name = 'MyCURReportAthenaStack'
#region_name = 'us-east-1'
def create_cloudformation_stack(stack_name, template_body, region='us-east-1'):
"""
Create a CloudFormation stack in AWS using the provided template.
:param stack_name: The name of the CloudFormation stack to be created.
:param template_body: String containing the YAML formatted CloudFormation template.
:param region: AWS region where the stack will be created. Default is 'us-east-1'.
:return: None
"""
cf_client = boto3.client('cloudformation', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region)
try:
print(f"Initiating stack creation: {stack_name}")
response = cf_client.create_stack(
StackName=stack_name,
TemplateBody=template_body,
Capabilities=['CAPABILITY_NAMED_IAM'],
OnFailure='DELETE',
TimeoutInMinutes=10,
EnableTerminationProtection=False
)
print(f"Stack creation initiated successfully, stack ID: {response['StackId']}")
except cf_client.exceptions.LimitExceededException:
print("Error: You have exceeded your AWS CloudFormation limits.")
except cf_client.exceptions.AlreadyExistsException:
print("Error: A stack with the same name already exists.")
except cf_client.exceptions.TokenAlreadyExistsException:
print("Error: A client request token already exists.")
except cf_client.exceptions.InsufficientCapabilitiesException:
print("Error: Insufficient capabilities to execute this operation. Check required permissions.")
except ClientError as e:
print(f"AWS ClientError: {e.response['Error']['Message']}")
except Exception as e:
print(f"An unexpected error occurred: {e}")
def load_template_from_s3(bucket_name, report_name):
"""
Load a CloudFormation template from an S3 bucket.
:param bucket_name: The name of the S3 bucket.
:param report_name: The base directory name of the CUR report.
:return: String of the file's contents or None if there's an error.
"""
s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
key = f"{report_name}/{report_name}/date-range/{report_name}/crawler-cfn.yml" # If there is a s3 key error then check this step for crawler.yml S3 URI
try:
response = s3_client.get_object(Bucket=bucket_name, Key=key)
print("Template fetched successfully from S3.")
return response['Body'].read().decode('utf-8')
except NoCredentialsError:
print("Error: No AWS credentials were provided.")
except PartialCredentialsError:
print("Error: Incomplete AWS credentials were provided.")
except ClientError as e:
print(f"Error accessing S3: {e.response['Error']['Message']}")
except Exception as e:
print(f"An unexpected error occurred while loading the template from S3: {e}")
return None
def stack_exists(stack_name, region):
"""
Check if a CloudFormation stack exists.
:param stack_name: The name of the CloudFormation stack.
:param region: AWS region where the stack is located.
:return: Boolean indicating if the stack exists.
"""
cf_client = boto3.client('cloudformation', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region)
try:
response = cf_client.describe_stacks(StackName=stack_name)
return len(response['Stacks']) > 0
except cf_client.exceptions.ClientError as e:
if 'does not exist' in str(e):
return False
else:
print(f"Error checking if stack exists: {e}")
return True
def main():
print("Loading the CloudFormation template from S3...")
template_body = load_template_from_s3(bucket_name, report_name)
if template_body:
print("Template loaded successfully.")
if stack_exists(stack_name, region_name):
print(f"CloudFormation stack '{stack_name}' already exists. Skipping creation.")
else:
create_cloudformation_stack(stack_name, template_body, region_name)
else:
print("Failed to load template. Exiting.")
main()
copied