GCS - 具有目录结构的 Python 下载 blob



我正在使用GCS python SDK和Google API客户端的组合来循环访问启用了版本的存储桶并根据元数据下载特定对象。

from google.cloud import storage
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
def downloadepoch_objects():
    """Download every object whose 'epoch' metadata matches restore_epoch,
    recreating the object's directory structure locally.

    Relies on module-level names defined elsewhere in the script:
    service, bucket_name, restore_epoch, source_bucket.
    """
    import os

    request = service.objects().list(
        bucket=bucket_name,
        versions=True
    )
    response = request.execute()
    for item in response['items']:
        if item['metadata']['epoch'] == restore_epoch:
            print(item['bucket'])
            print(item['name'])
            print(item['metadata']['epoch'])
            print(item['updated'])
            blob = source_bucket.blob(item['name'])
            # Bug fix: the original formatted the whole `item` dict into the
            # path. Use the object name, and create intermediate directories
            # so nested names like 'nfs/media/docs/test1.txt' work too.
            destination = os.path.join(
                '/Users/admin/git/data-processing', item['name'])
            os.makedirs(os.path.dirname(destination), exist_ok=True)
            blob.download_to_filename(destination)

downloadepoch_objects()

上述函数适用于不在目录中的 blob（gs://bucketname/test1.txt），因为传入的项只是 test1.txt。我遇到的问题是：尝试从复杂的目录树（gs://bucketname/nfs/media/docs/test1.txt）下载文件时，传递的项目是 nfs/media/docs/test1.txt。如果本地目录不存在，是否可以让 .download_to_filename() 方法自动创建目录？

以下是工作解决方案。我最终从对象名称中提取了路径部分并动态创建了目录结构。更好的方法可能是按照 @Brandon Yarbrough 的建议使用 `prefix + response['prefixes'][0]`，但我不太清楚具体做法。希望这对其他人有所帮助。

#!/usr/local/bin/python3
from google.cloud import storage
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials
import json
import os
import pathlib
# --- Module-level configuration and API clients ---
bucket_name = 'test-bucket'  # GCS bucket to operate on
restore_epoch = '1519189202'  # epoch value (string) matched against object metadata
restore_location = '/Users/admin/data/'  # local directory downloads are written under
# Application-default credentials for the discovery-based JSON API client
# (used for listing object versions with metadata).
credentials = GoogleCredentials.get_application_default()
service = discovery.build('storage', 'v1', credentials=credentials)
# google-cloud-storage client used for the actual blob downloads.
storage_client = storage.Client()
source_bucket = storage_client.get_bucket(bucket_name)

def listall_objects():
    """Dump the full objects.list response (all versions) as pretty-printed JSON."""
    listing = service.objects().list(bucket=bucket_name, versions=True).execute()
    print(json.dumps(listing, indent=2))

def listname_objects():
    """Print name, upload timestamp and epoch metadata for every object version."""
    listing = service.objects().list(bucket=bucket_name, versions=True).execute()
    for obj in listing['items']:
        summary = (obj['name'] + ' Uploaded on: ' + obj['updated'] +
                   ' Epoch: ' + obj['metadata']['epoch'])
        print(summary)

def downloadepoch_objects():
    """Download every object version whose 'epoch' metadata equals
    restore_epoch into restore_location, recreating the object's
    directory structure locally.

    Uses module-level names: service, bucket_name, restore_epoch,
    restore_location, source_bucket.
    """
    request = service.objects().list(
        bucket=bucket_name,
        versions=True
    )
    response = request.execute()
    # .get() keeps an empty bucket (no 'items' key) from raising KeyError.
    for item in response.get('items', []):
        if item['metadata']['epoch'] != restore_epoch:
            continue
        print('Downloading ' + item['name'] + ' from ' +
              item['bucket'] + '; Epoch= ' + item['metadata']['epoch'])
        print('Saving to: ' + restore_location)
        blob = source_bucket.blob(item['name'])
        destination = restore_location + '{}'.format(item['name'])
        try:
            # os.mkdir only creates one level and fails on nested names like
            # 'nfs/media/docs'; makedirs builds the whole tree, and
            # exist_ok=True removes the check-then-create race.
            os.makedirs(pathlib.Path(destination).parent, exist_ok=True)
            blob.download_to_filename(destination)
            print('Download complete')
        except OSError as exc:
            # Report the failure instead of the original silent
            # `except Exception: pass`, which hid every error.
            print('Failed to download {}: {}'.format(item['name'], exc))

# listall_objects()
# listname_objects()
downloadepoch_objects()

GCS没有"目录"的概念,尽管像gsutil这样的工具在假装方便方面做得很好。如果您希望所有对象都位于"nfs/media/docs/"路径下,则可以将其指定为前缀,如下所示:

