我有一个带有3个不同输入(可选(的flink作业,并且每种类型的输入都会发出相同的输出。
input1使用KeyedProcessFunction()
input2使用ProcessWindowFuction()
基本上,作业输入是三个输入和一个输出的联合。我们如何配置flink作业,以便对于单个作业,我可以使用以上两个流程函数。
我想用KeyedProcessFunction()
代替input1
,用ProcessWindowFuction()
代替input2
。
以下代码仅适用于input2
,如果我想使用input1
,我必须在作业配置中使用.process(processFuction())
而不是.process(MyProcessWindowFunction())
,我们如何配置才能在单个作业中使用这两个功能?
fun setupJob(env: StreamExecutionEnvironment) {
val testStream = env.sampleStream()
.keyBy { it.f0 }
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.process(MyProcessWindowFunction())
testStream.map { it.toKafkaMessage() }
.kafkaSink<SampleOutput>() }
}
单个Flink作业可以包含多个管道。例如,
env.fromSource(input1)
.keyBy(...)
.process(new MyKeyedProcessFunction())
.sinkTo(sink1)
env.fromSource(input2)
.keyBy(...)
.window(...)
.process(new MyProcessWindowFunction())
.sinkTo(sink2)
env.execute()