我想在管道完成所有处理后运行一些代码,因此我使用BlockingDataflowPipelineRunner
并在main
中使用pipeline.run()
后放置代码。
使用BlockingDataflowPipelineRunner
从命令行运行作业时,这正常工作。pipeline.run()
下的代码在管道完成处理后运行。
但是,当我尝试作为模板运行该作业时,它不起作用。我将作业部署为模板(使用TemplatingDataflowPipelineRunner
(,然后尝试在这样的云函数中运行模板:
dataflow.projects.templates.create({
projectId: 'PROJECT ID HERE',
resource: {
parameters: {
runner: 'BlockingDataflowPipelineRunner'
},
jobName: `JOB NAME HERE`,
gcsPath: 'GCS TEMPLATE PATH HERE'
}
}, function(err, response) {
if (err) {
// etc
}
callback();
});
跑步者似乎没有参加。我可以将Gibberish放在跑步者之下,而且工作仍在运行。
我在pipeline.run()
下的代码在每个作业运行时不会运行 - 仅在我部署模板时运行。
是否期望每次作业运行main
中的pipeline.run()
下的代码?管道完成后是否有执行代码的解决方案?
(对于上下文,pipeline.run()
之后的代码将文件从一个云存储桶移到另一个云存储桶。它正在归档刚刚处理的文件。(
是的,这种预期的行为。模板代表管道本身,并允许(重新(通过启动模板来执行管道。由于该模板不包括main()
方法中的任何代码,因此在管道执行后不允许执行任何操作。
同样,dataflow.projects.templates.create
API只是启动模板的API。
阻止跑步者完成的方式是从创建的管道中获取作业ID,并定期进行调查,以观察何时完成。对于您的用例,您需要做同样的事情:
- 执行
dataflow.projects.templates.create(...)
来创建数据流工作。这应该返回工作ID。 - 定期(例如,每5-10年代(进行轮询
dataflow.projects.jobs.get(...)
以用给定的ID检索工作,并检查它在哪种状态。