在Apache Beam中,PCollections可以被重用吗?它们会被重新计算还是被缓存?



我在GCP Dataflow上使用Apache Beam。我想多次使用一个PCollection,但我担心它可能会重新计算一个昂贵的PCollection。我找不到"物质化"。或";cache"

import apache_beam as beam
# Set up a pipeline and read in a PCollection
p = beam.Pipeline()
input_data = p | beam.io.ReadFromText('input.txt')
reused_data = input_data | beam.Map(some_expensive_function)

# Write the outputs to different files
reused_data | beam.io.WriteToText('output1.txt')
reused_data | beam.io.WriteToText('output2.txt')
# Run the pipeline
p.run()

这里会发生什么?它会重新计算我的数据还是缓存我的数据?如果我的机器没有足够的内存怎么办?

在写入的管道中,不会缓存或重新计算任何内容(模故障恢复)。虽然细节是由跑步者决定的,但大多数跑步者都会进行所谓的融合。特别地,在这种情况下将发生的事情大致为

  1. 从input.txt中获取第一个元素
  2. 应用some_expensive_function,生成x元素
  3. 将X写入output1.txt
  4. 将X写入output2.txt
  5. [回到第一步]

如果在2和3/4之间存在其他DoFns,则将逐个元素应用它们,并完全处理它们的输出,然后再返回步骤1开始处理下一个元素。在任何时候都没有实现完整的reused_dataPCollection,它一次只实现一个元素(当然可能是在多个worker之间并行地实现)。

如果由于某些原因无法实现融合(有时会发生冲突的资源约束或侧输入),中间数据将隐式物化到磁盘,而不是重新计算。

在这种情况下,reused_data计算一次,然后相同的PCollection和数据将下沉到2个GCS桶中。

reused_data  
|                 |
|                 | 
|                 |
Write GCS bucket 1    Write GCS bucket 2

每个sink将遍历reused_datapcollection以将结果写入云存储桶。

如果您必须在输入PCollection上使用昂贵的数据,我建议您在本地机器上使用Dataflow运行器而不是DirectRunner

Dataflow运行程序将并行处理您的数据,自动缩放,并在必要时使用多个Compute Engine虚拟机。

正如Mazlum Tosun已经回答的那样,您的PCollectionreused_data被写了两次。然而,我想指出的是,PCollection只能作为指针分发。因此,如果您希望/start在管道的一个分支中操作Pcollection,这可能会导致不正确的行为。

例如,如果您运行这段代码(例如,这里)

import apache_beam as beam

class ManipulatePcoll(beam.DoFn):
def process(self, element):
element[1] = 55
yield element

with beam.Pipeline() as pipeline:
main = (
pipeline
| "init main" >> beam.Create([[1,2,3]])
)

# pipeline branch 1
(
main
| "print original result" >> beam.Map(print)
)

# pipeline branch 2
(
main
| beam.ParDo(ManipulatePcoll())
| "print manipulated result" >> beam.Map(print)
)

你得到的结果是

[1, 55, 3]
[1, 55, 3]

,因为分支1中的main指向分支2中的相同内存。

然而,在某些情况下,PCollection实际上是序列化和复制的,例如,当在不同的worker之间分发数据时(查看完整列表)。

关于第二个问题,请允许我引用beam文档和编程指南

PCollection是一个大的、不可变的元素"包"。没有上限一个PCollection可以包含多少个元素;任何给定的PCollection可能适合内存或者可能表示非常大的由持久的支持的分布式数据集<数据存储/strong>.

最新更新