我正在创建一个管道,其中的输入是包含时间戳字段的json消息,用于设置eventTime。问题是,一些记录可能会延迟到达或重复到达系统,这种情况需要管理;为了避免重复,我尝试了以下解决方案:
.assignTimestampsAndWatermarks(new RecordWatermark()
.withTimestampAssigner(new ExtractRecordTimestamp()))
.keyBy(new MetricGrouper())
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(3)))
.process(new WindowedFilter())
.keyBy(new MetricGrouper())
.window(TumblingEventTimeWindows.of(Time.seconds(180)))
.trigger(ContinuousEventTimeTrigger.of(Time.seconds(15)))
.process(new WindowedCountDistinct())
.map((value) -> value.toString());
其中,第一个窗口操作是根据保存在集合中的时间戳来过滤记录的,如下所示:
public class WindowedFilter extends ProcessWindowFunction<MetricObject, MetricObject, String, TimeWindow> {
HashSet<Long> previousRecordTimestamps = new HashSet<>();
@Override
public void process(String s, Context context, Iterable<MetricObject> inputs, Collector<MetricObject> out) throws Exception {
String windowStart = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(context.window().getStart()));
String windowEnd = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(context.window().getEnd()));
log.info("window start: '{}', window end: '{}'", windowStart, windowEnd);
Long watermark = context.currentWatermark();
log.info(inputs.toString());
for (MetricObject in : inputs) {
Long recordTimestamp = in.getTimestamp().toEpochMilli();
if (!previousRecordTimestamps.contains(recordTimestamp)) {
log.info("timestamp not contained");
previousRecordTimestamps.add(recordTimestamp);
out.collect(in);
}
}
}
这个解决方案是有效的,但我觉得我没有考虑一些重要的事情,或者可以用更好的方式来做。
使用windows进行重复数据消除的一个潜在问题是,Flink的DataStream API中实现的窗口总是与epoch对齐。这意味着,例如,发生在11:59:59的事件和发生在12:00:01的重复事件将被放置在不同的一分钟长的窗口中。
然而,在您的情况下,您关心的重复项似乎也带有相同的时间戳。在这种情况下,只要你不担心水印会产生后期事件,你所做的事情就会产生正确的结果。
使用windows进行重复数据消除的另一个问题是它们给管道带来的延迟,以及用于最小化延迟的解决方案。
这就是为什么我更喜欢使用RichFlatMapFunction
或KeyedProcessFunction
来实现重复数据消除。像这样的东西会比窗口更好:
private static class Event {
public final String key;
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new EventSource())
.keyBy(e -> e.key)
.flatMap(new Deduplicate())
.print();
env.execute();
}
public static class Deduplicate extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> seen;
@Override
public void open(Configuration conf) {
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupFullSnapshot()
.build();
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("seen", Types.BOOLEAN);
desc.enableTimeToLive(ttlConfig);
seen = getRuntimeContext().getState(desc);
}
@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (seen.value() == null) {
out.collect(event);
seen.update(true);
}
}
}
这里,key
正在对流进行重复数据消除,并且在一分钟后自动清除所涉及的状态。