键和 Window 实例在 KeyedStream#timeWindow#process 中是什么关系



对于KeyedStream#timeWindow#process,我想知道一个窗口实例是否只包含相同的键,而不同的键将使用不同的窗口实例。

从以下应用程序的输出中,我看到一个窗口实例将只包含相同的键,而不同的键将使用不同的窗口。

但我想问并确认,谢谢!

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import scala.util.Random
class KeyByAndWindowAndProcessTestSource extends RichParallelSourceFunction[Int] {
override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
while (true) {
val i = new Random().nextInt(30)
ctx.collect(i)
ctx.collect(i)
ctx.collect(i)
Thread.sleep(1000)
}
}
override def cancel(): Unit = {
}
}

应用程序是:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

object KeyByAndWindowTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val ds: DataStream[Int] = env.addSource(new KeyByAndWindowAndProcessTestSource)
val ds2 = ds.keyBy(i => i).timeWindow(Time.seconds(4)).process(new MyProcessFunction())
ds2.print()
env.execute()
}
}

class MyProcessFunction extends ProcessWindowFunction[Int, String, Int, TimeWindow] {

override def process(
key: Int,
ctx: Context,
vals: Iterable[Int],
out: Collector[String]): Unit = {
println(new java.util.Date())
println(s"key=${key}, vals = ${vals.mkString(",")}, hashCode=${System.identityHashCode(ctx.window)}")
}
}

输出为:

Sat Sep 14 13:08:24 CST 2019
key=26, vals = 26,26,26, hashCode=838523304
Sat Sep 14 13:08:24 CST 2019
key=28, vals = 28,28,28, hashCode=472721641
Sat Sep 14 13:08:24 CST 2019
key=18, vals = 18,18,18,18,18,18, hashCode=1668151956

实际上,对于 ProcessingTimeWindow,为每个元素创建了一个新的窗口对象。

以下是TumblingProcessingTimeWindows#assignWindows的源代码:

public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
final long now = context.getCurrentProcessingTime();
long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
}

因此System.identityHashCode将始终为不同的键返回唯一的哈希代码,并且您的测试代码并不能证明任何事情。

在引擎盖下,元素按elementKey + assignedWindow键分组,所以我认为说"一个窗口实例只包含相同的键,不同的键将使用不同的窗口实例">是正确的。

原始答案:

我希望我能回答你的问题...

将为每个窗口和键调用一次ProcessWindowFunction#process(或多次,具体取决于窗口的触发器(。在内部,窗口和键构成复合分区键。

就 Java 对象实例而言,ProcessWindowFunction的一个实例将处理许多键。具体来说,将有许多 ProcessWindow 函数的并行度。

跟进:

所以我没有做对:)

对于由WindowOperator处理的每条记录,将创建一个新的Window对象,并具有正确的记录开始/结束时间。

这意味着每次调用ProcessWindowFunction#process都将传递一个新的Window对象。

重要的是要理解,Flink 中的Window是一个非常轻的对象,它只是用作整个键的附加部分(namespace(。它不包含任何数据和/或逻辑。

我可以问一下问题的背景吗?

相关内容

  • 没有找到相关文章

最新更新