我有一个与PubSub订阅连接的流媒体管道(每小时大约有200万个元素)。我需要把他们收集成一组,然后提取一些信息。
def expand(self, pcoll):
return (
pcoll |
beam.WindowInto(FixedWindows(10),
trigger=AfterAny(AfterCount(2000), AfterProcessingTime(30)),
allowed_lateness=10,
trigger=AfterAny(
AfterCount(8000),
AfterProcessingTime(30),
AfterWatermark(
early=AfterProcessingTime(60),
late=AfterProcessingTime(60)
)
),
allowed_lateness=60 * 60 * 24,
accumulation_mode=AccumulationMode.DISCARDING)
| "Group by Key" >> beam.GroupByKey()
我尽量不漏掉任何数据。但我发现我有大约4%的数据丢失。正如你在代码中看到的,当我达到8k个元素或每30秒触发一次。允许延迟1天,并且如果管道正在分析早期或晚期事件,它应该同时触发。
仍然错过了那4%。那么,有没有一种方法可以知道管道是否丢弃了一些数据?有多少元素?原因是什么?
提前谢谢你
首先,我看到示例代码中有两个触发器,但我认为这是一个错别字。
由于没有使用Repeatedly
,看起来您正在删除元素,因此在第一个触发器之后的所有元素都会丢失。这是Beam的官方文档。
请允许我张贴一个例子:
test_stream = (TestStream()
.add_elements([
TimestampedValue('in_time_1', 0),
TimestampedValue('in_time_2', 0)])
.advance_watermark_to(9)
.advance_processing_time(9)
.add_elements([TimestampedValue('late_but_in_window', 8)])
.advance_watermark_to(10)
.advance_processing_time(10)
.add_elements([TimestampedValue('in_time_window2', 12)])
.advance_watermark_to(20) # Past window time
.advance_processing_time(20)
.add_elements([TimestampedValue('late_window_closed', 9),
TimestampedValue('in_time_window2_2', 12)])
.advance_watermark_to_infinity())
class RecordFn(beam.DoFn):
def process(
self,
element=beam.DoFn.ElementParam,
timestamp=beam.DoFn.TimestampParam):
yield ("key", (element, timestamp))
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
with TestPipeline(options=options) as p:
records = (p | test_stream
| beam.ParDo(RecordFn())
| beam.WindowInto(FixedWindows(10),
allowed_lateness=0,
# trigger=trigger.AfterCount(1),
trigger=trigger.Repeatedly(trigger.AfterCount(1)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| beam.GroupByKey()
| beam.Map(print)
)
如果我们有触发trigger.Repeatedly(trigger.AfterCount(1))
,所有元素都被触发,因为他们来了,没有丢弃元素(但late_window_closed
,这是预期的,因为它迟到了):
('key', [('in_time_1', Timestamp(0)), ('in_time_2', Timestamp(0))]) # this two are together since they arrived together
('key', [('late_but_in_window', Timestamp(8))])
('key', [('in_time_window2', Timestamp(12))])
('key', [('in_time_window2_2', Timestamp(12))])
如果我们使用trigger.AfterCount(1)
(不重复),我们只得到到达管道的第一个元素:
('key', [('in_time_1', Timestamp(0)), ('in_time_2', Timestamp(0))])
('key', [('in_time_window2', Timestamp(12))])
请注意,两个in_time_(1,2)
都出现在第一个触发的窗格中,因为它们同时到达(0),如果其中一个出现晚了,它就会被丢弃。