列出bucket中的所有对象,并使用boto3将所有对象的通知发送到AWS SQS



我想为s3存储桶中的每个对象或键生成一条消息。我确实有十把钥匙。这就是为什么我想用list_objects_v2列出所有它们,然后将它们传递给SQS队列。下面是我尝试使用的代码示例:

import json
import boto3
region = "us-east-2"
bucket = "s3-small-files-fiap"
prefix = 'folder/'
s3_client = boto3.client('s3', region_name=region)
response = s3_client.list_objects_v2(Bucket=bucket,
Prefix=prefix)
settings = {
"bucket_name": "s3-small-files-fiap",
"queue_name": "sqs-csv-to-json",
"region": region,
"account_number": <my_account_number>
}
bucket_notifications_configuration = {
'QueueConfigurations': [{
'Events': ['s3:ObjectCreated:*'],
'Id': 'Notifications',
'QueueArn':
'arn:aws:sqs:{region}:{account_number}:{queue_name}'.format(**settings)
}]
}
qpolicy = {
"Version": "2012-10-17",
"Id":
"arn:aws:sqs:{region}:{account_number}:{queue_name}/SQSDefaultPolicy".format(
**settings),
"Statement": [{
"Sid": "allow tmp bucket to notify",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "SQS:SendMessage",
"Resource": "arn:aws:sqs:{region}:{account_number}:{queue_name}".format(
**settings),
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:*:*:{bucket_name}".format(
**settings)
}
}
}]
}
print("Bucket notify", bucket_notifications_configuration)
print("Queue Policy", qpolicy)
queue_attrs = {"Policy": json.dumps(qpolicy), }
sqs_client = boto3.resource("sqs",
region_name=region).get_queue_by_name(
QueueName=settings["queue_name"])
sqs_client.set_attributes(Attributes=queue_attrs)
sqs_client.attributes
s3_client.put_bucket_notification_configuration(
Bucket=bucket,
NotificationConfiguration=bucket_notifications_configuration)

出于某种原因,它的输出只生成一条通知消息,如下所示。如何使用上面的代码发送十次通知而不是一次?

以下是输出示例:

Bucket notify {'QueueConfigurations': [{'Events': ['s3:ObjectCreated:*'], 'Id': 'Notifications', 'QueueArn': 'arn:aws:sqs:us-east-2:<my_account_number>:sqs-csv-to-json'}]}
Queue Policy {'Version': '2012-10-17', 'Id': 'arn:aws:sqs:us-east-2:<my_account_number>:sqs-csv-to-json/SQSDefaultPolicy', 'Statement': [{'Sid': 'allow tmp bucket to notify', 'Effect': 'Allow', 'Principal': {'AWS': '*'}, 'Action': 'SQS:SendMessage', 'Resource': 'arn:aws:sqs:us-east-2:<my_account_number>:sqs-csv-to-json', 'Condition': {'ArnLike': {'aws:SourceArn': 'arn:aws:s3:*:*:s3-small-files-fiap'}}}]}

创建一个方法并移动您的SNS发送代码

QUEUE_NAME = os.getenv("QUEUE_NAME")
SQS = boto3.client("sqs")
## Inside handler method 
s3_resource = boto3.resource('s3')
response = s3_client.list_objects_v2(Bucket=bucket,Prefix=prefix)

for file in response:
# call SQS send method here 
try:
#logger.debug("Recording %s", file)
u = getQueueURL()
logging.debug("Got queue URL %s", u)
resp = SQS.send_message(QueueUrl=u, MessageBody=file)
#logger.debug("Send result: %s", resp)
except Exception as e:
raise Exception("Raised Exception! %s" % e)
def getQueueURL():
"""Retrieve the URL for the configured queue name"""
q = SQS.get_queue_url(QueueName=QUEUE_NAME).get('QueueUrl')
#logger.debug("Queue URL is %s", QUEUE_URL)
return q

最新更新