使用Flink窗口和折叠功能,元素丢失



当我尝试使用window和fold函数聚合元素时在聚合过程中遗漏了个元素。消耗元件从Kafka(value:0, value:1, value:2, value:3)并聚合它们作为奇值和偶值。

输出为:

{even=[0, 2, 4], odd=[1, 3]}
{even=[6, 8], odd=[5, 7, 9]}
{even=[14, 16, 18], odd=[15, 17]}
{even=[20, 22], odd=[19, 21, 23]}
{even=[24, 26, 28], odd=[25, 27]}

缺少10-13之间的数字,这种情况发生在数字。有人能建议一下下面的代码遗漏了什么吗如何确保处理所有元素?

public static class Splitter implements FlatMapFunction<String, 
Tuple3<String, String, List<String>>{
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<Tuple3<String, String, 
List<String>>out) throws Exception {
String[] vals = value.split(":");
if(vals.length 1 && Integer.parseInt(vals[1]) % 2 == 0){
out.collect(new Tuple3<String, String, List<String>>
("test","even", Arrays.asList(vals[1])));
}else{
out.collect(new Tuple3<String, String, List<String>>
("test","odd", Arrays.asList(vals[1])));
}
}
}

DataStream<Map<String, List<String>>streamValue = 
kafkaStream.flatMap(new Splitter()).keyBy(0)
.window(TumblingEventTimeWindows.of(Time.milliseconds(3000))).
trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2))
.fold(new HashMap<String, List<String>>(), new 
FoldFunction<Tuple3<String, String, List<String>>, Map<String, 
List<String>>>() {
private static final long serialVersionUID = 1L;
@Override
public Map<String, List<String>fold(Map<String, 
List<String>accumulator,
Tuple3<String, String, List<String>value) throws 
Exception {
if(accumulator.get(value.f1) != null){
List<Stringlist = new ArrayList<>();
list.addAll(accumulator.get(value.f1));
list.addAll(value.f2);
accumulator.put(value.f1, list);
}else{
accumulator.put(value.f1, value.f2);
}
return accumulator;
}
});
streamValue.print();
env.execute("window test");
}

public class CustomizedCountTrigger<W extends Windowextends 
Trigger<Object, W{
private static final long serialVersionUID = 1L;
private final long maxCount;
private final ReducingStateDescriptor<LongstateDesc =
new ReducingStateDescriptor<>("count", new Sum(), 
LongSerializer.INSTANCE);
private CustomizedCountTrigger(long maxCount) {
this.maxCount = maxCount;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window,
TriggerContext ctx) throws Exception {
ReducingState<Longcount = ctx.getPartitionedState(stateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, W window,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, W window,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(W window, 
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
ctx)
throws Exception {
ctx.getPartitionedState(stateDesc).clear();
}
@Override
public String toString() {
return "CountTrigger(" +  maxCount + ")";
}
public static <W extends WindowCustomizedCountTrigger<Wof(long 
maxCount) {
return new CustomizedCountTrigger<>(maxCount);
}
private static class Sum implements ReduceFunction<Long{
private static final long serialVersionUID = 1L;
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
}

因此,在注意到您的自定义触发器使您使用翻滚事件时间窗口的事实变得有些无关紧要之前,我就开始编写本文的第一部分,但无论如何,我都想包含我最初的想法,因为我不完全确定您不使用EventTime窗口时为什么要使用它。意识到这一点后,我的回复低于最初的回复

您是在单个并行或多个并行上运行此操作?我之所以这么问,是因为如果是多重并行(kafka主题也由多个分区组成),那么消息可能是以非顺序接收和处理的。这可能导致带有时间戳的消息导致水印前进,从而导致窗口处理消息。然后,下一个消息具有在当前水印时间之前的事件时间(也称为"延迟"),这将导致消息被丢弃。

例如:如果你有20个元素,每个元素的事件时间是这样的:

消息1:事件时间:1000消息1:事件时间:2000等等。

您的事件时间窗口为5001ms。

现在消息message1到message9按顺序显示。第一个窗口将被处理并包含消息1-5(消息6将导致窗口被处理)。现在,如果message11出现在message10之前,它将导致处理包含消息6-9的窗口。当消息10下一次出现时,水印已经提前超过消息10的事件时间,导致它作为"0"被丢弃;"迟到事件";。

正确答案

不要使用eventTime窗口和自定义触发器,而是尝试使用countWindow。

所以替换这个:

.window(TumblingEventTimeWindows.of(Time.milliseconds(3000))).
trigger(CustomizedCountTrigger.of(5L))//.trigger(CountTrigger.of(2))

有了这个:

.countWindow(5L)

相关内容

  • 没有找到相关文章

最新更新