我有一个类型DataStream[(String, somecaseclass)]
的Flink DataStream。我想在Tuple
的第一个字段上分组,即String
并创建ListBuffer[somecaseclass]
。以下是我尝试的:
val emptylistbuffer = new ListBuffer[somecaseclass]
inputstream
.keyBy(0)
.fold(emptylistbuffer){case(outputbuffer,b) => {outputbuffer+=b._2}}
,但这为我提供了每行的输出,这意味着如果有10行,仅在第一行中串联第一个输出行,第十行使我在十行上有一个串联的输出。但是,我只想第十排。我检查了Flink DataStream
上的几乎所有转换,但不适合用例。
输入:
(filename1.dat,somecaseclass("abc","1",2))
(filename1.dat,somecaseclass("dse","2",3))
(filename1.dat,somecaseclass("daa","1",4))
预期输出:
(filename.dat,ListBuffer(somecaseclass("abc","1",2),somecaseclass("dse","2",3),somecaseclass("daa","1",4)))
dataStream API认为DataStream
是无限的。这意味着DataStream
可能会提供无限数量的记录。因此,在收到所有记录后,不可能"仅"仅"发出聚合结果(在您的情况下为完整的ListBuffer
),因为可能需要更多的记录来汇总(添加到ListBuffer
中)。原则上,DataStream
上的汇总永远无法产生最终结果,因为可能会有更多记录。由于这不是很实用,因此Flink的DataStream API为每个传入记录产生一个新的结果。
在无界流上计算聚集体的一种常见方法是Windows。Windows定义了可以计算聚集体并发出最终结果的流上的有界部分。Flink根据时间或记录计数提供内置窗口。例如,您的记录收集功能在1小时的翻滚窗口上,将收集一小时内到达的所有记录。
请检查flink文档的不同窗口类型以及如何使用它们。