从公共输入分支和合并Apache Beam中的pcollection列表



我正在构建一个数据流管道,在分支和合并输出时遇到了一些问题。我想要构建的管道如下:

  1. 读取一些输入数据input_data
  2. A。在input_data上提取一些度量metric_1。B.在input_data上提取一些其他度量metric_2
  3. 由于这些度量提取在计算上是昂贵的,所以我想从主input_data分支出来,然后合并输出以进行进一步的计算。合并输出output

下面是一些封装我实际管道的示例代码

class ReadData(beam.DoFn):
def process(self, element):
# read from source
return [{'input': np.random.rand(100,10)}]

class GetFirstMetric(beam.DoFn):
def process(self, element):
# some processing
return [{'first': np.random.rand(100,4)}]

class GetSecondMetric(beam.DoFn):
def process(self, element):
# some processing
return [{'second': np.random.rand(100,3)}]

def run():
with beam.Pipeline() as p:
input_data = (p | 'read sample data' >> beam.ParDo(ReadData()))
metric_1 = (input_data | 'some metric on input data' >> beam.ParDo(GetFirstMetric()))
metric_2 = (input_data | 'some aggregate metric' >> beam.ParDo(GetSecondMetric()))
output = ((metric_1, metric_2) 
| beam.Flatten()
| beam.combiners.ToList()
| beam.Map(print)
)

当我运行此程序时,我会得到一个'PBegin' object has no attribute 'windowing'错误。我看到了一些在Java中执行类似操作的示例和示例代码。但我找不到合适的资源来在Python中执行同样的操作。我的问题如下:

  1. 分支和合并pcollections的正确方法是什么(尤其是如果分支来自公共输入(?

  2. 是否有更好的管道设计来实现同样的目标?

提前感谢!

在这段代码中,您的问题是没有"启动"初始PCollection。在ReadData.process中,变量element的值是多少?

好吧,运行程序无法得出一个值,因为没有输入pcollection。您需要创建第一个PCollection。您可以执行类似以下代码的操作。。。

至于将它们列为一个列表,也许一个辅助输入策略可能会奏效。CAn您尝试以下操作:

def run():
with beam.Pipeline() as p:
starter_pcoll = p | beam.Create(['any'])
input_data = (starter_pcoll | 'read sample data' >> beam.ParDo(ReadData()))
metric_1 = (input_data | 'some metric on input data' >> beam.ParDo(GetFirstMetric()))
metric_2 = (input_data | 'some aggregate metric' >> beam.ParDo(GetSecondMetric()))
side_in = beam.pvalue.AsList((metric_1, metric_2) 
| beam.Flatten())
p | beam.Create(['any']) | beam.Map(lambda x, si: print(si),
side_in)

这应该会让你的管道运行起来。很高兴进一步澄清您的具体问题。

最新更新