我想在一个时间窗口内对来自无界源(我的源使用Pub/Sub(的每个键的元素进行计数,并在达到某个阈值时发出计数结果。例如,我想在10分钟的固定时间窗口内对元素进行计数,并在计数>5.
transformation = (p
| beam.io.ReadFromPubSub(subscription=known_args.input_subscription))
| 'parse' >> beam.Map(json.loads)
| beam.WindowInto(window.FixedWindows(600))
| 'count' >> beam.combiners.Count.PerKey()
| 'filter' >> beam.Filter(lambda data: data['count'] > 5))
transformation | beam.io.WriteToPubSub(known_args.output_topic)
然而,写入Pub/Sub的结果似乎被延迟了,据我估计,结果是在窗口时间到期后发出的。我需要什么额外的窗口选项/代码才能立即发出结果?
您可以在文档中看到,您可以在窗口中添加触发器。
转到第9.3节,就有了CCD_ 1触发器的定义;计数完成";。
我从来没有用Python(只用Java(写过这篇文章,但代码应该是这样的
transformation = (p
| beam.io.ReadFromPubSub(subscription=known_args.input_subscription))
| 'parse' >> beam.Map(json.loads)
| beam.WindowInto(window.FixedWindows(600),
trigger=AfterCount(5))
| 'count' >> beam.combiners.Count.PerKey()
| 'filter' >> beam.Filter(lambda data: data['count'] > 5))
transformation | beam.io.WriteToPubSub(known_args.output_topic)
为了确保处理所有元素(如果窗口末尾的元素少于5个(,您需要构建一个复合触发器,比如这个
trigger=Repeatedly(
AfterAny(AfterCount(5), AfterProcessingTime(10 * 60))),