我需要在翻滚窗口中计算事件。但是,如果窗口中没有事件,我也想发送值为 0 的事件。
类似的东西。
- 窗口计数: 5
- 窗口计数: 0
- 窗口计数: 0
- 窗口计数: 3
- 窗口计数: 0 ...
import com.google.protobuf.Message;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.skydivin4ng3l.cepmodemon.models.events.aggregate.AggregateOuterClass;
public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(T event, Long accumulator) {
return accumulator + 1L;
}
@Override
public AggregateOuterClass.Aggregate getResult(Long accumulator) {
return AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
}
@Override
public Long merge(Long accumulator1, Long accumulator2) {
return accumulator1 + accumulator2;
}
}
并在此处使用
DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.aggregate(new BasicCounter<MonitorOuterClass.Monitor>());
时间特征是摄取时间
我读到了一个TiggerFunction,它可能会检测聚合的Stream是否在x时间后收到了事件,但我不确定这是否是正确的方法。
我希望即使窗口内根本没有事件,聚合也会发生。也许有一个我不知道的设置?
感谢任何提示。
我按照@David-Anderson的建议选择了选项1:
这是我的事件生成器:
public class EmptyEventSource implements SourceFunction<MonitorOuterClass.Monitor> {
private volatile boolean isRunning = true;
private final long delayPerRecordMillis;
public EmptyEventSource(long delayPerRecordMillis){
this.delayPerRecordMillis = delayPerRecordMillis;
}
@Override
public void run(SourceContext<MonitorOuterClass.Monitor> sourceContext) throws Exception {
while (isRunning) {
sourceContext.collect(MonitorOuterClass.Monitor.newBuilder().build());
if (delayPerRecordMillis > 0) {
Thread.sleep(delayPerRecordMillis);
}
}
}
@Override
public void cancel() {
isRunning = false;
}
}
和我调整后的聚合函数:
public class BasicCounter<T extends Message> implements AggregateFunction<T, Long, AggregateOuterClass.Aggregate> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(T event, Long accumulator) {
if(((MonitorOuterClass.Monitor)event).equals(MonitorOuterClass.Monitor.newBuilder().build())) {
return accumulator;
}
return accumulator + 1L;
}
@Override
public AggregateOuterClass.Aggregate getResult(Long accumulator) {
AggregateOuterClass.Aggregate newAggregate = AggregateOuterClass.Aggregate.newBuilder().setVolume(accumulator).build();
return newAggregate;
}
@Override
public Long merge(Long accumulator1, Long accumulator2) {
return accumulator1 + accumulator2;
}
}
像这样使用它们:
DataStream<MonitorOuterClass.Monitor> someEntryStream = env.addSource(currentConsumer);
DataStream<MonitorOuterClass.Monitor> triggerStream = env.addSource(new EmptyEventSource(delayPerRecordMillis));
DataStream<AggregateOuterClass.Aggregate> aggregatedStream = someEntryStream
.union(triggerStream)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new BasicCounter<MonitorOuterClass.Monitor>());
当第一个事件分配给窗口时,Flink 的窗口是懒惰创建的。因此,空窗口不存在,并且无法产生结果。
通常,有三种方法可以解决此问题:
- 在将事件添加到流的窗口前面放置一些内容,确保每个窗口都有内容,然后修改窗口处理以在计算结果时忽略这些特殊事件。 使用
- GlobalWindow 和自定义触发器,该触发器使用处理时间计时器来触发窗口(没有事件流动,水印不会前进,事件时间计时器不会触发,直到更多事件到达(。
- 不要使用窗口 API,而是使用
ProcessFunction
实现自己的窗口。但在这里,您仍然会面临需要使用处理时间计时器的问题。
更新:
现在已努力实现选项 2 的示例,我不能推荐它。问题是即使使用自定义Trigger
,如果窗口为空,也不会调用ProcessAllWindowFunction
,因此有必要始终在GlobalWindow
中保留至少一个元素。这似乎需要实现一个相当笨拙的Evictor
和ProcessAllWindowFunction
,这些协作以保留和忽略窗口中的特殊元素 - 并且您还必须首先以某种方式将该元素放入窗口中。
如果你要做一些黑客的事情,选项 1 似乎要简单得多。