APACHE FLINK 聚合函数与 tumblingWindow 用于计数事件,但如果没有发生事件,也会发送 0



我需要在翻滚窗口中计算事件。但是,如果窗口中没有事件,我也想发送值为 0 的事件。

类似的东西。

  1. 窗口计数: 5
  2. 窗口计数: 0
  3. 窗口计数: 0
  4. 窗口计数: 3
  5. 窗口计数: 0 ...
import com.google.protobuf.Message;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.skydivin4ng3l.cepmodemon.models.events.aggregate.AggregateOuterClass;
public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(T event, Long accumulator) {
return accumulator + 1L;
}
@Override
public AggregateOuterClass.Aggregate getResult(Long accumulator) {
return AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
}
@Override
public Long merge(Long accumulator1, Long accumulator2) {
return accumulator1 + accumulator2;
}
}

并在此处使用

DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new BasicCounter<MonitorOuterClass.Monitor>());

时间特征是摄取时间

我读到了一个TiggerFunction,它可能会检测聚合的Stream是否在x时间后收到了事件,但我不确定这是否是正确的方法。

我希望即使窗口内根本没有事件,聚合也会发生。也许有一个我不知道的设置?

感谢任何提示。

我按照@David-Anderson的建议选择了选项1:

这是我的事件生成器:

public class EmptyEventSource implements SourceFunction<MonitorOuterClass.Monitor> {
private volatile boolean isRunning = true;
private final long delayPerRecordMillis;
public EmptyEventSource(long delayPerRecordMillis){
this.delayPerRecordMillis = delayPerRecordMillis;
}
@Override
public void run(SourceContext<MonitorOuterClass.Monitor> sourceContext) throws Exception {
while (isRunning) {
sourceContext.collect(MonitorOuterClass.Monitor.newBuilder().build());
if (delayPerRecordMillis > 0) {
Thread.sleep(delayPerRecordMillis);
}
}
}
@Override
public void cancel() {
isRunning = false;
}
}

和我调整后的聚合函数:

public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(T event, Long accumulator) {
if(((MonitorOuterClass.Monitor)event).equals(MonitorOuterClass.Monitor.newBuilder().build())) {
return accumulator;
}
return accumulator + 1L;
}
@Override
public AggregateOuterClass.Aggregate getResult(Long accumulator) {
AggregateOuterClass.Aggregate newAggregate = AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
return newAggregate;
}
@Override
public Long merge(Long accumulator1, Long accumulator2) {
return accumulator1 + accumulator2;
}
}

像这样使用它们:

DataStream<MonitorOuterClass.Monitor> someEntryStream = env.addSource(currentConsumer);
DataStream<MonitorOuterClass.Monitor> triggerStream = env.addSource(new EmptyEventSource(delayPerRecordMillis));
DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
.union(triggerStream)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new BasicCounter<MonitorOuterClass.Monitor>());

当第一个事件分配给窗口时,Flink 的窗口是懒惰创建的。因此,空窗口不存在,并且无法产生结果。

通常,有三种方法可以解决此问题:

  1. 在将事件添加到流的窗口前面放置一些内容,确保每个窗口都有内容,然后修改窗口处理以在计算结果时忽略这些特殊事件。
  2. 使用
  3. GlobalWindow 和自定义触发器,该触发器使用处理时间计时器来触发窗口(没有事件流动,水印不会前进,事件时间计时器不会触发,直到更多事件到达(。
  4. 不要使用窗口 API,而是使用ProcessFunction实现自己的窗口。但在这里,您仍然会面临需要使用处理时间计时器的问题。

更新:

现在已努力实现选项 2 的示例,我不能推荐它。问题是即使使用自定义Trigger,如果窗口为空,也不会调用ProcessAllWindowFunction,因此有必要始终在GlobalWindow中保留至少一个元素。这似乎需要实现一个相当笨拙的EvictorProcessAllWindowFunction,这些协作以保留和忽略窗口中的特殊元素 - 并且您还必须首先以某种方式将该元素放入窗口中。

如果你要做一些黑客的事情,选项 1 似乎要简单得多。

相关内容

  • 没有找到相关文章

最新更新