将BlockingDataFlowPipelinerner和后处理代码用于数据流模板



我想在管道完成所有处理后运行一些代码,因此我使用BlockingDataflowPipelineRunner并在main中使用pipeline.run()后放置代码。

使用BlockingDataflowPipelineRunner从命令行运行作业时,这正常工作。pipeline.run()下的代码在管道完成处理后运行。

但是,当我尝试作为模板运行该作业时,它不起作用。我将作业部署为模板(使用TemplatingDataflowPipelineRunner(,然后尝试在这样的云函数中运行模板:

dataflow.projects.templates.create({
    projectId: 'PROJECT ID HERE',
    resource: {
        parameters: {
            runner: 'BlockingDataflowPipelineRunner'
        },
        jobName: `JOB NAME HERE`,
        gcsPath: 'GCS TEMPLATE PATH HERE'
    }
}, function(err, response) {
    if (err) {
        // etc
    }
    callback();
});

跑步者似乎没有参加。我可以将Gibberish放在跑步者之下,而且工作仍在运行。

我在pipeline.run()下的代码在每个作业运行时不会运行 - 仅在我部署模板时运行。

是否期望每次作业运行main中的pipeline.run()下的代码?管道完成后是否有执行代码的解决方案?

(对于上下文,pipeline.run()之后的代码将文件从一个云存储桶移到另一个云存储桶。它正在归档刚刚处理的文件。(

是的,这种预期的行为。模板代表管道本身,并允许(重新(通过启动模板来执行管道。由于该模板不包括main()方法中的任何代码,因此在管道执行后不允许执行任何操作。

同样,dataflow.projects.templates.create API只是启动模板的API。

阻止跑步者完成的方式是从创建的管道中获取作业ID,并定期进行调查,以观察何时完成。对于您的用例,您需要做同样的事情:

  1. 执行dataflow.projects.templates.create(...)来创建数据流工作。这应该返回工作ID。
  2. 定期(例如,每5-10年代(进行轮询dataflow.projects.jobs.get(...)以用给定的ID检索工作,并检查它在哪种状态。

最新更新