我试图从云存储中的一个大文件中读取并根据给定的字段将其碎片。
我打算阅读|地图(lambda x:(x [键字段],x((|GroupByKey |以关键字段的名称写入文件。
但是,我找不到动态写入云存储的方法。此功能受支持吗?
谢谢,yiqing
是的,您可以使用FileSystems
API创建文件。
在2.14.0, beam.io.fileio.WriteToFiles
:
my_pcollection | beam.io.fileio.WriteToFiles(
path='/my/file/path',
destination=lambda record: 'avro' if record['type'] == 'A' else 'csv',
sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(),
file_naming=beam.io.fileio.destination_prefix_naming())
可用于每记录的不同文件。
您可以跳过GroupByKey
,只需使用destination
来决定每个记录写入的文件。destination
的返回值必须是可以由。
更多文档:
https://beam.apache.org/releases/pydoc/2.14.0/apache_beam.io.fileio.fileio.html#dynamic-destinations
和jira问题:
https://issues.apache.org/jira/browse/browse/beam-2857