I have 'written' a function that copies objects from one location in an S3 bucket to another location within the same bucket. (Really, it is mostly code lifted from s3cmd.)
import sys
import time
from copy import copy
import logging

from S3.Exceptions import *
from S3.S3 import S3
from S3.Config import Config
from S3.FileDict import FileDict
from S3.S3Uri import S3Uri
from S3.Utils import *
from S3.FileLists import *
from S3.ExitCodes import EX_OK

LOG = logging.getLogger()

# Unfortunately the s3cmd implementation uses a global instance of the
# config object everywhere
cfg = None

def init_s3_cfg(access_key, secret_key):
    global cfg
    cfg = Config(access_key=access_key, secret_key=secret_key)

def cmd_sync_remote2remote(str_from_path, str_destination_base):
    '''
    This function is adapted from the s3cmd project https://github.com/s3tools/s3cmd
    because boto does not support recursive copy out of the box
    :param str_from_path: source s3:// URI
    :param str_destination_base: destination s3:// URI
    :return: EX_OK on success
    '''
    s3 = S3(cfg)
    LOG.info(s3.config.bucket_location)

    # Normalise s3://uri (e.g. assert trailing slash)
    from_path = S3Uri(str_from_path).uri()
    destination_base = S3Uri(str_destination_base).uri()
    LOG.info("from %s to %s" % (from_path, destination_base))

    src_list, src_exclude_list = fetch_remote_list(s3, from_path,
                                                   recursive=True, require_attribs=True)
    dst_list, dst_exclude_list = fetch_remote_list(s3, destination_base,
                                                   recursive=True, require_attribs=True)

    src_count = len(src_list)
    dst_count = len(dst_list)
    LOG.info(u"Found %d source files, %d destination files" %
             (src_count, dst_count))

    src_list, dst_list, update_list, copy_pairs = compare_filelists(src_list,
        dst_list, src_remote=True, dst_remote=True)

    src_count = len(src_list)
    update_count = len(update_list)
    dst_count = len(dst_list)
    LOG.info(u"Summary: %d source files to copy, %d files at destination to delete"
             % (src_count, dst_count))

    # Populate 'target_uri' only if we've got something to sync from src to dst
    for key in src_list:
        src_list[key]['target_uri'] = destination_base + key
    for key in update_list:
        update_list[key]['target_uri'] = destination_base + key

    def _upload(src_list, seq, src_count):
        file_list = src_list.keys()
        file_list.sort()
        for file in file_list:
            seq += 1
            item = src_list[file]
            src_uri = S3Uri(item['object_uri_str'])
            dst_uri = S3Uri(item['target_uri'])
            extra_headers = copy(cfg.extra_headers)
            try:
                _response = s3.object_copy(src_uri, dst_uri, extra_headers)
                LOG.info("File %(src)s copied to %(dst)s" % { "src" : src_uri, "dst" : dst_uri })
            except S3Error, e:
                LOG.error("File %(src)s could not be copied: %(e)s" % { "src" : src_uri, "e" : e })
        return seq

    # Perform the synchronization of files
    timestamp_start = time.time()
    seq = 0
    seq = _upload(src_list, seq, src_count + update_count)
    seq = _upload(update_list, seq, src_count + update_count)
    n_copied, bytes_saved, failed_copy_files = remote_copy(s3, copy_pairs, destination_base)

    # Process files that were not remote-copied
    LOG.debug("Process files that were not remote copied")
    failed_copy_count = len(failed_copy_files)
    for key in failed_copy_files:
        failed_copy_files[key]['target_uri'] = destination_base + key
    seq = _upload(failed_copy_files, seq, src_count + update_count + failed_copy_count)

    total_elapsed = max(1.0, time.time() - timestamp_start)
    outstr = "Done. Copied %d files in %0.1f seconds, %0.2f files/s" % (seq, total_elapsed, seq / total_elapsed)
    LOG.info(outstr)
    return EX_OK

def remote_copy(s3, copy_pairs, destination_base):
    saved_bytes = 0
    failed_copy_list = FileDict()
    for (src_obj, dst1, dst2) in copy_pairs:
        LOG.debug(u"Remote Copying from %s to %s" % (dst1, dst2))
        dst1_uri = S3Uri(destination_base + dst1)
        dst2_uri = S3Uri(destination_base + dst2)
        extra_headers = copy(cfg.extra_headers)
        try:
            s3.object_copy(dst1_uri, dst2_uri, extra_headers)
            info = s3.object_info(dst2_uri)
            saved_bytes = saved_bytes + long(info['headers']['content-length'])
            LOG.info(u"remote copy: %s -> %s" % (dst1, dst2))
        except:
            LOG.warning(u'Unable to remote copy files %s -> %s' % (dst1_uri, dst2_uri))
            failed_copy_list[dst2] = src_obj
    return (len(copy_pairs), saved_bytes, failed_copy_list)
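For reference, I call it roughly like this (the credentials and prefixes below are placeholders):

init_s3_cfg("MY_ACCESS_KEY", "MY_SECRET_KEY")
cmd_sync_remote2remote("s3://target-bucket/source-prefix/",
                       "s3://target-bucket/dest-prefix/")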
This works fine if the S3 key has full S3 permissions. However, I want to call this function as an IAM user with a restricted set of permissions. This is my current group policy:
{
    "Statement": [
        {
            "Sid": "cloneFiles",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:DeleteObject"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::target-bucket/*"
            ]
        }
    ]
}
With this new policy, I get this error message:
ERROR:root:S3 error: Access Denied
I would like to know:

1) Is there an easy way (e.g. some parameter, an env var) to work out which permission is missing? Can S3 report which permission is required, and if so, how do I get at that?

2) Can anyone identify the missing permission by reading the code, or otherwise?
In answer to #2: you are most likely tripping on the fetch_remote_list() call, which uses s3:ListBucket. Notably, ListBucket applies to the bucket resource itself, not to object paths within the bucket (a subtle but important distinction). So you need a second statement, something like:
{
    "Action": [
        "s3:ListBucket"
    ],
    "Effect": "Allow",
    "Resource": [
        "arn:aws:s3:::target-bucket"
    ]
}
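Putting that together with your existing statement, the whole policy would look something like this (the second Sid is arbitrary; I've just named it here):

{
    "Statement": [
        {
            "Sid": "cloneFiles",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:DeleteObject"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::target-bucket/*"
            ]
        },
        {
            "Sid": "listBucket",
            "Action": [
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::target-bucket"
            ]
        }
    ]
}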
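As for #1: to my knowledge S3's Access Denied responses do not name the missing permission, but you can ask IAM to evaluate a principal's policies without issuing real S3 calls, either in the web-based IAM Policy Simulator or via the SimulatePrincipalPolicy API. A minimal sketch using the newer boto3 library (the account ID and user name are placeholders, and the caller needs iam:SimulatePrincipalPolicy):

import boto3

iam = boto3.client("iam")

# Evaluate the bucket-level permission that fetch_remote_list() needs,
# without touching S3 itself. The user ARN below is a placeholder.
response = iam.simulate_principal_policy(
    PolicySourceArn="arn:aws:iam::123456789012:user/clone-files-user",
    ActionNames=["s3:ListBucket"],
    ResourceArns=["arn:aws:s3:::target-bucket"],
)
for result in response["EvaluationResults"]:
    # EvalDecision is "allowed", "explicitDeny" or "implicitDeny"
    print("%s: %s" % (result["EvalActionName"], result["EvalDecision"]))

An "implicitDeny" here would have pointed you straight at the missing s3:ListBucket grant.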