如何在Apache beam中对时间窗口内的元素进行计数,并在计数达到某个阈值时发出数据



我想在一个时间窗口内对来自无界源(我的源使用Pub/Sub(的每个键的元素进行计数,并在达到某个阈值时发出计数结果。例如,我想在10分钟的固定时间窗口内对元素进行计数,并在计数>5.

transformation = (p
| beam.io.ReadFromPubSub(subscription=known_args.input_subscription))
| 'parse' >> beam.Map(json.loads)
| beam.WindowInto(window.FixedWindows(600))
| 'count' >> beam.combiners.Count.PerKey()
| 'filter' >> beam.Filter(lambda data: data['count'] > 5))
transformation | beam.io.WriteToPubSub(known_args.output_topic)

然而,写入Pub/Sub的结果似乎被延迟了,据我估计,结果是在窗口时间到期后发出的。我需要什么额外的窗口选项/代码才能立即发出结果?

您可以在文档中看到,您可以在窗口中添加触发器。

转到第9.3节,就有了CCD_ 1触发器的定义;计数完成";。

我从来没有用Python(只用Java(写过这篇文章,但代码应该是这样的

transformation = (p
| beam.io.ReadFromPubSub(subscription=known_args.input_subscription))
| 'parse' >> beam.Map(json.loads)
| beam.WindowInto(window.FixedWindows(600),
trigger=AfterCount(5))
| 'count' >> beam.combiners.Count.PerKey()
| 'filter' >> beam.Filter(lambda data: data['count'] > 5))
transformation | beam.io.WriteToPubSub(known_args.output_topic)

为了确保处理所有元素(如果窗口末尾的元素少于5个(,您需要构建一个复合触发器,比如这个

trigger=Repeatedly(
AfterAny(AfterCount(5), AfterProcessingTime(10 * 60))),

最新更新