我按照本指南创建一个谷歌云函数,该函数在GCS bucket触发期间启动DataFlow作业。我的问题是关于模板和inout文件。我会在我的数据流管道中有这一部分,以通过TextIO.read
获得源数据(GCS csv(,但我不确定如何格式化管道的这一部分以考虑来自bucket触发器的文件。我要"ReadTable" >> TextIO.read().metadata
之类的吗?
p = beam.Pipeline(options=options)
raw_values = (
p
| "ReadTable" >> TextIO.read().from("gs://bucket/file.csv")
| "custFunc" >> beam.Map(CallAPI)
| "writeTable" >> WriteToBigQuery('newtablw', project='project1',
dataset='test', schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
使用Dataflow模板,您可以创建运行时提供的参数。
定义所需的模板,例如file_in和file_out。然后,当GCS事件触发您的云功能时,您可以获取事件数据来提取bucket和文件名,将它们连接起来,并将它们作为file_in数据流参数提供。