我最近正在研究Flink的新版本中的ProcessWindowFunction
。它说ProcessWindowFunction
支持全球状态和窗口状态。我使用Scala API尝试一下。到目前为止,我可以使全球状态正常工作,但是我没有任何运气可以在窗口状态下做到这一点。我正在做的是处理系统日志并计算由主机名和严重性级别键入的日志数。我想计算两个相邻窗口之间的日志计数差异。这是我实施ProcessWindowFunction
的代码。
class LogProcWindowFunction extends ProcessWindowFunction[LogEvent, LogEvent, Tuple, TimeWindow] {
// Create a descriptor for ValueState
private final val valueStateWindowDesc = new ValueStateDescriptor[Long](
"windowCounters",
createTypeInformation[Long])
private final val reducingStateGlobalDesc = new ReducingStateDescriptor[Long](
"globalCounters",
new SumReduceFunction(),
createTypeInformation[Long])
override def process(key: Tuple, context: Context, elements: Iterable[LogEvent], out: Collector[LogEvent]): Unit = {
// Initialize the per-key and per-window ValueState
val valueWindowState = context.windowState.getState(valueStateWindowDesc)
val reducingGlobalState = context.globalState.getReducingState(reducingStateGlobalDesc)
val latestWindowCount = valueWindowState.value()
println(s"lastWindowCount: $latestWindowCount ......")
val latestGlobalCount = if (reducingGlobalState.get() == null) 0L else reducingGlobalState.get()
// Compute the necessary statistics and determine if we should launch an alarm
val eventCount = elements.size
// Update the related state
valueWindowState.update(eventCount.toLong)
reducingGlobalState.add(eventCount.toLong)
for (elem <- elements) {
out.collect(elem)
}
}
}
我总是从窗口状态获得0
值,而不是以前的更新计数。我一直在解决这样的问题几天。有人可以帮我弄清楚吗?谢谢。
每个窗口状态的范围是一个窗口实例。对于上面的process
方法,每次称为新窗口时都在范围中,因此最新的WindowCount始终为零。
对于只有一次只能开火的普通香草窗口,每扇窗户是没有用的。只有窗口以某种方式有多个射击(例如,晚点),您才能充分利用每窗口状态。如果您想记住从一个窗口到另一个窗口的内容,则可以使用全局窗口状态进行此操作。
有关使用每次窗口状态记住要在末日发射中使用的数据的示例,请参见Flink的高级窗户培训中的幻灯片13-19。