在flink中定义时间窗口后,如下所示:
val lines = socket.timeWindowAll(Time.seconds(5))
如何计算 5 秒特定窗口中的记录数?
执行计数聚合的最有效方法是ReduceFunction
。但是,reduce
有输入和输出类型必须相同的限制。因此,在应用窗口之前,您必须将输入转换为Int
:
val socket: DataStream[(String)] = ???
val cnts: DataStream[Int] = socket
.map(_ => 1) // convert to 1
.timeWindowAll(Time.seconds(5)) // group into 5 second windows
.reduce( (x, y) => x + y) // sum 1s to count
你可以试试这个。可能会给你解决方案。
val text = senv.socketTextStream("localhost", 9999)
val counts = text.map {(m: String) => (m.split(",")(0), 1) }
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.sum(1)
counts.print
senv.execute("ProcessingTime processing example")