我想从无限的数据流中过滤出Flink中的重复项。我知道副本只在一个很小的时间窗口(最多10秒)内出现。我发现了一个很有前途的方法,而且很简单。但这行不通。它使用一个键值数据流,并且只返回每个窗口的第一个消息。这是我的窗口代码:
DataStream<Row> outputStream = inputStream
.keyBy(new MyKeySelector())
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.minutes(5)))
.process(new DuplicateFilter());
MyKeySelector()
只是一个选择Row
消息的前两个属性作为密钥的类。此键作为主键工作,并导致只有具有相同键的消息才被分配到相同的窗口(经典键控流行为)。
这就是类Duplicate Filter
,它与上述问题的建议答案非常相似。我只使用较新的process()
函数而不是apply()
。
public class DuplicateFilter extends ProcessWindowFunction<Row, Row, Tuple2<String, String>, TimeWindow> {
private static final Logger LOG = LoggerFactory.getLogger(DuplicateFilter.class);
@Override
public void process(Tuple2<String, String> key, Context context, Iterable<Row> iterable, Collector<Row> collector) throws Exception {
// this is just for debugging and can be ignored
int count = 0;
for (Row record :
iterable) {
LOG.info("Row number {}: {}", count, record);
count++;
}
LOG.info("first Row: {}", iterable.iterator().next());
collector.collect(iterable.iterator().next()); //output only the first message in this window
}
}
我的消息到达的时间间隔为max。1秒,所以30秒的窗口应该能处理好。但是距离小于1秒的消息被分配到不同的窗口。我从日志中看到的是,它很少能正常工作。
有人有这个任务的想法或另一种方法吗?如果您需要更多的信息,请告诉我。
Flink的时间窗口与时钟对齐,而不是与事件对齐,因此两个在时间上接近的事件可以分配给不同的窗口。Windows通常不太适合重复数据删除,但如果使用会话窗口,可能会得到很好的结果。
就我个人而言,我会使用键的平面映射(或进程函数),并使用状态TTL(或定时器)在不再需要时清除键的状态。
您也可以使用Flink SQL: https://ci.apache.org/projects/flink/flink-docs-stable/docs/dev/table/sql/queries/deduplication/进行重复数据删除(但您需要设置空闲状态保留间隔)。