agent: |
KwsIZ9b1qbdxgzhF5HPC: Create an S3 bucket to store Athena CUR query results; skip creation if it already exists
Create an S3 bucket to store Athena CUR query results; skip creation if it already exists
There was a problem that the LLM was not able to address. Please rephrase your prompt and try again.
Establish an S3 bucket dedicated to storing results from Athena queries on CUR data; this process includes a check to skip creation if the bucket already exists.
inputs
outputs
import boto3
from botocore.exceptions import ClientError, BotoCoreError
# Resolve the AWS key pair used by every boto3 client below.
# NOTE(review): `_get_creds` and `cred_label` are not defined in this snippet;
# presumably they are provided by the runtime that executes it - confirm.
creds = _get_creds(cred_label)['creds']
# The credential record exposes the access key id under 'username' and the
# secret access key under 'password'.
access_key, secret_key = creds['username'], creds['password']
def get_account_number():
    """
    Fetches the AWS account number using boto3 and STS.

    Builds an STS client in us-east-1 from the module-level ``access_key`` and
    ``secret_key`` and calls ``get_caller_identity``.

    :return: AWS account number as a string, or None if the lookup failed.
    """
    try:
        # Client construction lives inside the try as well: configuration
        # problems surface here as BotoCoreError subclasses.
        sts_client = boto3.client(
            'sts',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name="us-east-1",
        )
        account_id = sts_client.get_caller_identity()["Account"]
    except (ClientError, BotoCoreError) as e:
        # ClientError covers service-side rejections; BotoCoreError covers
        # client-side failures (missing credentials, endpoint/connection
        # errors). Both must map to None so the caller's "failed to fetch"
        # branch runs instead of the script dying with a traceback.
        print(f"Error fetching AWS account number: {e}")
        return None
    print(f"AWS account number fetched: {account_id}")
    return account_id
def bucket_exists(bucket_name):
    """
    Check if an S3 bucket exists.

    Issues a HeadBucket request with the module-level ``access_key`` and
    ``secret_key``. Any ClientError yields False, but errors other than
    "not found" are reported, because they do not prove the bucket is absent.

    :param bucket_name: Name of the S3 bucket.
    :return: True if the HeadBucket call succeeds, False otherwise.
    """
    s3_client = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)
    try:
        s3_client.head_bucket(Bucket=bucket_name)
    except ClientError as e:
        error_code = str(e.response.get('Error', {}).get('Code', ''))
        # HEAD responses carry no error body, so the code is normally the bare
        # HTTP status. Only a not-found code actually means "no such bucket".
        if error_code not in ('404', 'NoSuchBucket', 'NotFound'):
            # Do not swallow the failure silently: say why existence could not
            # be confirmed. The False return is kept so the caller still goes
            # on to attempt creation, which reports the definitive error.
            print(
                f"Could not confirm whether bucket {bucket_name} exists "
                f"(HeadBucket error code: {error_code}). Treating it as missing."
            )
        return False
    return True
def create_s3_bucket(bucket_name, region='us-east-1'):
    """
    Creates an S3 bucket in a specified region, handling the 'us-east-1' special case.

    Errors are reported with print() rather than raised; the function returns
    None in every case.

    :param bucket_name: Name of the S3 bucket to create.
    :param region: Region to create the bucket in.
    """
    # Pin the client to the target region. Without region_name the request goes
    # to whatever default region the environment configures, and S3 rejects a
    # CreateBucket call whose LocationConstraint (or lack of one) does not match
    # the regional endpoint it was sent to.
    s3_client = boto3.client(
        's3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
    )
    try:
        create_bucket_params = {'Bucket': bucket_name}
        # us-east-1 must be requested without a CreateBucketConfiguration;
        # every other region needs an explicit LocationConstraint.
        if region != 'us-east-1':
            create_bucket_params['CreateBucketConfiguration'] = {
                'LocationConstraint': region,
            }
        s3_client.create_bucket(**create_bucket_params)
        print(f"S3 bucket '{bucket_name}' created in {region}.")
    except ClientError as e:
        error_code = e.response['Error']['Code']
        if error_code in ('BucketAlreadyExists', 'BucketAlreadyOwnedByYou'):
            # The name is already taken (by this account or another one);
            # this is reported as a skip, not a failure.
            print(f"Bucket {bucket_name} already exists in {region}.")
        else:
            print(f"Error creating S3 bucket: {e.response['Error']['Message']}")
    except BotoCoreError as e:
        # Client-side failures: parameter validation, connectivity, credentials.
        print(f"Unexpected error occurred while creating bucket: {e}")
# Driver: derive the per-account bucket name, then create the bucket only when
# it is not already there.
account_number = get_account_number()
if not account_number:
    print("Failed to fetch account number. Exiting.")
else:
    bucket_name = f'dagknows-cur-logging-bucket-athena-query-results-{account_number}'
    # NOTE(review): `region_name` is not assigned anywhere in this snippet (an
    # assignment to 'us-east-1' was commented out); presumably it is supplied
    # by the runtime as an input - confirm, otherwise this raises NameError.
    if not bucket_exists(bucket_name):
        create_s3_bucket(bucket_name, region_name)
    else:
        print(f"Bucket {bucket_name} already exists in {region_name}. Skipping creation.")
copied