在学习过程中,我正在尝试使用Apache Flink编写一个简单的单词计数程序。
问题是,我无法在结果中消除重复的密钥元组。
输入:
a
aaa
ab
aaa
a
a
输出:
(a,1)
(a,2)
(a,3)
(aaa,1)
(aaa,2)
(ab, 1)
预期输出:
(a,3)
(aaa,2)
(ab, 1)
我的代码:
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.readTextFile("data.in");
DataStream<Tuple2<String, Integer>> counts = text
.map(s -> Tuple2.of(s, 1))
.returns(Types.TUPLE(Types.STRING, Types.INT))
.keyBy(0)
.sum(1);
counts.print();
env.execute();
}
Flink的流式API不是为了产生您期望的结果而设计的。相反,流处理背后的思想是,输入可能是无限的——换句话说,输入将永远连续到达。在实践中,是的,输入可能会终止,但也可能不会。
由于Flink并不期望流输入会终止,因此不能期望它等到结束才产生结果。相反,Flink的DataStream API是围绕产生连续结果的连续输入的思想组织的。每个新的输入事件可以产生更新的结果。
然而,有一种方法可以在仍然使用DataStream API的情况下实现您想要的内容,但这有点复杂。
事实证明,当您将Flink与有界输入源(如文件(一起使用时,当到达有界输入的末尾时,会通过作业图发送一个信号,指示已到达末尾。事实上,你可以等待这个信号,然后才能产生结果。
我所说的这个信号实际上是一个水印,其值为MAX_watermark。因此,您可以让ProcessFunction为遥远的将来设置一个事件时间计时器。只有当出现此特殊水印时,此计时器才会启动。与此同时,这个ProcessFunction应该监视流,跟踪(每个键的(最新结果——只有当这个计时器在收到这个超大的水印后最终启动时,它才会收集到输出。
或者您可以只使用Flink的数据集API,它是围绕批处理组织的。然后你会得到你所期望的。