如何为每个传入元素实现具有动态超时的flink countTriggerWithTimeout



对于flink流处理来说,这是一个全新的概念。这是我的要求:当在最后20秒内接收到2个或多个元素时,向用户发出警报。如果在20秒内收到的元素少于2个,不要发出警报,只需重新开始计数和计时。每个元素的计数和间隔各不相同。

这是我的代码:

dataStream
.keyBy("id")
.window(EventTimeSessionWindows.withDynamicGap((event) -> event.getThresholdInterval()))
.trigger(new CountTriggerWithTimeout<TimeWindow>())
TriggerCode:
public class CountTriggerWithTimeout<W extends TimeWindow> extends Trigger<SystemEvent, W> {
private ReducingStateDescriptor<Long> countState =
new ReducingStateDescriptor<Long>("count", new Sum(), LongSerializer.INSTANCE);
private ReducingStateDescriptor<Long> processedState =
new ReducingStateDescriptor<Long>("processed", new Sum(), LongSerializer.INSTANCE);
@Override
public TriggerResult onElement(SystemEvent element, long timestamp, W window, TriggerContext ctx)
throws Exception {
ReducingState<Long> count = ctx.getPartitionedState(countState);
ReducingState<Long> processed = ctx.getPartitionedState(processedState);
count.add(1L);
processed.add(0L);
if (count.get() >= element.getThresholdCount() && processed.get() == 0) {
processed.add(1L);
return TriggerResult.FIRE_AND_PURGE;
}
if (timestamp >= window.getEnd()) { 
return TriggerResult.PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window, TriggerContext ctx) throws Exception {
ctx.getPartitionedState(countState).clear();
ctx.getPartitionedState(processedState).clear();
}

@Override
public boolean canMerge() {
return true;
}
class Sum implements ReduceFunction<java.lang.Long> {
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}

早些时候,当我使用

dataStream
.timeWindow(Time.seconds(1))
.trigger(new CountTriggerWithTimeout<TimeWindow>())

一切都很顺利。由于需要从元素中读取窗口时间,所以我开始使用EventTimeSessionWindow,并在触发器中添加了canMerge((函数。从那时起,一切都不起作用。clear((从未被调用,onProcessingTime((&onEventTime((。我看到时间戳总是被设置为相同的值,而与元素何时被接收无关。

我的要求是";火灾&"清除";当计数>=事件内的阈值。getThresholdInterval((。如果计数<事件中的阈值。getThresholdInterval((然后清除,即调用clear清除计数和状态并重新启动。有没有办法用timeWindow而不是EventTimeSessionWindows来实现这一点?

请帮我解决这个问题。

谢谢。。。

为什么不使用20秒的简单翻滚窗口并计算其上的元素:

source
.keyBy("id")
.timeWindow(Time.seconds(20))
.process(new ProcessWindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>() {
@Override
public void process(Tuple key, ProcessWindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow>.Context ctx,
Iterable<Tuple2<String, Integer>> in, Collector<String> out) throws Exception {
if (Lists.newArrayList(in).size() >= 2) {
out.collect("Two or more elements between "
+ Instant.ofEpochMilli(ctx.window().getStart())
+ " " + Instant.ofEpochMilli(ctx.window().getEnd()));
}
}
})

最新更新