通过键记录并收集到弗林克流中的ListBuffer



我有一个类型DataStream[(String, somecaseclass)]的Flink DataStream。我想在Tuple的第一个字段上分组,即String并创建ListBuffer[somecaseclass]。以下是我尝试的:

val emptylistbuffer = new ListBuffer[somecaseclass]
inputstream
  .keyBy(0)
  .fold(emptylistbuffer){case(outputbuffer,b) => {outputbuffer+=b._2}}

,但这为我提供了每行的输出,这意味着如果有10行,仅在第一行中串联第一个输出行,第十行使我在十行上有一个串联的输出。但是,我只想第十排。我检查了Flink DataStream上的几乎所有转换,但不适合用例。

输入:

(filename1.dat,somecaseclass("abc","1",2))
(filename1.dat,somecaseclass("dse","2",3))
(filename1.dat,somecaseclass("daa","1",4))

预期输出:

(filename.dat,ListBuffer(somecaseclass("abc","1",2),somecaseclass("dse","2",3),somecaseclass("daa","1",4)))

dataStream API认为DataStream是无限的。这意味着DataStream可能会提供无限数量的记录。因此,在收到所有记录后,不可能"仅"仅"发出聚合结果(在您的情况下为完整的ListBuffer),因为可能需要更多的记录来汇总(添加到ListBuffer中)。原则上,DataStream上的汇总永远无法产生最终结果,因为可能会有更多记录。由于这不是很实用,因此Flink的DataStream API为每个传入记录产生一个新的结果。

在无界流上计算聚集体的一种常见方法是Windows。Windows定义了可以计算聚集体并发出最终结果的流上的有界部分。Flink根据时间或记录计数提供内置窗口。例如,您的记录收集功能在1小时的翻滚窗口上,将收集一小时内到达的所有记录。

请检查flink文档的不同窗口类型以及如何使用它们。

相关内容

  • 没有找到相关文章

最新更新