在主进程和子进程之间共享导入后更改的变量



我在gRPC中有一个项目,其中main.py生成gRPC服务器作为子流程。

同样在这个项目中,我有settings.py,它包含一些配置,比如:

some_config = {"foo": "bar"}

在一些文件(由不同的进程使用)中,我有:

import settings
...
the value of settings.some_config is read

在主流程中,我有一个监听器,它根据需要更新some_config,例如:

settings.some_config = new_value

我注意到,在主流程中更改settings.some_config值时,它在我检查的子流程中没有更改,而是保持原来的值。

我希望所有子流程始终具有settings.some_config的最新值。

我考虑过的一个解决方案是向每个子流程传递一个队列或管道,当主流程中的some_config发生变化时,我可以通过队列/管道将新数据发送到每个子流程。

但是,我如何提醒它在子流程中将新值分配给settings.some_config?我是否应该在每个子流程中使用一个监听器,以便在通知到达时执行以下操作:

settings.some_config = new_value

这行得通吗?最终目标是使settings.some_config值在所有模块/进程中都是最新的,而无需重新启动服务器。我也不确定它是否能工作,因为每个模块都可能保留以前在其缓存内存中导入的settings.some_config的值。


更新

我采用了Charchit的解决方案,并根据我的要求进行了调整,因此我们有:

from multiprocessing.managers import BaseManager, NamespaceProxy
from multiprocessing import Process
import settings
import time
def get_settings():
return settings
def run(proxy_settings):
settings = proxy_settings # So the module settings becomes the proxy object
if __name__ == '__main__':
BaseManager.register('get_settings', get_settings, proxytype=NamespaceProxy)
manager = BaseManager()
manager.start()
settings = manager.get_settings()
p = Process(target=run, args=(settings, ))
p.start()

几个问题:

是否应该将整个模块(settings)作为代理对象的目标?这样做标准吗?

这里有很多神奇之处,例如,简单的答案是,现在模块设置是一个共享代理对象吗?所以,当一个子进程读取settings.some_config时,它实际上会从管理器读取值?

有什么副作用我应该注意吗?

当我在主进程中更改设置中的任何值时,我应该使用锁吗?

最简单的方法是与管理器共享模块:

from multiprocessing.managers import BaseManager, NamespaceProxy
from multiprocessing import Process
import settings
import time
def get_settings():
return settings
def run(settings):
for _ in range(2):
print("Inside subprocess, the value is", settings.some_config)
time.sleep(3)
if __name__ == '__main__':
BaseManager.register('get_settings', get_settings, proxytype=NamespaceProxy)
manager = BaseManager()
manager.start()
settings = manager.get_settings()
p = Process(target=run, args=(settings, ))
p.start()
time.sleep(1)
settings.some_config = {'changed': 'value'}
p.join()

这样做意味着您不必通知子流程值发生了更改,他们只需知道,因为他们正在从自动处理此更改的管理器流程接收值。

输出

Inside subprocess, the value is {'foo': 'bar'}
Inside subprocess, the value is {'changed': 'value'}

需要记住的一些事情

首先请记住,settings.some_config需要明确设置。这意味着您可以执行settings.some_config = {},但不能执行settings.some_config['foo'] = "bar"。如果你想修改一个密钥,那么获取最新的配置,更新它,并明确设置如下:

temp = settings.some_config
temp['foo'] = 'bar'
settings.some_config = temp

其次,为了将对代码库的可能更改保持在最低限度,您将settings变量(最初映射到settings.py模块对象)重新分配给代理。在上面的代码中,您在__main__块中执行此操作(因此设置正在全局更改)。因此,主进程对settings所做的任何更改都将自动反映在访问代理的其他进程中。这也被部分复制到运行函数run的子进程中。从run内部访问settings与访问代理相同。但是,如果您正在调用run内部的某个其他函数(比如run2),它不将settings作为参数,并且它试图访问settings,那么它将访问导入的模块而不是代理。示例:

def run2():
print("Inside subprocess run2, the value is", settings.some_config)
def run(settings):
for _ in range(2):
print("Inside subprocess run, the value is", settings.some_config)
time.sleep(3)
run2()

输出

Inside subprocess run, the value is {'foo': 'bar'}
Inside subprocess run, the value is {'changed': 'value'}
Inside subprocess run2, the value is {'foo': 'bar'}

如果您不希望这样,那么只需将参数指定为全局变量settings:的值

def run2():
print("Inside subprocess run2, the value is", settings.some_config)
def run(shared_settings):
global settings
settings = shared_settings
for _ in range(2):
print("Inside subprocess run, the value is", settings.some_config)
time.sleep(3)
run2()

现在访问settings的任何函数(子进程内)都将访问代理。

输出

Inside subprocess run, the value is {'foo': 'bar'}
Inside subprocess run, the value is {'changed': 'value'}
Inside subprocess run2, the value is {'changed': 'value'}

最后,如果有许多子流程在运行,则速度可能会变慢(与管理器的连接越多,速度就越慢)。如果这让你感到困扰,那么我建议你按照你在描述中所说的方式来做——即";将队列或管道传递到每个子进程";。为了确保子进程在您将值传递到队列中后尽快更新其值,您可以在子进程中生成一个线程,该线程不断轮询队列中是否存在值,如果存在,则会将进程的设置值更新为队列中提供的值。只需确保将线程作为守护进程运行,或者明确同意退出条件即可。

更新

是否应该将整个模块(设置)作为代理对象的目标?这样做标准吗?

如果你的问题是这样做是否安全,那么是的,请记住我在这个答案中概述的内容。归根结底,模块只是另一个对象,在这里共享它更有意义。

这里有很多神奇之处,例如,对于它的工作原理,简单的答案是现在模块设置是一个共享代理对象吗?所以,当一个子进程读取settings.some_config时,它实际上会从管理器读取值?

您需要在run函数中添加几行。如果是这种情况,请检查上一节中的第二点。

有什么副作用我应该注意吗?

查看上一节。

当我在主进程中更改设置中的任何值时,我应该使用锁吗?

此处无需

Charchit创建专用托管对象的解决方案比实际情况更复杂。如果假设配置存储为字典,那么只需使用multiprocessor.Manager().dict方法返回的'multiprocessing.managers.DictProxy'实例。这也允许您更新单个密钥,而不必通过设置一个全新的字典值来进行更新:

from multiprocessing import Process, Manager
import time
def get_settings(manager):
return manager.dict({'foo': 'bar', 'x': 17})
def run(settings):
for _ in range(2):
print("Inside subprocess, the value is", settings)
time.sleep(3)
if __name__ == '__main__':
manager = Manager()
settings = get_settings(manager)
p = Process(target=run, args=(settings, ))
p.start()
time.sleep(1)
settings['foo'] = 'changed bar'
p.join()

打印:

Inside subprocess, the value is {'foo': 'bar', 'x': 17}
Inside subprocess, the value is {'foo': 'changed bar', 'x': 17}

最新更新