我们有一个从Pub/Sub GCP Topic读取数据的管道。我们希望根据上述数据创建15分钟的聚合(积分和平均值)。为此,我们创建一个FixedWindow,然后创建/分组/删除一个虚拟键,这允许我们将窗口中的所有消息放在一个列表中,然后使用使用pandas执行处理的DoFn自定义类创建这些聚合。最后,我们将结果写入InfluxDB数据库。
(
p
| 'read_telemetry_from_pubsub' >> ReadFromPubSub(topic=PUBSUB_TOPIC)
| 'window_telemetry' >> beam.WindowInto(beam.window.FixedWindows(WINDOW_SIZE))
| 'format_telemetry_for_influx' >> beam.ParDo(FormatInfluxDB())
| "add_dummy_key" >> beam.Map(lambda elem: (None, elem))
| "groupby_dummy_key" >> beam.GroupByKey()
| "delete_dummy_key" >> beam.MapTuple(lambda _, val: val)
| "aggregate" >> beam.ParDo(Aggregator())
| "write_processed_messages_to_influx" >> beam.ParDo(WriteToInfluxDB())
)
这是聚合包含15分钟窗口内所有消息的列表的类:
class Aggregator(beam.DoFn):
def process(self, elements):
# parsing the message list into a pandas DataFrame
# some preprocessing and agregation steps
# returns a list with json messages
return [aggregated_values]
我们使用GCP Pub/Sub模拟器在本地测试此代码,它工作得很好。然而,当我们部署到GCP Dataflow时,它不会发出任何结果,也不会在日志中发现任何错误。此外,我们看到数据的新鲜度无限增长。
我们相信我们缺少一些触发函数,但我们不确定这是否是做这种聚合的正确方法,因为它在本地发出结果,而不是在部署时发出结果。当我们使用不同于默认的触发器时,本地就没有排放。
我们已经尝试了一些触发选项(repeat, AfterProcessingTime, AccumulationMode)。DISCARDING, AfterWatermark)和另一种使用Combine自定义类的方法,但我们也没有导致发射。注意:更详细的代码,这里是聚合类的完整代码。
使用setup.py
文件而不是requirements.txt
文件来解决所需软件包的问题。此外,我们将python Apache Beam SDK版本从2.29.0
更新到2.32.0
,现在结果正在正确地聚合和发出。
我们在Logging GCP模块中发现了这个错误:
同步pod错误…("pipeline_name")跳过:[failed to ."StartContainer"为"sdk0"使用CrashLoopBackOff:后退5秒重新启动失败的容器=sdk0 pod=pipeline_name)">
然后,我们找到了这个答案。我们使用这个代码片段来创建我们自己的setup.py
。