按元组顺序扁平化



我正在尝试使用Apache Beam中的Flatten功能添加标头。但是,似乎没有一种方法可以根据文档设置订单:https://beam.apache.org/documentation/sdks/pydoc/2.4.0/apache_beam.transform.transforms.cores.core.html?highighlight = highlight =Flatten#apache_beam.transforms.core.flatten。

有时,标头在数据的末尾,而其他则位于顶部。有没有办法设置订单?想知道我是否缺少东西。

with beam.Pipeline(options=options) as p:

  header = [
      ('name', 'number'),
  ]
  phones_list = [
      ('amy', '111-222-3333'),
      ('james', '222-333-4444'),
      ('amy', '333-444-5555'),
      ('carl', '444-555-6666'),
  ]
  header = p | 'Header' >> beam.Create(header)
  phones = p | 'CreatePhones' >> beam.Create(phones_list)  
  merged = ((phones,header)
            | 'MergedPColl' >> beam.Flatten())
  output = merged
  output | 'Write' >> beam.io.WriteToText('./_output')

输出1:

('amy', '111-222-3333')
('james', '222-333-4444')
('amy', '333-444-5555')
('carl', '444-555-6666')
('name', 'number')

输出2:

('name', 'number')
('amy', '111-222-3333')
('james', '222-333-4444')
('amy', '333-444-5555')
('carl', '444-555-6666')

Flatten是在PCollections上使用的变压器。为了使合并工作并联,我认为他们不能保证保留命令;这与由此产生的pcollection的无序性质一致。

但是,如果您的唯一目的是将标题添加到顶部,则可以使用header的CC_2参数。

> 标题(str(:在文件开头编写的字符串作为标头。如果不是:data: None and append_trailing_newlines 已设置,` n'将被添加。

phones | 'Write' >> beam.io.WriteToText(
  # Feel free to make your own header format.
  './_output', header="('name', 'number')")

更普遍地要保留原始输入的序列,我将用序列编号增强输入数据。在Beam的并行转换(携带每个元素的序列号(之后,您可以始终通过对该序列编号进行排序作为后处理步骤(以非并行模式(来"恢复"原始顺序。

最新更新