GCS bucket file path as input for a Cloud Function (GCP) triggered Dataflow pipeline



I am trying to launch a Dataflow pipeline from a Cloud Function that uses a GCS bucket (create/finalize) trigger. I am trying to figure out how, at trigger time, to pass the path of the csv file in the GCS bucket to my custom Dataflow pipeline.

If you have run into a similar problem, could you let me know what your solution in Python was?

Thanks.

You need to create a Flex Template for your Dataflow pipeline: https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates. With a Flex Template you can pass parameters to Dataflow dynamically at runtime. Once that is done, add the Dataflow trigger code to your Cloud Function, like this:

import logging

from googleapiclient.discovery import build

# Module-level configuration for your environment (staging bucket, experiments,
# worker settings, networking, region) is assumed to be defined elsewhere:
# DATAFLOW_STAGING_LOCATION, DATAFLOW_ADDITIONAL_EXPERIMENTS, DATAFLOW_MAX_WORKER_COUNT,
# DATAFLOW_MACHINE_TYPE, DATAFLOW_SERVICE_ACCOUNT_EMAIL, DATAFLOW_NETWORK,
# DATAFLOW_SUBNETWORK, DATAFLOW_IP_CONFIGURATION, DATAFLOW_RUN_PROJECT_REGION

def startDataflow(project, flex_template_path, jobName, bq_dataset, raw_table, prc_table, start_date):
    # Define the JSON request body for triggering the Flex Template Dataflow job
    parameters = {
        "gcp_project": project,
        "bq_dataset": bq_dataset,
        "raw_table": raw_table,
        "prc_table": prc_table,
        "start_date": start_date
    }
    environment = {
        "stagingLocation": DATAFLOW_STAGING_LOCATION,
        "additionalExperiments": DATAFLOW_ADDITIONAL_EXPERIMENTS,
        "maxWorkers": DATAFLOW_MAX_WORKER_COUNT,
        "machineType": DATAFLOW_MACHINE_TYPE,
        "serviceAccountEmail": DATAFLOW_SERVICE_ACCOUNT_EMAIL,
        "network": DATAFLOW_NETWORK,
        "subnetwork": DATAFLOW_SUBNETWORK,
        "ipConfiguration": DATAFLOW_IP_CONFIGURATION
    }
    body = {
        "launchParameter": {
            "jobName": jobName,
            "parameters": parameters,
            "environment": environment,
            "containerSpecGcsPath": flex_template_path,
        }
    }
    service = build("dataflow", "v1b3", cache_discovery=False)
    # Create the request that launches the Flex Template pipeline
    request = (
        service.projects().locations().flexTemplates().launch(
            projectId=project,                     # GCP project ID to run the job in
            location=DATAFLOW_RUN_PROJECT_REGION,  # region to run the job in
            body=body
        )
    )
    try:
        response = request.execute()
    except Exception as e:
        logging.exception(
            "There was an exception while triggering the dataflow pipeline with the job name: {}. The exception is: {}".format(
                jobName, e))
        return None
    return response
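Whatever keys you put in the parameters dict above have to be declared as options by the pipeline that is packaged in the Flex Template, which then reads them like ordinary command-line arguments. Below is a minimal sketch of that side, assuming a Beam Python pipeline and a hypothetical --input_file option for the CSV path; the option name is illustrative and must match the key you launch with:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class CsvPipelineOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # Every key sent in "parameters" at launch time arrives as a pipeline option
        parser.add_argument("--input_file", help="gs:// path of the CSV file to process")

def run(argv=None):
    pipeline_options = PipelineOptions(argv)
    csv_options = pipeline_options.view_as(CsvPipelineOptions)
    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | "ReadCsv" >> beam.io.ReadFromText(csv_options.input_file, skip_header_lines=1)
            | "PrintLine" >> beam.Map(print)  # replace with your real transforms
        )

if __name__ == "__main__":
    run()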

In the startDataflow function above, the parameters dictionary is where values are passed to the Dataflow pipeline at runtime. Since your Cloud Function is triggered by a GCS event, you can read the file name from the event payload and send it to the Dataflow pipeline when you call it.
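For the CSV path specifically, the finalize event payload already carries the bucket and object name, so the Cloud Function can assemble the gs:// path and hand it over as one more launch parameter. Here is a minimal sketch, assuming a background function with the (event, context) signature and that startDataflow is extended with an extra input_file argument that it adds to its parameters dict; all names and values below are illustrative:

import time

def gcs_trigger(event, context):
    # Triggered by google.storage.object.finalize on the bucket
    bucket = event["bucket"]
    name = event["name"]

    # Only react to CSV files dropped into the bucket
    if not name.endswith(".csv"):
        return

    gcs_path = "gs://{}/{}".format(bucket, name)
    job_name = "csv-dataflow-{}".format(int(time.time()))

    # startDataflow would add input_file to its "parameters" dict so the
    # Flex Template receives the CSV path at launch time (hypothetical extension)
    startDataflow(
        project="my-project",                                             # assumed project ID
        flex_template_path="gs://my-bucket/templates/csv_pipeline.json",  # assumed template spec
        jobName=job_name,
        bq_dataset="my_dataset",
        raw_table="raw_csv",
        prc_table="prc_csv",
        start_date="2024-01-01",
        input_file=gcs_path,                                              # hypothetical extra argument
    )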

Does that answer your question?
