我正在寻找在GCP中实现归档功能的最佳方法的建议。基本上,我想归档文件,其中包括将文件从一个存储桶移动到另一个。
我已经创建了一个云函数(python代码(来触发文件移动功能,但我遇到了超时问题,云函数需要大量时间来移动文件(文件大小可能超过100GB+(。
这是我代码的核心,我将对象从一个桶"移动"到另一个桶:
if status == "DONE":
#Archive CSV file
try:
source_blob = pub_bucket.blob(message['data']['filename'])
new_blob = pub_bucket.copy_blob(
source_blob, pri_bucket, folder_name+message['data']['filename'])
except Exception as e:
print("Something went wrong when moving the file " + message['data']['filename'] + " to the private bucket: {}".format(e))
一个重要的注意事项是,在归档之前,我会进行一些文件处理,以便了解何时可以触发归档功能。因此,我的问题更多地是关于触发此归档功能的最佳方式/体系结构是什么?
提前谢谢!
云函数的默认请求超时为1分钟,但您可以将其进一步延长至9分钟。如果你的工作需要更长的时间,那么我建议使用传输服务API:以编程方式创建传输工作
传输服务API会立即返回响应,这样您就可以在函数中使用它,而无需等待作业完成。此外,您可以安排每个传输作业,并在控制台中查看其当前状态。
开始:
- 启用存储传输API
- 通过尝试此链接中的API生成存储转移服务帐户
- 在IAM上,添加服务帐户并赋予其
Storage Admin
和Storage Transfer Admin
角色 - 创建一个Python应用程序,并遵循以下创建数据传输文档中的示例代码:
import argparse
import datetime
import json
import googleapiclient.discovery
def main(description, project_id, start_date, start_time, source_bucket,
sink_bucket):
"""Create a daily transfer from Standard to Nearline Storage class."""
storagetransfer = googleapiclient.discovery.build('storagetransfer', 'v1')
# Edit this template with desired parameters.
transfer_job = {
'description': description,
'status': 'ENABLED',
'projectId': project_id,
'schedule': {
'scheduleStartDate': {
'day': start_date.day,
'month': start_date.month,
'year': start_date.year
},
'startTimeOfDay': {
'hours': start_time.hour,
'minutes': start_time.minute,
'seconds': start_time.second
}
},
'transferSpec': {
'gcsDataSource': {
'bucketName': source_bucket
},
'gcsDataSink': {
'bucketName': sink_bucket
},
'objectConditions': {
'minTimeElapsedSinceLastModification': '2592000s' # 30 days
},
'transferOptions': {
'deleteObjectsFromSourceAfterTransfer': 'true'
}
}
}
result = storagetransfer.transferJobs().create(body=transfer_job).execute()
print('Returned transferJob: {}'.format(
json.dumps(result, indent=4)))
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter)
parser.add_argument('description', help='Transfer description.')
parser.add_argument('project_id', help='Your Google Cloud project ID.')
parser.add_argument('start_date', help='Date YYYY/MM/DD.')
parser.add_argument('start_time', help='UTC Time (24hr) HH:MM:SS.')
parser.add_argument('source_bucket', help='Standard GCS bucket name.')
parser.add_argument('sink_bucket', help='Nearline GCS bucket name.')
args = parser.parse_args()
start_date = datetime.datetime.strptime(args.start_date, '%Y/%m/%d')
start_time = datetime.datetime.strptime(args.start_time, '%H:%M:%S')
main(
args.description,
args.project_id,
start_date,
start_time,
args.source_bucket,
args.sink_bucket)
- 添加以下依赖项:
google-api-python-client==2.7.0
google-auth==1.31.0
google-auth-httplib2==0.1.0
- 通过以下参数测试运行它:
python py.py DESCRIPTION PROJECT_ID START_DATE START_TIME SOURCE_BUCKET SINK_BUCKET
例如:
python py.py test-description myproj 2021/06/29 00:00:00 mysource mydest
附加说明:
- 以编程方式配置传输需要时间为UTC。样本将每天重新运行作业。为了确保传输只运行一次,
scheduleEndDate
和scheduleStartDate
必须相同,并且在将来相对于UTC - 示例作业使用近线存储作为目标存储桶。如果需要,可以使用其他存储类
- 对于要立即运行的一次性传输,请不要指定
startTimeOfDay
- 要对代码进行更精细的调整,请参阅
TransferJob