I want a word count that uses a window function and reports a count for each word.
If I use this code:
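import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines of text from a local socket.
    val text = env.socketTextStream("localhost", 9999)
    // Split each line into lower-case words and pair each word with a count of 1.
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      // Group each word's records into 5-second windows and sum the counts per window.
      .timeWindow(Time.seconds(5))
      .sum(1)
    counts.print()
    env.execute("Window Stream WordCount")
  }
}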
I only get output after 5 seconds (the window length):
Input:
first input: hello
second input: hello
third input: word
fifth input: hello
sixth input: word
Output:
first output: hello : 3 | word : 2
But I want an updated count to be output for every word as it arrives.
Like this:
Input:
first input: hello
second input: hello
third input: word
fifth input: hello
sixth input: word
Output:
first output: hello : 1
second output: hello : 2
third output: word : 1
fifth output: hello : 3
sixth output: word : 2
How can I do this?
Isn't the example program from the streaming API documentation what you are looking for? https://ci.apache.org/projects/flink/flink/flink-docs-rease-1.3/dev/datastream_api.html#example-program
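import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read lines of text from a local socket.
    val text = env.socketTextStream("localhost", 9999)
    // Split each line into lower-case words and pair each word with a count of 1.
    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .keyBy(0)
      // Group each word's records into 5-second windows and sum the counts per window.
      .timeWindow(Time.seconds(5))
      .sum(1)
    counts.print()
    env.execute("Window Stream WordCount")
  }
}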
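Note that this example is the same windowed job as in the question, so it also emits one result per word per 5-second window. For the per-word output the question asks for, one possible approach is to drop the timeWindow call: sum on a keyed stream is a rolling aggregate, so it emits an updated total for every incoming record. The following is only a minimal sketch of that idea, not a confirmed solution from this thread. The object name RunningWordCount and the helper tokenize are made up for illustration; the socket host and port are taken from the question.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical name, chosen for this sketch only.
object RunningWordCount {

  // Hypothetical helper: split a line into lower-case words, dropping empty tokens.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase.split("\\W+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Same socket source as in the question.
    val text = env.socketTextStream("localhost", 9999)

    val counts = text
      .flatMap { line => tokenize(line) }
      .map { (_, 1) }
      .keyBy(0)
      // No timeWindow here: sum on a keyed stream keeps a running total per key
      // and emits the updated (word, count) tuple for each incoming word.
      .sum(1)

    counts.print()
    env.execute("Running Stream WordCount")
  }
}
```

With the input from the question, this should print tuples such as (hello,1), (hello,2), (word,1) as each word arrives, possibly prefixed with a subtask index when the job runs with parallelism greater than 1. The counts are never reset, so they keep growing for as long as the job runs.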