我这里有一个示例管道:
def print_windows(element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam, timestamp=beam.DoFn.TimestampParam):
print(window)
print(pane_info)
print(timestamp)
print(element)
print('-----------------')
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
keyed_elements = [
('USA', {'score': 1, 'timestamp': 2}),
('USA', {'score': 2, 'timestamp': 4}),
('USA', {'score': 3, 'timestamp': 4}),
('USA', {'score': 4, 'timestamp': 5}),
('USA', {'score': 5, 'timestamp': 14}),
('USA', {'score': 6, 'timestamp': 17}),
]
elements = (
p
| beam.Create(keyed_elements)
| 'ConvertIntoUserEvents' >> beam.Map(lambda e: beam.window.TimestampedValue(e, e[1]['timestamp']))
| beam.Map(lambda e: (e[0], e[1]['score']))
)
results = (
elements
| "" >> beam.WindowInto(
beam.window.FixedWindows(10),
trigger=Repeatedly(AfterCount(2)),
accumulation_mode=beam.transforms.trigger.AccumulationMode.ACCUMULATING
)
| beam.CombinePerKey(beam.combiners.ToListCombineFn())
)
results | beam.ParDo(print_windows)
这个想法很简单 - 我想获取一些带有时间戳的分数并将它们组合成一个列表。我在看到 2 个元素后触发每个窗格。
如果我按原样运行,我会得到:
[0.0, 10.0)
PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
Timestamp(9.999000)
('USA', [1, 2, 3, 4])
-----------------
[10.0, 20.0)
PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
Timestamp(19.999000)
('USA', [5, 6])
但是,如果我将累积模式更改为丢弃,输出将保持不变。我很困惑,因为根据我对高层次的理解,累积将输出如下窗格:
[1, 2] ... [1, 2, 3, 4]
前 10 秒窗口,然后[5, 6]
最后 10 秒窗口。
另一方面,丢弃应该给出:
[1, 2] .. [3, 4]
然后[5,6]
.为什么输出相同?
根据 Beam 概念,窗口可以包含 0 到 N 个窗格,这些窗格由应用程序代码中的触发器定义控制。
当触发器定义为Accumulating
时,这意味着作为窗口的一部分并根据触发器逻辑触发的任何值都将保留,并在触发新窗格或关闭窗口时附加到新值。
当触发器定义为Discarding
时,这意味着作为窗口的一部分并根据触发器逻辑触发的任何值都将被丢弃,并且不适用于触发的以下新窗格或关闭窗口时。
在上面的示例中,如果触发逻辑更改为以下内容,您可以观察到至少两个窗格:-
- 早
- ON_TIME
beam.transforms.trigger.AfterWatermark(early=AfterCount(2))
下面的窗格ACCUMULATING
是行为
INFO:apache_beam.runners.portability.fn_api_runner:Running (CombinePerKey(ToListCombineFn)/GroupByKey/Read)+((CombinePerKey(ToListCombineFn)/Combine)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_26))
INFO:root:2020-05-24 14:10:00
INFO:root:2020-05-24 14:12:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590329519.999000)
INFO:root:('USA', [{'score': 1, 'ts': 5}, {'score': 2, 'ts': 5}])
INFO:root:-----------------
INFO:root:2020-05-24 14:12:00
INFO:root:2020-05-24 14:14:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590329639.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 4, 'ts': 60}, {'score': 6, 'ts': 105}, {'score': 3, 'ts': 60}])
INFO:root:-----------------
INFO:root:2020-05-24 14:10:00
INFO:root:2020-05-24 14:12:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590329519.999000)
INFO:root:('USA', [{'score': 1, 'ts': 5}, {'score': 2, 'ts': 5}])
INFO:root:-----------------
INFO:root:2020-05-24 14:12:00
INFO:root:2020-05-24 14:14:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590329639.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 4, 'ts': 60}, {'score': 6, 'ts': 105}, {'score': 3, 'ts': 60}])
INFO:root:-----------------
下面的窗格DISCARDING
是行为
INFO:root:2020-05-24 14:12:00
INFO:root:2020-05-24 14:14:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590329639.999000)
INFO:root:('USA', [{'score': 2, 'ts': 5}, {'score': 4, 'ts': 60}, {'score': 1, 'ts': 5}, {'score': 3, 'ts': 60}])
INFO:root:-----------------
INFO:root:2020-05-24 14:14:00
INFO:root:2020-05-24 14:16:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590329759.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 6, 'ts': 105}])
INFO:root:-----------------
INFO:root:2020-05-24 14:12:00
INFO:root:2020-05-24 14:14:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590329639.999000)
INFO:root:('USA', [])
INFO:root:-----------------
INFO:root:2020-05-24 14:14:00
INFO:root:2020-05-24 14:16:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590329759.999000)
INFO:root:('USA', [])
INFO:root:-----------------
在ACCUMULATING
的情况下,当达到水印并且窗口关闭时EARLY
窗格中的值将保留ON_TIME
窗格。
然而,在DISCARDING
窗格中,EARLY
窗格中的值将被丢弃,ON_TIME
窗格为空。
在通过 Pub/Sub 流式传输元素的实际场景中,可能会触发超过 1 个 EARLY 窗格。在模拟方案中,由于所有值都已存在,因此不能触发超过 1 个EARLY
窗格。