我试图实现的是将对象从一个帐户中的S3(A1-不受我控制(复制到另一个帐户(A2-由我控制(中的S3。因为来自A1的OPS为我提供了一个我可以承担的角色,使用boto3库。
session = boto3.Session()
sts_client = session.client('sts')
assumed_role = sts_client.assume_role(
RoleArn="arn:aws:iam::1234567890123:role/organization",
RoleSessionName="blahblahblah"
)
这部分还可以。问题是,从S3到S3的直接复制失败了,因为假定的角色无法访问我的S3。
s3 = boto3.resource('s3')
copy_source = {
'Bucket': a1_bucket_name,
'Key': key_name
}
bucket = s3.Bucket(a2_bucket_name)
bucket.copy(copy_source, hardcoded_key)
因此,我得到了
botocore.exceptions.ClientError: An error occurred (403) when calling the HeadObject operation: Forbidden
在这行代码中:
bucket.copy(copy_source, hardcoded_key)
有什么方法可以让我为这个角色访问S3吗?我真的很想在再次上传之前,直接从S3复制到S3,而无需在本地下载文件。
请告知是否有比这更好的方法。
例如,我们的想法是让这个脚本每天在AWS数据管道中运行。
要将对象从一个S3 bucket复制到另一个S3 bucket,您需要使用一组可以访问这两个bucket的AWS凭据。
如果这些bucket在不同的AWS帐户中,则需要两件事:
- 目标存储桶的凭据,以及
- 源存储桶上的存储桶策略,允许对目标AWS帐户进行读取访问
仅凭这些,就可以复制对象。您不需要源帐户的凭据。
- 将存储桶策略添加到源存储桶中,允许对目标AWS帐户进行读取访问
以下是一个示例策略:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DelegateS3Access",
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::123456789012:root"
},
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::BUCKET_NAME",
"arn:aws:s3:::BUCKET_NAME/*"
]
}
]
}
请确保用源存储桶名称替换BUCKET_NAME
。并将123456789012
替换为您的目标AWS帐号。
- 使用目标AWS帐户(目标存储桶的所有者(的凭据执行复制
附加说明:
您还可以通过颠倒以下两个要求来复制对象:
- 源AWS帐户的凭据,以及
- 目标bucket上的bucket策略,允许写入访问源AWS帐户
但是,这样做时,对象元数据不会被正确复制。我已经与AWS支持部门讨论过这个问题,他们建议从国外账户阅读,而不是写信给国外账户,以避免这个问题。
这是一个使用boto 3在具有两个不同AWS帐户的两个S3存储桶之间传输数据的示例代码。
from boto.s3.connection import S3Connection
from boto.s3.key import Key
from Queue import LifoQueue
import threading
source_aws_key = '*******************'
source_aws_secret_key = '*******************'
dest_aws_key = '*******************'
dest_aws_secret_key = '*******************'
srcBucketName = '*******************'
dstBucketName = '*******************'
class Worker(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.source_conn = S3Connection(source_aws_key, source_aws_secret_key)
self.dest_conn = S3Connection(dest_aws_key, dest_aws_secret_key)
self.srcBucket = self.source_conn.get_bucket(srcBucketName)
self.dstBucket = self.dest_conn.get_bucket(dstBucketName)
self.queue = queue
def run(self):
while True:
key_name = self.queue.get()
k = Key(self.srcBucket, key_name)
dist_key = Key(self.dstBucket, k.key)
if not dist_key.exists() or k.etag != dist_key.etag:
print 'copy: ' + k.key
self.dstBucket.copy_key(k.key, srcBucketName, k.key, storage_class=k.storage_class)
else:
print 'exists and etag matches: ' + k.key
self.queue.task_done()
def copyBucket(maxKeys = 1000):
print 'start'
s_conn = S3Connection(source_aws_key, source_aws_secret_key)
srcBucket = s_conn.get_bucket(srcBucketName)
resultMarker = ''
q = LifoQueue(maxsize=5000)
for i in range(10):
print 'adding worker'
t = Worker(q)
t.daemon = True
t.start()
while True:
print 'fetch next 1000, backlog currently at %i' % q.qsize()
keys = srcBucket.get_all_keys(max_keys = maxKeys, marker = resultMarker)
for k in keys:
q.put(k.key)
if len(keys) < maxKeys:
print 'Done'
break
resultMarker = keys[maxKeys - 1].key
q.join()
print 'done'
if __name__ == "__main__":
copyBucket()