How can I find out all of the IAM permissions this function needs in order to run?



I have "written" a function that copies objects from one location in an S3 bucket to another location within the same bucket. (In reality, most of the code is lifted from s3cmd.)

import sys
import time
from copy import copy
import logging
from S3.Exceptions import *
from S3.S3 import S3
from S3.Config import Config
from S3.FileDict import FileDict
from S3.S3Uri import S3Uri
from S3.Utils import *
from S3.FileLists import *
from S3.ExitCodes import EX_OK

LOG = logging.getLogger()

# Unfortunately the s3cmd implementation uses a global instance of config object
# everywhere
cfg = None

def init_s3_cfg(access_key, secret_key):
    global cfg
    cfg = Config(access_key=access_key, secret_key=secret_key)
def cmd_sync_remote2remote(str_from_path, str_destination_base):
    '''
    This function is adopted from s3cmd project https://github.com/s3tools/s3cmd
     because boto does not support recursive copy out of the box
    :param str_from_path:
    :param str_destination_base:
    :return:
    '''
    s3 = S3(cfg)
    LOG.info(s3.config.bucket_location)
    # Normalise s3://uri (e.g. assert trailing slash)
    from_path = S3Uri(str_from_path).uri()
    destination_base = S3Uri(str_destination_base).uri()
    LOG.info("from %s to %s" % (from_path, destination_base))
    src_list, src_exclude_list = fetch_remote_list(s3, from_path,
                                    recursive=True, require_attribs=True)
    dst_list, dst_exclude_list = fetch_remote_list(s3, destination_base,
                                    recursive=True, require_attribs=True)
    src_count = len(src_list)
    dst_count = len(dst_list)
    LOG.info(u"Found %d source files, %d destination files" %
             (src_count, dst_count))
    src_list, dst_list, update_list, copy_pairs = compare_filelists(src_list,
                                dst_list, src_remote=True, dst_remote=True)
    src_count = len(src_list)
    update_count = len(update_list)
    dst_count = len(dst_list)
    LOG.info(u"Summary: %d source files to copy, %d files at destination to delete"
             % (src_count, dst_count))
    # Populate 'target_uri' only if we've got something to sync from src to dst
    for key in src_list:
        src_list[key]['target_uri'] = destination_base + key
    for key in update_list:
        update_list[key]['target_uri'] = destination_base + key
    def _upload(src_list, seq, src_count):
        file_list = src_list.keys()
        file_list.sort()
        for file in file_list:
            seq += 1
            item = src_list[file]
            src_uri = S3Uri(item['object_uri_str'])
            dst_uri = S3Uri(item['target_uri'])
            extra_headers = copy(cfg.extra_headers)
            try:
                _response = s3.object_copy(src_uri, dst_uri, extra_headers)
                LOG.info("File %(src)s copied to %(dst)s" % { "src" : src_uri, "dst" : dst_uri })
            except S3Error, e:
                LOG.error("File %(src)s could not be copied: %(e)s" % { "src" : src_uri, "e" : e })
        return seq
    # Perform the synchronization of files
    timestamp_start = time.time()
    seq = 0
    seq = _upload(src_list, seq, src_count + update_count)
    seq = _upload(update_list, seq, src_count + update_count)
    n_copied, bytes_saved, failed_copy_files = remote_copy(s3, copy_pairs, destination_base)
    # Process files not copied
    debug("Process files that was not remote copied")
    failed_copy_count = len (failed_copy_files)
    for key in failed_copy_files:
        failed_copy_files[key]['target_uri'] = destination_base + key
    seq = _upload(failed_copy_files, seq, src_count + update_count + failed_copy_count)
    total_elapsed = max(1.0, time.time() - timestamp_start)
    outstr = "Done. Copied %d files in %0.1f seconds, %0.2f files/s" % (seq, total_elapsed, seq/total_elapsed)
    LOG.info(outstr)
    return EX_OK
def remote_copy(s3, copy_pairs, destination_base):
    saved_bytes = 0
    failed_copy_list = FileDict()
    for (src_obj, dst1, dst2) in copy_pairs:
        LOG.debug(u"Remote Copying from %s to %s" % (dst1, dst2))
        dst1_uri = S3Uri(destination_base + dst1)
        dst2_uri = S3Uri(destination_base + dst2)
        extra_headers = copy(cfg.extra_headers)
        try:
            s3.object_copy(dst1_uri, dst2_uri, extra_headers)
            info = s3.object_info(dst2_uri)
            saved_bytes = saved_bytes + long(info['headers']['content-length'])
            LOG.info(u"remote copy: %s -> %s" % (dst1, dst2))
        except:
            LOG.warning(u'Unable to remote copy files %s -> %s' % (dst1_uri, dst2_uri))
            failed_copy_list[dst2] = src_obj
    return (len(copy_pairs), saved_bytes, failed_copy_list)

This works fine if the S3 key has full S3 permissions. However, I would like to invoke this function as an IAM user that is granted only a subset of permissions. This is my current group policy:

{ 
    "Statement": [
        {
            "Sid": "cloneFiles",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:PutObjectAcl",
                "s3:DeleteObject"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::target-bucket/*"
            ]
        }
    ]
}

With this new policy, I get this error message:

ERROR:root:S3 error: Access Denied

I would like to know:

1) Is there an easy way (e.g. some parameter or environment variable) to work out which permissions are needed? Can S3 report which permission it is missing, and if so, how do I get at that information? (A rough debugging approach is sketched after this list.)

2) Can anyone work out the missing permissions by reading the code, or by some other means?
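
Regarding #1: S3's AccessDenied response does not name the missing permission, so the best you can usually do is see which request failed. Below is a minimal sketch, assuming the S3.* modules log their requests through the standard logging module (which the code above already uses via LOG); the credentials and paths are placeholders.

import logging

# Crank the root logger up to DEBUG so the individual S3 requests issued by
# the s3cmd modules are visible; the request that receives AccessDenied tells
# you which operation (and therefore which permission) is involved.
logging.basicConfig(level=logging.DEBUG)

init_s3_cfg("ACCESS_KEY", "SECRET_KEY")  # placeholder credentials
cmd_sync_remote2remote("s3://target-bucket/path/a/",  # placeholder paths
                       "s3://target-bucket/path/b/")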

In answer to #2: you are probably tripping over the fetch_remote_list() calls, which use s3:ListBucket. Notably, ListBucket applies to the bucket resource, not to an object path within the bucket (a subtle but important distinction). So you need a second statement, something like:

        {
            "Action": [
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::target-bucket"
            ]
        }
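
You can also see the bucket-level vs. object-level distinction in isolation with a couple of direct calls. A minimal sketch using boto3 (not part of the code above; the prefix and key are hypothetical, the bucket is the one from the policy):

import boto3

# Assumes the restricted IAM user's credentials are available in the
# environment or in ~/.aws/credentials.
s3 = boto3.client("s3")

# Needs s3:ListBucket on arn:aws:s3:::target-bucket (the bucket resource);
# this is the call that fails when only the object-level statement is present.
s3.list_objects_v2(Bucket="target-bucket", Prefix="some/prefix/")

# Needs s3:GetObject on arn:aws:s3:::target-bucket/* (the object resources).
s3.get_object(Bucket="target-bucket", Key="some/prefix/example.txt")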
