Sync a local folder to an S3 bucket using boto3



I noticed that there is no API in boto3 for the "sync" operation that you can perform through the command line.

So, how do I sync a local folder to a given bucket using boto3?

I have just implemented a simple class for this. I'm posting it here hoping it helps anyone with the same problem.

You can modify S3Sync.sync in order to take file size into account; a sketch of that check follows the class below.

import boto3

from bisect import bisect_left
from pathlib import Path
from typing import List


class S3Sync:
    """
    Class that holds the operations needed to synchronize local dirs to a given bucket.
    """

    def __init__(self):
        self._s3 = boto3.client('s3')

    def sync(self, source: str, dest: str) -> None:
        """
        Sync source to dest: every element that exists in source
        but does not exist in dest will be copied to dest.
        No element will be deleted.
        :param source: Source folder.
        :param dest: Destination folder.
        :return: None
        """
        paths = self.list_source_objects(source_folder=source)
        objects = self.list_bucket_objects(dest)
        # Get the keys and sort them so we can perform a binary search
        # each time we want to check whether a path is already there.
        object_keys = [obj['Key'] for obj in objects]
        object_keys.sort()
        object_keys_length = len(object_keys)

        for path in paths:
            # Binary search: bisect_left returns the insertion point
            # for path in the sorted object_keys.
            index = bisect_left(object_keys, path)
            if index == object_keys_length or object_keys[index] != path:
                # path was not found among the bucket keys, so upload it.
                self._s3.upload_file(str(Path(source).joinpath(path)),
                                     Bucket=dest, Key=path)

    def list_bucket_objects(self, bucket: str) -> List[dict]:
        """
        List all objects for the given bucket.
        :param bucket: Bucket name.
        :return: A List[dict] describing the objects in the bucket.
        Example of a single object.
        {
            'Key': 'example/example.txt',
            'LastModified': datetime.datetime(2019, 7, 4, 13, 50, 34, 893000, tzinfo=tzutc()),
            'ETag': '"b11564415be7f58435013b414a59ae5c"',
            'Size': 115280,
            'StorageClass': 'STANDARD',
            'Owner': {
                'DisplayName': 'webfile',
                'ID': '75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a'
            }
        }
        """
        try:
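            # Note: list_objects returns at most 1000 objects per call;
            # a paginator would be needed for larger buckets.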
            contents = self._s3.list_objects(Bucket=bucket)['Contents']
        except KeyError:
            # No Contents Key, empty bucket.
            return []
        else:
            return contents

    @staticmethod
    def list_source_objects(source_folder: str) -> List[str]:
        """
        :param source_folder:  Root folder for resources you want to list.
        :return: A List[str] containing the relative paths of the files.
        Example:
            /tmp
                - example
                    - file_1.txt
                    - some_folder
                        - file_2.txt
            >>> sync.list_source_objects("/tmp/example")
            ['file_1.txt', 'some_folder/file_2.txt']
        """
        path = Path(source_folder)
        paths = []
        for file_path in path.rglob("*"):
            if file_path.is_dir():
                continue
            # relative_to + as_posix keeps the key relative to the root
            # and uses '/' regardless of the OS path separator.
            paths.append(file_path.relative_to(path).as_posix())
        return paths

if __name__ == '__main__':
    sync = S3Sync()
    sync.sync("/temp/some_folder", "some_bucket_name")
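
As for the file size check mentioned above, here is a minimal sketch of what it could look like. The helper name needs_upload and the remote_objects mapping are made up for illustration; they are not part of the class above:

from pathlib import Path

def needs_upload(local_file: Path, key: str, remote_objects: dict) -> bool:
    # remote_objects is assumed to map S3 keys to the dicts returned
    # by list_bucket_objects above.
    obj = remote_objects.get(key)
    if obj is None:
        return True  # Not present in the bucket yet.
    # Re-upload when the local size differs from the object's 'Size'.
    return local_file.stat().st_size != obj['Size']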

UPDATE:

@z.wei commented:

Digging this up to deal with the weird bisect functions. We might just use if path not in object_keys:?

I thought that was an interesting question worth an answer update instead of getting lost in the comments.

Answer:

No, if path not in object_keys would perform a linear search, O(n). The bisect_* functions perform a binary search (the list has to be sorted), which is O(log(n)).

Most of the time you will be dealing with enough objects for sorting plus binary searching to be generally faster than just using the in keyword.

Take into account that you must check every path in the source against every path in the destination, which makes using in O(m * n), where m is the number of objects in the source and n the number in the destination. Using bisect the whole thing is O(n * log(n)).
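
For reference, this is the bisect-based membership test in isolation (a minimal sketch; the helper name contains_key is made up for illustration):

from bisect import bisect_left

def contains_key(sorted_keys: list, key: str) -> bool:
    # bisect_left returns the insertion point; the key is present only
    # if that position holds an equal element. O(log(n)) per lookup.
    index = bisect_left(sorted_keys, key)
    return index < len(sorted_keys) and sorted_keys[index] == key

keys = sorted(['a/1.txt', 'b/2.txt', 'c/3.txt'])
assert contains_key(keys, 'b/2.txt')
assert not contains_key(keys, 'z/9.txt')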

But...

If you think about it, you could use sets to make the algorithm even faster (and simpler, hence more pythonic):

def sync(self, source: str, dest: str) -> None:
    # Local paths.
    paths = set(self.list_source_objects(source_folder=source))
    # Getting the keys (remote S3 paths).
    objects = self.list_bucket_objects(dest)
    object_keys = {obj['Key'] for obj in objects}
    # Compute the set difference: what we have in paths that does
    # not exist in object_keys.
    to_sync = paths - object_keys
    source_path = Path(source)
    for path in to_sync:
        self._s3.upload_file(str(source_path / path),
                             Bucket=dest, Key=path)

Searching in sets is O(1) most of the time (https://wiki.python.org/moin/TimeComplexity), so, using sets, the whole thing will be O(n), way faster than the previous O(m * log(n)).

Further improvements

The code could be improved even more by making the methods list_bucket_objects and list_source_objects return sets instead of lists, as in the sketch below.
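
A minimal sketch of that change, assuming we only need the keys and can drop the per-object metadata (Size, ETag, ...) that the dicts carried:

def list_bucket_objects(self, bucket: str) -> set:
    try:
        contents = self._s3.list_objects(Bucket=bucket)['Contents']
    except KeyError:
        # No 'Contents' key: empty bucket.
        return set()
    return {obj['Key'] for obj in contents}

@staticmethod
def list_source_objects(source_folder: str) -> set:
    path = Path(source_folder)
    # Build the set of relative paths directly.
    return {p.relative_to(path).as_posix()
            for p in path.rglob("*") if not p.is_dir()}

With that, sync reduces to a single set difference: to_sync = self.list_source_objects(source) - self.list_bucket_objects(dest).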

LAST UPDATE