Sign in
agent:

Create an S3 bucket to store Athena CUR query results; skip creation if the bucket already exists

There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.

Establish an S3 bucket dedicated to storing results from Athena queries on CUR data; this process includes a check to skip creation if the bucket already exists.

import boto3
from botocore.exceptions import ClientError, BotoCoreError

# NOTE(review): `_get_creds` and `cred_label` are not defined in this script;
# presumably they are injected by the runtime that executes it -- confirm.
creds = _get_creds(cred_label)['creds']
access_key = creds['username']
secret_key = creds['password']


def get_account_number():
    """
    Fetches the AWS account number using boto3 and STS.

    :return: AWS account number as a string, or None if the lookup fails.
    """
    sts_client = boto3.client(
        'sts',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name="us-east-1",
    )
    try:
        account_id = sts_client.get_caller_identity()["Account"]
    except (ClientError, BotoCoreError) as e:
        # BotoCoreError covers client-side failures (e.g. connection or
        # credential problems) so they reach the caller's "failed" branch
        # instead of aborting the script with a traceback.
        print(f"Error fetching AWS account number: {e}")
        return None
    print(f"AWS account number fetched: {account_id}")
    return account_id


def bucket_exists(bucket_name):
    """
    Check if an S3 bucket exists.

    Any ClientError from head_bucket is reported as "does not exist"; error
    codes other than "not found" are printed so the cause is not hidden.

    :param bucket_name: Name of the S3 bucket.
    :return: True if head_bucket succeeds, False otherwise.
    """
    s3_client = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
    )
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as e:
        error_code = e.response.get('Error', {}).get('Code', '')
        if error_code not in ('404', 'NoSuchBucket', 'NotFound'):
            # e.g. 403: the name may be taken by a bucket we cannot access.
            # Still return False so the create attempt surfaces the real error.
            print(
                f"Could not confirm whether bucket {bucket_name} exists "
                f"(error code: {error_code}); attempting creation."
            )
        return False
    return True


def create_s3_bucket(bucket_name, region='us-east-1'):
    """
    Creates an S3 bucket in a specified region, handling the 'us-east-1' special case.

    Errors are printed rather than raised.

    :param bucket_name: Name of the S3 bucket to create.
    :param region: Region to create the bucket in.
    """
    # Pin the client to the target region so the endpoint matches the
    # LocationConstraint; otherwise the ambient default region is used and a
    # mismatch is rejected with IllegalLocationConstraintException.
    s3_client = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
    )
    try:
        create_bucket_params = {'Bucket': bucket_name}
        # Add the LocationConstraint for regions other than 'us-east-1'/Account home region
        if region != 'us-east-1':
            create_bucket_params['CreateBucketConfiguration'] = {
                'LocationConstraint': region
            }
        s3_client.create_bucket(**create_bucket_params)
        print(f"S3 bucket '{bucket_name}' created in {region}.")
    except ClientError as e:
        if e.response['Error']['Code'] in ['BucketAlreadyExists', 'BucketAlreadyOwnedByYou']:
            print(f"Bucket {bucket_name} already exists in {region}.")
        else:
            print(f"Error creating S3 bucket: {e.response['Error']['Message']}")
    except BotoCoreError as e:
        print(f"Unexpected error occurred while creating bucket: {e}")


account_number = get_account_number()
if account_number:
    bucket_name = f'dagknows-cur-logging-bucket-athena-query-results-{account_number}'
    # NOTE(review): `region_name` is not assigned in this script (the line
    # below is commented out); presumably it is supplied by the runtime in
    # the same way as `cred_label` -- confirm before running standalone.
    #region_name = 'us-east-1'  # Specify the AWS region

    # Check if the bucket exists and create if not
    if bucket_exists(bucket_name):
        print(f"Bucket {bucket_name} already exists in {region_name}. Skipping creation.")
    else:
        create_s3_bucket(bucket_name, region_name)
else:
    print("Failed to fetch account number. Exiting.")
copied