我正在为测试用例编写一些函数,这些函数被注入了一些对对象的共享引用;假设这样的SinkFunction
:
class Collector[T](collection: ListBuffer[T]) extends SinkFunction[T] {
override def invoke(in: T, context: SinkFunction.Context[_]): Unit = {
collection.append(in)
}
}
和测试代码:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val list = ListBuffer.empty[String]
env.fromElements("Hello").addSink(new Collector(list))
env.execute()
println(list)
我运行了测试,但在测试结束时list
是空的! 我检查了文档 (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html( 我发现原始示例使用的是单例引用。
所以我要求确定 Apache Flink 在内部是如何工作的:它是否序列化了所有添加到流中的函数,即使在本地部署中也是如此?
是的,Flink 序列化所有函数。例如,如果你看一下SinkFunction,你会注意到它implements Serializable
。
如果你想在作业和客户端(将作业发送到 Flink 的程序(之间共享数据,你必须使用文件、套接字、消息传递(RMQ、Kafka(或一些类似的机制自己管理它。