当我使用间隔为 1 小时的会话窗口并在处理数百万条消息后,我在日志中出现错误,可能只是针对某些行:
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase
法典:
grouped_tis = tracking_informations | beam.WindowInto(window.Sessions(session_window_gap)) | beam.GroupByKey() | beam.ParDo(MergeTI()) | "TI model -> json" >> beam.Map(jsons.dump)
全栈: https://pastebin.com/pqA5pMay
如果有人遇到同样的问题,我会解决这个问题,在beam.WindowInto(NONGLOBALWINDOW) | beam.GroupByKey()
和其他导致问题的 Ptransform 之间插入beam.WindowInto(beam.window.GlobalWindows())
。
这可能是因为某些代码(例如MergeTI
)在GlobalWindow
中返回元素,而PCollection
具有不同的窗口集:beam.WindowInto(window.Sessions(session_window_gap))
。
我也遇到了这个问题,似乎唯一可以解决它的方法是插入:
beam.WindowInto(beam.window.GlobalWindows())
就在之前:
beam.io.WriteToBigQuery(
对我来说,这是因为 BigQuery 表实际上想要一个%Y-%m-%d %H:%M:%S %Z
时间戳,但我传递了一个%Y-%m-%d
日期。我不知道为什么,但我很震惊。