How to send a notification when a Dataflow job completes



I want to know when a Dataflow job has completed.

I tried building the following two pipelines:

1.

| 'write to bigquery' >> beam.io.WriteToBigQuery(...)
| WriteStringsToPubSub('projects/fakeprj/topics/a_topic')

2.

| 'write to bigquery' >> beam.io.WriteToBigQuery(...)
| 'DoPubSub' >> beam.ParDo(DoPubSub())   # do Publish using google.cloud.pubsub

But both of the above produce the following error:

AttributeError: 'PDone' object has no attribute 'windowing'

How can I run a subsequent step after WriteToBigQuery?

Note: I launch the Dataflow job from a template via REST, so pipeline_result.wait_until_finish() cannot be used.
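One workaround for templated jobs launched over REST (not part of the original post) is to poll the job's state from the caller and publish the notification there. The `JOB_STATE_*` names are the real terminal states reported by the Dataflow `projects.locations.jobs.get` endpoint, but `get_job_state` below is a hypothetical stand-in for an actual call to that API; this is a minimal sketch of the polling loop only:

```python
import time

# Terminal states reported by the Dataflow jobs.get REST endpoint.
TERMINAL_STATES = {"JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}


def wait_for_job(get_job_state, poll_interval=30, timeout=3600, sleep=time.sleep):
    """Polls get_job_state() until it returns a terminal state.

    get_job_state is a caller-supplied zero-argument function; in a real
    setup it would call the Dataflow REST API (projects.locations.jobs.get)
    and return the response's currentState field.
    """
    waited = 0
    while waited <= timeout:
        state = get_job_state()
        if state in TERMINAL_STATES:
            return state
        sleep(poll_interval)
        waited += poll_interval
    raise TimeoutError("job did not reach a terminal state in time")
```

Once this returns `"JOB_STATE_DONE"`, the caller can publish the "done" message with google.cloud.pubsub itself, instead of trying to publish from inside the pipeline.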

Edit:

The full stack trace is below.

  File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 327, in <module>
    vital_data_export()
  File "<myPC_DIRPATH>/webapi-dataflow/pubsubtemplate.py", line 323, in vital_data_export
    result = p.run()
  File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 382, in run
    return self.runner.run_pipeline(self)
  File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 285, in run_pipeline
    return_context=True)
  File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 580, in to_runner_api
    root_transform_id = context.transforms.get_id(self._root_transform())
  File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 810, in to_runner_api
    for part in self.parts],
  File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in to_runner_api
    for tag, out in self.named_outputs().items()},
  File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pipeline.py", line 814, in <dictcomp>
    for tag, out in self.named_outputs().items()},
  File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\runners\pipeline_context.py", line 60, in get_id
    self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)
  File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 144, in to_runner_api
    self.windowing))
  File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\pvalue.py", line 128, in windowing
    self.producer.inputs)
  File "<myPC_DIRPATH>\webapi-dataflow\venv\dstobq_test\lib\site-packages\apache_beam\transforms\ptransform.py", line 443, in get_windowing
    return inputs[0].windowing
AttributeError: 'PDone' object has no attribute 'windowing'

In Java, this is how I publish a "done" event to Pub/Sub at the end of a Dataflow pipeline whose output is written to BigQuery. Hopefully there is an equivalent in Python.

PCollection<TableRow> rows = data.apply("ConvertToTableRow", ParDo.of(new ConvertToRow()));
// Normally this would be the end of the pipeline..
WriteResult writeResult = rows.apply("WriteToBQ", BigQueryIO.writeTableRows().to(...));
// Transformations after this will be done AFTER all rows have been written to BQ
rows.apply(Wait.on(writeResult.getFailedInserts()))
    // Transforms each row inserted to an Integer of value 1
    .apply("OnePerInsertedRow", ParDo.of(new DoFn<TableRow, Integer>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            c.output(Integer.valueOf(1));
        }
    }))
    // https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sum.java#L51
    // Combines a PCollection of Integers (all 1's) by summing them.
    // Outputs a PCollection of one integer element with the sum
    .apply("SumInsertedCounts", Sum.integersGlobally())
    .apply("CountsMessage", ParDo.of(new DoFn<Integer, PubsubMessage>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String messagePayload = "pipeline_completed";
            Map<String, String> attributes = new HashMap<>();
            attributes.put("rows_written", c.element().toString());
            PubsubMessage message = new PubsubMessage(messagePayload.getBytes(), attributes);
            c.output(message);
        }
    }))
    .apply("PublishCompletionMessage", PubsubIO.writeMessages().to(/* output topic */));
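The message-building step of the Java snippet translates naturally to Python: google.cloud.pubsub publishes a bytes payload plus string attributes. The function name below is illustrative, not an API; this only sketches the pure part, mirroring the `CountsMessage` DoFn:

```python
def build_completion_message(rows_written):
    """Builds the payload and attributes for the 'pipeline done' message,
    mirroring the CountsMessage DoFn above: the payload is a fixed string
    and the inserted-row count travels as a message attribute.
    """
    payload = b"pipeline_completed"
    attributes = {"rows_written": str(rows_written)}
    return payload, attributes


# With google.cloud.pubsub, the result would then be published as, e.g.:
#   publisher.publish(topic_path, payload, **attributes)
```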

You can't

Apparently, PDone is the last stage of the pipeline, and applying a wait-for-completion to it is not necessary.

PInput and PDone are classes supported by Apache Beam that indicate a source and a sink, respectively. If you try to execute something after the BigQuery write, it cannot be done unless you run two different Dataflow jobs in succession.

If you are looking to run them in sequence, check out Apache Airflow.
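The chaining an orchestrator like Airflow provides can be sketched in plain Python to show the idea. The `jobs` and `wait` callables here are hypothetical stand-ins: in a real setup each launcher would hit the Dataflow templates.launch REST endpoint and `wait` would poll the job's state until it is terminal:

```python
def run_in_sequence(jobs, wait):
    """Launches each job in turn, waiting for the previous one to finish.

    jobs: list of zero-argument callables that each start a job and
          return a job id.
    wait: callable that blocks until the given job id reaches a terminal
          state and returns that final state.
    """
    completed = []
    for launch in jobs:
        job_id = launch()
        state = wait(job_id)
        if state != "JOB_STATE_DONE":
            raise RuntimeError(f"job {job_id} ended in {state}; stopping chain")
        completed.append(job_id)
    return completed
```

An Airflow DAG of two templated-Dataflow tasks expresses the same dependency declaratively, with retries and monitoring handled for you.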
