Flink不同的任务是否可以在同一个任务管理器中共享相同的变量



我有一个参数p,它是只读的,就像机器学习模型一样。假设我可以使用分布式缓存来缓存每个任务管理器上的文件,这样每个任务都可以在本地加载它。

如果map(new MyMapFunction(p))p将被序列化并反序列化到每个运算符,并且如果被缓存并加载,则每个任务将加载p的实例。假设我有4个任务管理器,每个任务管理器有8个插槽,我们可以使用flink run -p 32来使用所有资源,而p将有32个实例。

理论上,p可以做到这一点,我想只有4个实例,每个线程都可以在同一个任务管理器中使用该实例?它能在Flink工作吗?

我使用以下方法初始化每个TM的公共结构:

class EventProcess extends ProcessFunction[Event, Event] {
...
override def open(parameters: Configuration): Unit = {
super.open(parameters)
EventProcess.init()
}
...
}
object EventProcess {
val lock = "1"
var data: Any = _
def init(config: Config): Unit = {
lock.synchronized {
if (data == null) {
// do init
}
}
}
}

在您的情况下,如果您需要在open()中从RuntimeCOntext获取smth并用它初始化对象var,则可以在open():中使用同步


override def open(parameters: Configuration): Unit = {
super.open(parameters)
EventProcess.lock.synchronized {
if (EventProcess.YOUR_VAR == null) {
EventProcess.init(getRuntimeContext()...)
}
}
}

使用静态变量在不同实例之间共享大量只读数据结构是完全可行的。确保使用某种锁以避免重新初始化并确保可见性。

class MyMap extends RichMapFunction {
private static Model model;
public void open() {
if (model == null) {
synchronized (MyMap.class) {                
if (model == null) {
model = // read model ...
}
}
}
}
}

相关内容

  • 没有找到相关文章

最新更新