我想在 Flink 中的每个节点之间共享一个HashMap
,并允许节点更新该 HashMap。到目前为止,我有这段代码:
object ParallelStreams {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//Is there a way to attach a HashMap to this config variable?
val config = new Configuration()
config.setClass("HashMap", Class[CustomGlobal])
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
class CustomGlobal extends ExecutionConfig.GlobalJobParameters {
override def toMap: util.Map[String, String] = {
new HashMap[String, String]()
}
}
class MyCoMap extends RichCoMapFunction[String, String, String] {
var users: HashMap[String, String] = null
//How do I get access the HashMap I attach to the global config here?
override def open(parameters: Configuration): Unit = {
super.open(parameters)
val globalParams = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
val globalConf = globalParams[Configuration]
val hashMap = globalConf.getClass
}
//Other functions to override here
}
}
我想知道您是否可以将自定义对象附加到此处创建的config
变量val config = new Configuration()
?(请参阅上面代码中的注释(。
我注意到您只能附加原始值。我创建了一个扩展ExecutionConfig.GlobalJobParameters
的自定义类,并通过执行config.setClass("HashMap", Class[CustomGlobal])
附加该类,但我不确定这是否是您应该这样做的方式?
将参数分发给运算符的常用方法是将它们作为函数类中的常规成员变量。在计划构建期间创建和分配的函数对象将序列化并交付给所有工作人员。因此,您不必通过配置传递参数。
这将如下所示
class MyMapper(map: HashMap) extends MapFunction[String, String] {
// class definition
}
val inStream: DataStream[String] = ???
val myHashMap: HashMap = ???
val myMapper: MyMapper = new MyMapper(myHashMap)
val mappedStream: DataStream[String] = inStream.map(myMapper)
myMapper
对象被序列化(使用 Java 序列化(并交付以供执行。因此,map
的类型必须实现 JavaSerializable
接口。
编辑:我错过了您希望地图可从所有并行任务中更新的部分。这在 Flink 中是不可能的。您必须完全复制映射并全部更新(通过广播(,或者为此使用外部系统(键值存储(。