CombineFn for Python dictionaries in an Apache Beam pipeline



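I have been experimenting with the Apache Beam SDK for Python to write data processing pipelines.

My data simulates IoT sensor data arriving from a Google PubSub topic, which streams JSON data like this: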

{"id": 1, "temperature": 12.34}
{"id": 2, "temperature": 76.54}

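The id ranges from 0 to 99. Reading the JSON into a Python dictionary is not a problem.

I created a custom CombineFn that is applied through CombinePerKey. I want the output of my accumulator to be the computed result, grouped by the id field of each dictionary in the PCollection.

However, when the add_input method is called, it receives only the string temperature instead of the whole dictionary. I also could not find any reference on how to tell CombinePerKey which key (the id field in my case) it should group the data by.

Maybe I have also misunderstood the concepts of CombinePerKey and CombineFn. I would appreciate any help or hints. Does anyone have an example that processes batches of JSON and groups them by an ID? Do I have to convert the dictionary into something else?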

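You need to either adapt your CombineFn or (my recommendation) keep the CombineFn as generic as possible and map the input of CombinePerKey accordingly. CombinePerKey does not take a key argument: it expects each element to be a (key, value) tuple, so you add the key with a Map step beforehand. I made a short example of both approaches below, based on this official Beam example.

Specific CombineFn: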

import apache_beam as beam


class SpecificAverageFn(beam.CombineFn):
    def create_accumulator(self):
        total = 0.0
        count = 0
        accumulator = total, count
        return accumulator

    def add_input(self, accumulator, input):
        total, count = accumulator
        # input is the whole dict here, so the extraction is custom code
        extracted_input = input['temperature']
        return total + extracted_input, count + 1

    def merge_accumulators(self, accumulators):
        # accumulators = [(total1, count1), (total2, count2), ...]
        totals, counts = zip(*accumulators)
        # totals = [total1, total2, ...]
        # counts = [count1, count2, ...]
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        if count == 0:
            return float('NaN')
        return total / count


with beam.Pipeline() as pipeline:
    (
        pipeline
        | "mock input" >> beam.Create([
            {'id': 1, 'temperature': 2},
            {'id': 2, 'temperature': 3},
            {'id': 2, 'temperature': 2}
        ])
        # CombinePerKey groups by the first item of each (key, value) tuple
        | "add key" >> beam.Map(lambda x: (x['id'], x))
        | beam.CombinePerKey(SpecificAverageFn())
        | beam.Map(print)
    )
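
To make it clearer why add_input never sees the key, here is a rough plain-Python sketch of how a runner might drive the four CombineFn methods per key. It does not use Beam: `AverageFn` and `simulate_combine_per_key` are hypothetical stand-ins, and the split into two bundles is an assumption made for illustration. The last lines show one plausible reason (an assumption, not verified against the pipeline in the question) for add_input receiving the string temperature: unpacking a two-entry dict as if it were a pair yields its keys.

```python
from collections import defaultdict


class AverageFn:
    """Plain-Python stand-in with the same four methods as a Beam CombineFn."""

    def create_accumulator(self):
        return 0.0, 0  # (running total, count)

    def add_input(self, accumulator, value):
        total, count = accumulator
        return total + value, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        return total / count if count else float("nan")


def simulate_combine_per_key(bundles, fn):
    """Mimic per-key combining over several bundles of (key, value) pairs."""
    partials = defaultdict(list)  # key -> one accumulator per bundle
    for bundle in bundles:
        local = {}
        for key, value in bundle:
            acc = local.get(key)
            if acc is None:
                acc = fn.create_accumulator()
            # Only the value is handed to add_input; the key is used for grouping.
            local[key] = fn.add_input(acc, value)
        for key, acc in local.items():
            partials[key].append(acc)
    # Partial accumulators from different bundles are merged, then finalized.
    return {
        key: fn.extract_output(fn.merge_accumulators(accs))
        for key, accs in partials.items()
    }


bundles = [
    [(1, 2), (2, 3)],  # first bundle (assumed split)
    [(2, 2)],          # second bundle
]
result = simulate_combine_per_key(bundles, AverageFn())
print(result)

# Unpacking a two-entry dict like a pair yields its keys, not its values.
first, second = {"id": 1, "temperature": 12.34}
print(first, second)
```

This is why the "add key" step matters: without it, the dict itself is treated as the pair.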

Generic CombineFn:

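import apache_beam as beam


class GenericAverageFn(beam.CombineFn):
    # Same as SpecificAverageFn, except that add_input receives a plain number
    def create_accumulator(self):
        return 0.0, 0  # (running total, count)

    def add_input(self, accumulator, input):
        total, count = accumulator
        return total + input, count + 1

    def merge_accumulators(self, accumulators):
        totals, counts = zip(*accumulators)
        return sum(totals), sum(counts)

    def extract_output(self, accumulator):
        total, count = accumulator
        if count == 0:
            return float('NaN')
        return total / count


with beam.Pipeline() as pipeline:
    iot_data = (
        pipeline
        | "mock input" >> beam.Create([
            {'id': 1, 'temperature': 2},
            {'id': 2, 'temperature': 3},
            {'id': 2, 'temperature': 2}
        ])
        | "add key" >> beam.Map(lambda x: (x['id'], x))
    )
    # repeat below for other values
    (
        iot_data
        | "extract temp" >> beam.Map(lambda x: (x[0], x[1]['temperature']))
        | beam.CombinePerKey(GenericAverageFn())
        | beam.Map(print)
    )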

Both approaches return

(1, 2.0)
(2, 2.5)
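
In the real pipeline the elements would come from PubSub as JSON payloads rather than from beam.Create. Here is a minimal sketch of the parse-and-key step, assuming each payload arrives as UTF-8 bytes or as a str. `to_keyed_temperature` is a hypothetical helper name and not part of Beam.

```python
import json


def to_keyed_temperature(payload):
    """Turn one JSON payload into an (id, temperature) pair."""
    if isinstance(payload, bytes):
        payload = payload.decode("utf-8")
    record = json.loads(payload)
    # The first item becomes the grouping key, the second the value to combine.
    return record["id"], float(record["temperature"])


messages = [
    b'{"id": 1, "temperature": 12.34}',
    b'{"id": 2, "temperature": 76.54}',
]
keyed = [to_keyed_temperature(m) for m in messages]
print(keyed)
```

Such a function could be passed to beam.Map ahead of CombinePerKey, which would replace both the "add key" and "extract temp" steps of the generic variant.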
