我想知道Flink的数据流API是否可以用于从传入的记录中删除重复项(可能在特定的时间窗口内(,就像在提供名为"Distinct"的转换的数据集API中一样。或者无论如何,如果数据集可以转换为数据流,假设数据集被转换为Flink中的内部处理数据流。
请帮我一下。提前感谢!干杯
我不知道有任何内置的基元,但如果窗口中的所有数据都适合内存,那么您可以轻松地自己构建这个函数。
DataStream<...> stream = ...
stream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new DistinctFunction<>());
public class DistinctFunction<T, W extends Window> extends ProcessAllWindowFunction<T, T, W> implements Function {
public void process(final Context context, Iterable<T> input, Collector<R> out) throws Exception {
Set<T> elements = new HashSet<>();
input.forEach(elements::add);
elements.forEach(out::collect);
}
}
当然,如果你有一个键,它的可扩展性会大得多,因为只有窗口中一个键的数据需要保存在内存中。