在datastoreio.write之后链接另一个转换



我使用Apache Beam Java SDK创建了Google DataFlow管道。我在那里有一些转换,最终创建了一个实体集合(pcollection< entity>)。我需要将其写入Google数据存储中,然后在所有实体都撰写后执行另一个转换。(例如,通过PubSub消息向多个订户广播保存对象的ID)。

现在,存储PCollection的方法是:entities.datastoreio.v1()。write()。用projectId(" abc")

这将返回一个pdone对象,我不确定如何链接另一个变换()完成()完成后发生。由于datastoreio.write()呼叫没有返回PCollection,因此我无法进一步介绍管道。我有2个问题:

  1. 如何获取写入数据存储的对象的ID?

  2. 如何附加在保存所有实体后将采取的另一个转换?

我们没有一个很好的方法来做这两个事情(返回书面数据存储实体的ID,或等到写入实体),尽管这远非第一个类似的类似请求(例如,人们已经要求BigQuery),我们正在考虑。

现在,您唯一的选择是等到整个管道完成,例如通过pipeline.run().waitUntilFinish(),然后在主程序中执行您想要的操作(例如,您可以运行另一个管道)。

最新更新