我尝试计算不同窗口大小的流中的数据(窗口大小在 steam 数据中(,所以我使用自定义窗口分配器和聚合函数,但状态很大(窗口范围从一小时到 30 天(
在我看来,聚合状态只是存储中间结果
有什么问题吗?
public class ElementProcessingTime extends WindowAssigner<Element, TimeWindow> {
@Override public Collection<TimeWindow> assignWindows(Element element, long timestamp, WindowAssignerContext context) {
long slide = Time.seconds(10).toMilliseconds();
long size = element.getTime() * 60 * 1000;
timestamp = context.getCurrentProcessingTime();
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, 0, slide);
for (long start = lastStart; start > timestamp - size; start -= slide) {
windows.add(new TimeWindow(start, start + size));
}
return windows;
}
@Override public Trigger<FactorCalDetail, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return ElementTimeTrigger.create();
}
@Override public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override public boolean isEventTime() {
return false;
}
}
public class CountAggregate implements AggregateFunction<FactorCalDetail, AggregateResult, AggregateResult> {
@Override public AggregateResult createAccumulator() {
AggregateResult result = new AggregateResult();
result.setResult(0.0);
return result;
}
@Override public AggregateResult add(FactorCalDetail value, AggregateResult accumulator) {
accumulator.setKey(value.getGroupKey());
accumulator.addResult();
accumulator.setTimeSpan(value.getTimeSpan());
return accumulator;
}
@Override public AggregateResult getResult(AggregateResult accumulator) {
return accumulator;
}
@Override public AggregateResult merge(AggregateResult a, AggregateResult b) {
if (a.getKey().equals(b.getKey())) {
a.setResult(a.getResult() + b.getResult());
}
return a;
}
}
env.addSource(source)
.keyBy(Element::getKey)
.window(new ElementProcessingTime())
.aggregate(new CountAggregate())
.addSink(new RedisCustomizeSink(redisProperties));
分配自定义窗口时,状态大小可能很快就会失控。这主要是因为每个窗口都需要保存属于其中的所有记录,直到窗口被聚合并最终逐出。在您的代码中,似乎您也为每个记录创建了大量窗口。
您没有指定您的用例,但我假设您实际上想要计算每个键在给定时间点以 10 毫秒的 bin 大小延伸的事件数。如果是这样,那么这不是Windows的直接用例。
您要做的是:
- 将活动拆分为较小的活动。
- 按键和箱分组。
- 数一数你的垃圾箱。
代码中的粗略草图:
input.flatMap(element -> {
...
for (long start = lastStart; start > timestamp - size; start -= slide) {
emit(new KeyTime(key, start));
}
})
.keyBy(keyTime -> keyTime)
.count()
您可以在keyBy
后应用 Windows 来强制某些输出属性,例如等待几分钟,然后输出所有内容并忽略延迟事件。
注意:KeyTime是一个简单的POJO,持有密钥和垃圾箱时间。
编辑:在您发表评论后,解决方案实际上要简单得多。
env.addSource(source)
.keyBy(element -> new Tuple2<>(element.getKey(), element.getTime()))
.count()
.addSink(new RedisCustomizeSink(redisProperties));
你没有说源是什么,它会有自己的状态来持续存在。 你也没有说有多少个唯一键。即使每个键的少量状态也会随着唯一键数量的增加而变得巨大。 如果问题最终出现在聚合器状态增长的某个地方,则可以尝试将窗口逻辑拆分为一系列两个窗口,一个用于每小时聚合一次,另一个用于将每小时汇总聚合到所需的时间范围。