How to get a count for every word in a windowed stream on Flink



I want a word count with a window function that emits a result for every incoming word.

If I use this code:

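import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines of text from a local socket
    val text = env.socketTextStream("localhost", 9999)
    // Split each line into lower-case words and pair each word with 1
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)                     // key by the word
      .timeWindow(Time.seconds(5))  // 5-second tumbling window
      .sum(1)                       // sum the counts per word, per window
    counts.print
    env.execute("Window Stream WordCount")
  }
}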

I get the output only after 5 seconds (the length of the window):

Input:

first input : hello
second input : hello
third input : word
fifth input : hello
sixth input : word

Output:

first output : hello : 3 | word : 2

But I want an output with the updated count for every word as it arrives.

Like this. Input:

first input : hello
second input : hello
third input : word
fifth input : hello
sixth input : word

Output:

first output : hello : 1
second output : hello : 2
third output : word : 1
fifth output : hello : 3
sixth output : word : 2

How can I do that?

Isn't the example program from the Flink DataStream API documentation what you are looking for? https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#example-program

object WindowWordCount {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
    counts.print
    env.execute("Window Stream WordCount")
  }
}
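
The program above emits one result per word per 5-second window, which is the behaviour described in the question. A minimal sketch of one way to get an updated count for every incoming word, assuming a per-window reset is not required, is to drop the window and use the rolling `sum` aggregation on the keyed stream. The names `RunningWordCount` and `tokenize` are hypothetical and chosen only for this illustration.

```scala
import org.apache.flink.streaming.api.scala._

object RunningWordCount {
  // Hypothetical helper: split a line into lower-case words, dropping empty tokens.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val counts = text
      .flatMap(line => tokenize(line))
      .map(word => (word, 1))
      .keyBy(0) // key by the word
      .sum(1)   // rolling sum without a window: one updated record per input element

    counts.print()
    env.execute("Running WordCount")
  }
}
```

With the input from the question, this should print tuples such as (hello,1), (hello,2), (word,1) as each word arrives. Because no window is involved, the counts never reset. If the counts should still start over every 5 seconds, keeping `timeWindow(Time.seconds(5))` and attaching a trigger that fires on every element, such as `CountTrigger.of(1)`, may be worth trying instead.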
