如何将每个标记的输出写入Apachebeam中的不同文件



我有这样的代码,它根据输入文件的一些数据标记输出:

class filters(beam.DoFn):
def process(self, element): 
data = json.loads(element)
yield TaggedOutput(data['EventName'],element)

我需要帮助下一步写入结果标记输出:

tagged = lines | beam.ParDo(filters()).with_outputs('How can I dinamiclly acces this tags?')

正如你所看到的,当我这样做的时候,".with_outputs(("我不知道标签会有多少个和什么名字,所以我无法预测像这样的事情

tag1 = tagged.tag1

感谢您的帮助

更新:这不起作用,因为.outputs((为空

tagged_data= lines | 'tagged data by key' >>  
beam.ParDo(filters()).with_outputs()
for tag in tagged_data:
print('something')
output: WARNING:apache_beam.options.pipeline_options:Discarding unparseable args

但这将工作

tagged_data= lines | 'tagged data by key' >>  
beam.ParDo(filters()).with_outputs('tag1','tag2')
for tag in tagged_data:
print('something')
output:
something
something

Apache Beam管道执行被推迟——要执行的操作的DAG被建立起来,在运行管道之前什么都不会发生。(在Beam Python中,这通常在with beam.Pipeline(...)块的末尾隐式调用。(PCollections实际上并不包含数据,只是关于如何计算数据的指令。

特别是,这意味着当你写时

tagged = lines | beam.ParDo(filters()).with_outputs(...)

标记实际上并不包含任何数据,而是包含对将要生成的PCollections的引用(可以向它们添加进一步的处理步骤(。lines中的数据尚未实际计算或读取,因此您无法(在管道施工期间(弄清楚输出集是什么。

从这个问题中还不清楚你的最终目标是什么,但如果你试图对输出进行分区,你可能想看看动态目的地。

要实现您想要实现的目标,您需要创建一个DoFn。你可以使用这个例子作为基础:

from apache_beam.io.textio import _TextSink

class WriteEachKeyToText(beam.DoFn):
def __init__(self, file_path_prefix=str):
super().__init__()
self.file_path_prefix = file_path_prefix
def process(self, kv):
key = kv[0]
elements = kv[1]
sink = _TextSink(self.file_path_prefix, file_name_suffix=f"{key}.json")
writer = sink.open_writer("prefix", self.file_path_prefix)
for e in elements:  # values
writer.write(e)

然后,你可以这样使用它:

output_path = "/some/path/"
tagged_data | beam.ParDo(WriteEachKeyToText(output_path))

最新更新