# Fixed: objects() is a method on the discovery service object, so it must
# be called before .list() (the original read `service.objects.list`, which
# raises AttributeError; every other call in this file uses objects()).
request = service.objects().list(
    bucket=bucket_name,
    versions=True,
    prefix='nfs/media/docs/',  # Only show objects beginning like this
    delimiter='/'  # Consider this character a directory marker.
)
response = request.execute()
subdirectories = response['prefixes']
objects = response['items']

由于 prefix 参数,只有以 'nfs/media/docs' 开头的对象才会在 response['items'] 中返回。由于delimiter参数,"子目录"将在response['prefixes']中返回。您可以在 objects.list 方法的 Python 文档中获取更多详细信息。

如果你要使用较新的google-cloud Python库,我建议将其用于新代码,那么相同的调用看起来非常相似:

from google.cloud import storage
client = storage.Client()
bucket = client.bucket(bucket_name)
iterator = bucket.list_blobs(
    versions=True,
    prefix='nfs/media/docs/',
    delimiter='/'
)
# Consume the iterator BEFORE reading .prefixes: the prefixes set is only
# populated as result pages are fetched, so reading it first returns an
# empty set.
objects = list(iterator)
subdirectories = iterator.prefixes

以下解决方案对我有用。我正在递归地将所有 blob 从路径前缀下载到项目根目录的model目录,同时保持文件夹结构。正在同时下载多个 Blob。

GCS 客户端版本 google-cloud-storage==1.41.1

import os
from datetime import datetime
from google.cloud import storage
from concurrent.futures import ThreadPoolExecutor
BUCKET_NAME = "ml-model"
def timer(func):
    """Decorator that logs how long *func* takes to run.

    Fixes over the original: `logger` was never defined (NameError the
    first time the wrapper ran), the wrapped function's return value was
    dropped, and the wrapper did not preserve the function's metadata.
    """
    import functools
    import logging

    logger = logging.getLogger(__name__)

    @functools.wraps(func)
    def time_wrapper(*args, **kwargs):
        start = datetime.now()
        result = func(*args, **kwargs)  # propagate the callee's return value
        diff = datetime.now() - start
        logger.info(f"{func.__name__} took {diff.seconds} s and {diff.microseconds//1000} ms")
        return result
    return time_wrapper
def fetch_environment() -> str:
    """Return the deployment environment name, defaulting to 'staging'."""
    return os.environ.get("environment", "staging")

def create_custom_folder(dir_name: str):
    """Create *dir_name* (including any missing parents) if it does not exist.

    exist_ok=True replaces the separate os.path.exists check, removing the
    race where the directory appears between the check and the create.
    """
    os.makedirs(dir_name, exist_ok=True)

def fetch_gcs_credential_file_path():
    """Path to the GCS service-account JSON key, from the environment (or None)."""
    return os.getenv("GCS_CREDENTIAL_FILE_PATH")

class GCS:
    """Wrapper around a google-cloud-storage client that downloads every blob
    under a prefix ("folder") concurrently into a local ./model directory,
    preserving the folder structure below the prefix."""

    def __init__(self):
        # Credentials come from the JSON key file named by the
        # GCS_CREDENTIAL_FILE_PATH environment variable.
        cred_file_path = fetch_gcs_credential_file_path()
        self.client = storage.Client.from_service_account_json(cred_file_path)
        self.bucket = self.client.bucket(BUCKET_NAME)
        # Prefix currently being downloaded; assigned by
        # download_blobs_multithreaded before any worker thread runs.
        self.path_prefix = ""

    def download_blob(self, blob):
        """Download one blob under ./model, mirroring its path below the prefix."""
        filename = blob.name.replace(self.path_prefix, '')
        parts = filename.split('/')
        if len(parts) > 1:
            # Nested object: recreate the sub-directory tree first.
            dir_name = "model/" + "/".join(parts[:-1])
            create_custom_folder(dir_name)
            blob.download_to_filename(f"{dir_name}/{parts[-1]}")
        else:
            blob.download_to_filename("model/" + filename)

    @timer
    def download_blobs_multithreaded(self, prefix: str):
        """Concurrently download every blob under *prefix* into ./model."""
        create_custom_folder("model")
        # Set the prefix BEFORE starting workers: download_blob reads it.
        self.path_prefix = prefix
        blobs = self.bucket.list_blobs(prefix=prefix)
        with ThreadPoolExecutor() as executor:
            # The original was missing the closing parenthesis here
            # (SyntaxError). Consuming the map iterator also re-raises any
            # worker exception instead of silently dropping it.
            list(executor.map(self.download_blob, blobs))

def download_model():
    # Entry point: download the current environment's model prefix from GCS.
    env = fetch_environment()
    # NOTE(review): ML_MODEL_NAME is not defined anywhere in this snippet —
    # presumably it comes from a constants/config module; confirm before use,
    # otherwise this line raises NameError.
    folder_path_prefix = f"ml/{env}/{ML_MODEL_NAME}/v1/tf-saved-model/"
    gcs = GCS()
    gcs.download_blobs_multithreaded(folder_path_prefix)
if __name__ == '__main__':
    download_model()

最新更新