写入动态目的地,以在Python中的DataFlow中的云存储



我试图从云存储中的一个大文件中读取并根据给定的字段将其碎片。

我打算阅读|地图(lambda x:(x [键字段],x((|GroupByKey |以关键字段的名称写入文件。

但是,我找不到动态写入云存储的方法。此功能受支持吗?

谢谢,yiqing

是的,您可以使用FileSystems API创建文件。

在2.14.0, beam.io.fileio.WriteToFiles

中添加了一个实验写入。
my_pcollection | beam.io.fileio.WriteToFiles(
      path='/my/file/path',
      destination=lambda record: 'avro' if record['type'] == 'A' else 'csv',
      sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(),
      file_naming=beam.io.fileio.destination_prefix_naming())

可用于每记录的不同文件。

您可以跳过GroupByKey,只需使用destination来决定每个记录写入的文件。destination的返回值必须是可以由。

分组的值。

更多文档:

https://beam.apache.org/releases/pydoc/2.14.0/apache_beam.io.fileio.fileio.html#dynamic-destinations

和jira问题:

https://issues.apache.org/jira/browse/browse/beam-2857

最新更新