apache beam (python SDK):后期(或早期)事件丢弃和触发.怎么知道丢弃了多少,为什么?



我有一个与PubSub订阅连接的流媒体管道(每小时大约有200万个元素)。我需要把他们收集成一组,然后提取一些信息。

def expand(self, pcoll):
return (
pcoll |
beam.WindowInto(FixedWindows(10),
trigger=AfterAny(AfterCount(2000), AfterProcessingTime(30)),
allowed_lateness=10,
trigger=AfterAny(
AfterCount(8000),
AfterProcessingTime(30),
AfterWatermark(
early=AfterProcessingTime(60),
late=AfterProcessingTime(60)
)
),
allowed_lateness=60 * 60 * 24,
accumulation_mode=AccumulationMode.DISCARDING)
| "Group by Key" >> beam.GroupByKey()

我尽量不漏掉任何数据。但我发现我有大约4%的数据丢失。正如你在代码中看到的,当我达到8k个元素或每30秒触发一次。允许延迟1天,并且如果管道正在分析早期或晚期事件,它应该同时触发。

仍然错过了那4%。那么,有没有一种方法可以知道管道是否丢弃了一些数据?有多少元素?原因是什么?

提前谢谢你

首先,我看到示例代码中有两个触发器,但我认为这是一个错别字。

由于没有使用Repeatedly,看起来您正在删除元素,因此在第一个触发器之后的所有元素都会丢失。这是Beam的官方文档。

请允许我张贴一个例子:


test_stream = (TestStream()
.add_elements([
TimestampedValue('in_time_1', 0),
TimestampedValue('in_time_2', 0)])
.advance_watermark_to(9)
.advance_processing_time(9)
.add_elements([TimestampedValue('late_but_in_window', 8)])
.advance_watermark_to(10)
.advance_processing_time(10)
.add_elements([TimestampedValue('in_time_window2', 12)])
.advance_watermark_to(20)  # Past window time
.advance_processing_time(20)
.add_elements([TimestampedValue('late_window_closed', 9),
TimestampedValue('in_time_window2_2', 12)])
.advance_watermark_to_infinity())

class RecordFn(beam.DoFn):
def process(
self,
element=beam.DoFn.ElementParam,
timestamp=beam.DoFn.TimestampParam):
yield ("key", (element, timestamp))

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
records = (p | test_stream
| beam.ParDo(RecordFn())
| beam.WindowInto(FixedWindows(10),
allowed_lateness=0,
# trigger=trigger.AfterCount(1),
trigger=trigger.Repeatedly(trigger.AfterCount(1)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| beam.GroupByKey()
| beam.Map(print)
)

如果我们有触发trigger.Repeatedly(trigger.AfterCount(1)),所有元素都被触发,因为他们来了,没有丢弃元素(但late_window_closed,这是预期的,因为它迟到了):

('key', [('in_time_1', Timestamp(0)), ('in_time_2', Timestamp(0))])  # this two are together since they arrived together
('key', [('late_but_in_window', Timestamp(8))])
('key', [('in_time_window2', Timestamp(12))])
('key', [('in_time_window2_2', Timestamp(12))])

如果我们使用trigger.AfterCount(1)(不重复),我们只得到到达管道的第一个元素:

('key', [('in_time_1', Timestamp(0)), ('in_time_2', Timestamp(0))])
('key', [('in_time_window2', Timestamp(12))])

请注意,两个in_time_(1,2)都出现在第一个触发的窗格中,因为它们同时到达(0),如果其中一个出现晚了,它就会被丢弃。

最新更新