我正在使用全局窗口,该窗口具有永久重复的处理后时间触发器,以处理来自pub-sub的流式数据,如下所示:
PCollection<KV<String,SMMessage>> perMSISDNLatestEvents = messages
.apply("Apply global window",Window.<SMMessage>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
.discardingFiredPanes())
.apply("Convert into kv of msisdn and SM message", ParDo.of(new SmartcareMessagetoKVFn()))
.apply("Get per MSISDN latest event",Latest.perKey()).apply("Write into Redis", ParDo.of(new WriteRedisFn()));
有没有一种方法可以让apachebeam触发器重复地永远只在上一次执行完成后执行?我提出这个问题的原因是,下一个触发器处理将需要从redis读取数据,这些数据是由上一个触发器执行编写的。
谢谢
因此,这里的触发器将按您提供的时间间隔触发。触发器不知道任何下游处理,因此无法依赖于管道的这些步骤。
您可以添加一个屏障(DoFn
(,它存在于Write
步骤之前,并且只有在Redis中看到以前的数据后才放弃执行,而不是依赖于这里的一致性触发器。
您可以尝试显式声明全局窗口触发器,如下例:
Trigger subtrigger = AfterProcessingTime.pastFirstElementInPane();
Trigger maintrigger = Repeatedly.forever(subtrigger);
我认为触发器会对您的情况有所帮助,因为它将允许您创建事件时间,当您或您的代码触发它们时,这些时间将运行,因此只有当触发器首先完成时,您才会永远重复运行。
我发现了这个文档,它可能会指导您尝试创建触发器。