我用 BroadcastProcessFunction 实现了一个 flink 流。从processBroadcastElement中,我得到了我的模型,并将其应用于processElement中的事件。
我没有找到对我的流进行单元测试的方法,因为我找不到确保在第一个事件之前调度模型的解决方案。我想说有两种方法可以实现这一点:
1. 找到一个解决方案,首先
将模型推送到流中 2. 用模型填充广播状态,以便恢复流的执行
我可能错过了一些东西,但我还没有找到一个简单的方法来做到这一点。
这是我的问题的简单单元测试:
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import org.scalatest.Matchers._
import org.scalatest.{BeforeAndAfter, FunSuite}
import scala.collection.mutable
class BroadCastProcessor extends BroadcastProcessFunction[Int, (Int, String), String] {
import BroadCastProcessor._
override def processElement(value: Int,
ctx: BroadcastProcessFunction[Int, (Int, String), String]#ReadOnlyContext,
out: Collector[String]): Unit = {
val broadcastState = ctx.getBroadcastState(broadcastStateDescriptor)
if (broadcastState.contains(value)) {
out.collect(broadcastState.get(value))
}
}
override def processBroadcastElement(value: (Int, String),
ctx: BroadcastProcessFunction[Int, (Int, String), String]#Context,
out: Collector[String]): Unit = {
ctx.getBroadcastState(broadcastStateDescriptor).put(value._1, value._2)
}
}
object BroadCastProcessor {
val broadcastStateDescriptor: MapStateDescriptor[Int, String] = new MapStateDescriptor[Int, String]("int_to_string", classOf[Int], classOf[String])
}
class CollectSink extends SinkFunction[String] {
import CollectSink._
override def invoke(value: String): Unit = {
values += value
}
}
object CollectSink { // must be static
val values: mutable.MutableList[String] = mutable.MutableList[String]()
}
class BroadCastProcessTest extends FunSuite with BeforeAndAfter {
before {
CollectSink.values.clear()
}
test("add_elem_to_broadcast_and_process_should_apply_broadcast_rule") {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val dataToProcessStream = env.fromElements(1)
val ruleToBroadcastStream = env.fromElements(1 -> "1", 2 -> "2", 3 -> "3")
val broadcastStream = ruleToBroadcastStream.broadcast(BroadCastProcessor.broadcastStateDescriptor)
dataToProcessStream
.connect(broadcastStream)
.process(new BroadCastProcessor)
.addSink(new CollectSink())
// execute
env.execute()
CollectSink.values should contain("1")
}
}
感谢大卫安德森
的更新我选择了缓冲溶液。我为同步定义了一个进程函数:
class SynchronizeModelAndEvent(modelNumberToWaitFor: Int) extends CoProcessFunction[Int, (Int, String), Int] {
val eventBuffer: mutable.MutableList[Int] = mutable.MutableList[Int]()
var modelEventsNumber = 0
override def processElement1(value: Int, ctx: CoProcessFunction[Int, (Int, String), Int]#Context, out: Collector[Int]): Unit = {
if (modelEventsNumber < modelNumberToWaitFor) {
eventBuffer += value
return
}
out.collect(value)
}
override def processElement2(value: (Int, String), ctx: CoProcessFunction[Int, (Int, String), Int]#Context, out: Collector[Int]): Unit = {
modelEventsNumber += 1
if (modelEventsNumber >= modelNumberToWaitFor) {
eventBuffer.foreach(event => out.collect(event))
}
}
}
所以我需要将其添加到我的流中:
dataToProcessStream
.connect(ruleToBroadcastStream)
.process(new SynchronizeModelAndEvent(3))
.connect(broadcastStream)
.process(new BroadCastProcessor)
.addSink(new CollectSink())
谢谢
没有一个简单的方法可以做到这一点。你可以让processElement缓冲它的所有输入,直到processBroadcastElement接收到模型。或者在没有事件流量的情况下运行一次作业,并在模型广播后获取保存点。然后将该保存点还原到同一作业中,但已连接其事件输入。
顺便说一下,你正在寻找的能力在 Flink 社区中通常被称为"侧输入"。
感谢David Anderson和Matthieu,我编写了这个通用的CoFlatMap函数,该函数在事件流上发出请求的延迟:
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector
import scala.collection.mutable
class SynchronizeEventsWithRules[A,B](rulesToWait: Int) extends CoProcessFunction[A, B, A] {
val eventBuffer: mutable.MutableList[A] = mutable.MutableList[A]()
var processedRules = 0
override def processElement1(value: A, ctx: CoProcessFunction[A, B, A]#Context, out: Collector[A]): Unit = {
if (processedRules < rulesToWait) {
println("1 item buffered")
println(rulesToWait+"--"+processedRules)
eventBuffer += value
return
}
eventBuffer.clear()
println("send input to output without buffering:")
out.collect(value)
}
override def processElement2(value: B, ctx: CoProcessFunction[A, B, A]#Context, out: Collector[A]): Unit = {
processedRules += 1
println("1 rule processed processedRules:"+processedRules)
if (processedRules >= rulesToWait && eventBuffer.length>0) {
println("send buffered data to output")
eventBuffer.foreach(event => out.collect(event))
eventBuffer.clear()
}
}
}
但不幸的是,它在我的情况下根本没有帮助,因为被测试的对象是一个KeyedBroadcastProcessFunction
,这使得事件数据的延迟无关紧要,因此我试图应用一个平面图,使规则流大 n 倍,n 是 CPU 的数量。 所以我将确保生成的事件流将始终与规则流同步,并将在此之后到达但这也无济于事。
毕竟,我得出了这个简单的解决方案,当然它不是确定性的,但由于并行性和并发性的性质,问题本身也不是确定性的。如果我们设置的 delayMilis 足够大 (>100(,结果将是确定性的:
val delayMilis = 100
val synchronizedInput = inputEventStream.map(x=>{
Thread.sleep(delayMilis)
x
}).keyBy(_.someKey)
您还可以将映射函数更改为此函数,以仅对第一个元素应用延迟:
package util
import org.apache.flink.api.common.functions.MapFunction
class DelayEvents[T](delayMilis: Int) extends MapFunction[T,T] {
var delayed = false
override def map(value: T): T = {
if (!delayed) {
delayed=true
Thread.sleep(delayMilis)
}
value
}
}
val delayMilis = 100
val synchronizedInput = inputEventStream.map(new DelayEvents(100)).keyBy(_.someKey)