Google Cloud Dataflow - Python Streaming JSON to PubSub - D



试图做一些概念上简单的事情,但把我的头撞在墙上。

我正在尝试在 Python 中创建一个流式数据流作业,该作业使用来自 PubSub 主题/订阅的 JSON 消息,对每条消息执行一些基本操作(在本例中,将温度从 C 转换为 F(,然后将记录发布回不同的主题:

from __future__ import absolute_import
import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window
import json
'''Normalize pubsub string to json object'''
# Lines look like this:
#{"temperature": 29.223036004820123}

def transform_temp(line):
record = json.loads(line)
record['temperature'] = record['temperature'] * 1.8 + 32
return json.dumps(record)
def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--output_topic', required=True,
help=('Output PubSub topic of the form '
'"projects/<PROJECT>/topic/<TOPIC>".'))
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
'--input_topic',
help=('Input PubSub topic of the form '
'"projects/<PROJECT>/topics/<TOPIC>".'))
group.add_argument(
'--input_subscription',
help=('Input PubSub subscription of the form '
'"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
known_args, pipeline_args = parser.parse_known_args(argv)
with beam.Pipeline(argv=pipeline_args) as p:
# Read the pubsub topic into a PCollection.
lines = ( p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
| beam.Map(transform_temp)
| beam.io.WriteStringsToPubSub(known_args.output_topic)
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()

使用 DirectRunner 在本地运行此代码时,一切正常。 但是,在切换到DataflowRunner时,我从未看到有关新主题的任何消息。

我还尝试向 transform_temp 函数添加一些日志记录调用,但在 Dataflow 的控制台日志中看不到任何内容。

有什么建议吗? 顺便说一句 - 如果我只是将输入主题探测到输出主题,我可以看到消息,所以我知道流媒体工作正常。

非常感谢!

您可能只是缺少一个窗口函数。Apache beam的文档指出,对于流式处理管道,您需要设置非默认窗口或非默认触发器。由于尚未定义窗口,因此您有一个全局窗口,因此它可能在转到接收器之前无休止地等待窗口的末尾。

最新更新