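I'm new to the streaming world and I'm running into some problems on my first attempt.
What I want to do is find the top K elements in the window (a WindowedStream) below.
I tried to implement my own function, but I'm not sure how it actually works.
Nothing seems to be printed.
Do you have any hints?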
val parsedStream: DataStream[(String, Response)] = stream
  .mapWith(_.decodeOption[Response])
  .filter(_.isDefined)
  .map { record =>
    (
      s"${record.get.group.group_country}, ${record.get.group.group_city}",
      record.get
    )
  }

val topLocations = parsedStream
  .keyBy(_._1)
  .timeWindow(Time.days(7))
  .process(new SortByCountFunction)
SortByCountFunction:
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class SortByCountFunction
    extends ProcessWindowFunction[(String, Response), MeetUpLocationWindow, String, TimeWindow] {

  override def process(key: String,
                       context: Context,
                       elements: Iterable[(String, Response)],
                       out: Collector[MeetUpLocationWindow]): Unit = {
    // Group the records of this window by their location string.
    val count: Map[String, Iterable[(String, Response)]] = elements.groupBy(_._1)
    val locAndCount: Seq[MeetUpLocation] = count.toList.map(tmp => {
      val location: String = tmp._1
      val meetUpList: Iterable[(String, Response)] = tmp._2
      MeetUpLocation(location, tmp._2.size, meetUpList.map(_._2).toList)
    })
    // Sort by count, highest first, so that take(20) keeps the top 20 rather than the bottom 20.
    val output: List[MeetUpLocation] = locAndCount.sortBy(tup => -tup.count).take(20).toList
    val windowEnd = context.window.getEnd
    out.collect(MeetUpLocationWindow(windowEnd, output))
  }
}

case class MeetUpLocationWindow(endTs: Long, locations: List[MeetUpLocation])
case class MeetUpLocation(location: String, count: Int, meetUps: List[Response])
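When a Flink DataStream job fails to produce any output, the usual suspects are:
- the job doesn't call execute() on the StreamExecutionEnvironment (e.g., env.execute())
- the job doesn't have a sink (e.g., topLocations.print())
- the job is meant to use event time, but the watermarks aren't set up correctly, or an idle source is holding back the watermarks
- the job is writing to the task manager logs, but no one has noticed
- the serializer for the output type doesn't produce any output
Without more information, it's hard to guess which of these might be the problem in this case.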
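To make the first three items concrete, here is a minimal sketch of a job that has a sink, calls execute(), and sets up event-time watermarks with an idleness timeout. It is not the code from the question: the `Event` case class, the `OutputChecklist` object, the sample records, and the 5-second and 1-minute durations are all hypothetical, and it assumes the Flink Scala DataStream API in a version that provides `WatermarkStrategy` (1.11 or later).

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical stand-in for the (location, Response) records in the question.
case class Event(location: String, timestampMillis: Long)

object OutputChecklist {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data; a real job would read from its actual source.
    val events: DataStream[Event] = env.fromElements(
      Event("us, New York", 1000L),
      Event("us, New York", 2000L),
      Event("de, Berlin", 3000L)
    )

    // Event-time windows only fire once watermarks pass the window end.
    // withIdleness marks a source that stays silent for a minute as idle,
    // so it no longer holds back the watermark (durations are assumptions).
    val strategy = WatermarkStrategy
      .forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(event: Event, recordTimestamp: Long): Long =
          event.timestampMillis
      })
      .withIdleness(Duration.ofMinutes(1))

    // Count events per location in 7-day event-time windows.
    val counts: DataStream[(String, Int)] = events
      .assignTimestampsAndWatermarks(strategy)
      .map(e => (e.location, 1))
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.days(7)))
      .sum(1)

    // A sink: without one, the results go nowhere.
    counts.print()

    // Nothing runs until execute() is called on the environment.
    env.execute("output-checklist")
  }
}
```

Note that print() writes to standard output, which on a cluster ends up in the task managers' output files rather than in the client console, so that is where to look when checking the fourth item.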