我有一个参数p
,它是只读的,就像机器学习模型一样。假设我可以使用分布式缓存来缓存每个任务管理器上的文件,这样每个任务都可以在本地加载它。
如果map(new MyMapFunction(p))
、p
将被序列化并反序列化到每个运算符,并且如果被缓存并加载,则每个任务将加载p
的实例。假设我有4个任务管理器,每个任务管理器有8个插槽,我们可以使用flink run -p 32
来使用所有资源,而p
将有32个实例。
理论上,p
可以做到这一点,我想只有4个实例,每个线程都可以在同一个任务管理器中使用该实例?它能在Flink工作吗?
我使用以下方法初始化每个TM的公共结构:
class EventProcess extends ProcessFunction[Event, Event] {
...
override def open(parameters: Configuration): Unit = {
super.open(parameters)
EventProcess.init()
}
...
}
object EventProcess {
val lock = "1"
var data: Any = _
def init(config: Config): Unit = {
lock.synchronized {
if (data == null) {
// do init
}
}
}
}
在您的情况下,如果您需要在open()
中从RuntimeCOntext
获取smth并用它初始化对象var
,则可以在open()
:中使用同步
override def open(parameters: Configuration): Unit = {
super.open(parameters)
EventProcess.lock.synchronized {
if (EventProcess.YOUR_VAR == null) {
EventProcess.init(getRuntimeContext()...)
}
}
}
使用静态变量在不同实例之间共享大量只读数据结构是完全可行的。确保使用某种锁以避免重新初始化并确保可见性。
class MyMap extends RichMapFunction {
private static Model model;
public void open() {
if (model == null) {
synchronized (MyMap.class) {
if (model == null) {
model = // read model ...
}
}
}
}
}