我在gRPC中有一个项目,其中main.py
生成gRPC服务器作为子流程。
同样在这个项目中,我有settings.py
,它包含一些配置,比如:
some_config = {"foo": "bar"}
在一些文件(由不同的进程使用)中,我有:
import settings
...
the value of settings.some_config is read
在主流程中,我有一个监听器,它根据需要更新some_config
,例如:
settings.some_config = new_value
我注意到,在主流程中更改settings.some_config
值时,它在我检查的子流程中没有更改,而是保持原来的值。
我希望所有子流程始终具有settings.some_config
的最新值。
我考虑过的一个解决方案是向每个子流程传递一个队列或管道,当主流程中的some_config
发生变化时,我可以通过队列/管道将新数据发送到每个子流程。
但是,我如何提醒它在子流程中将新值分配给settings.some_config
?我是否应该在每个子流程中使用一个监听器,以便在通知到达时执行以下操作:
settings.some_config = new_value
这行得通吗?最终目标是使settings.some_config
值在所有模块/进程中都是最新的,而无需重新启动服务器。我也不确定它是否能工作,因为每个模块都可能保留以前在其缓存内存中导入的settings.some_config
的值。
更新
我采用了Charchit的解决方案,并根据我的要求进行了调整,因此我们有:
from multiprocessing.managers import BaseManager, NamespaceProxy
from multiprocessing import Process
import settings
import time
def get_settings():
return settings
def run(proxy_settings):
settings = proxy_settings # So the module settings becomes the proxy object
if __name__ == '__main__':
BaseManager.register('get_settings', get_settings, proxytype=NamespaceProxy)
manager = BaseManager()
manager.start()
settings = manager.get_settings()
p = Process(target=run, args=(settings, ))
p.start()
几个问题:
是否应该将整个模块(settings
)作为代理对象的目标?这样做标准吗?
这里有很多神奇之处,例如,简单的答案是,现在模块设置是一个共享代理对象吗?所以,当一个子进程读取settings.some_config
时,它实际上会从管理器读取值?
有什么副作用我应该注意吗?
当我在主进程中更改设置中的任何值时,我应该使用锁吗?
最简单的方法是与管理器共享模块:
from multiprocessing.managers import BaseManager, NamespaceProxy
from multiprocessing import Process
import settings
import time
def get_settings():
return settings
def run(settings):
for _ in range(2):
print("Inside subprocess, the value is", settings.some_config)
time.sleep(3)
if __name__ == '__main__':
BaseManager.register('get_settings', get_settings, proxytype=NamespaceProxy)
manager = BaseManager()
manager.start()
settings = manager.get_settings()
p = Process(target=run, args=(settings, ))
p.start()
time.sleep(1)
settings.some_config = {'changed': 'value'}
p.join()
这样做意味着您不必通知子流程值发生了更改,他们只需知道,因为他们正在从自动处理此更改的管理器流程接收值。
输出
Inside subprocess, the value is {'foo': 'bar'}
Inside subprocess, the value is {'changed': 'value'}
需要记住的一些事情
首先请记住,settings.some_config
需要明确设置。这意味着您可以执行settings.some_config = {}
,但不能执行settings.some_config['foo'] = "bar"
。如果你想修改一个密钥,那么获取最新的配置,更新它,并明确设置如下:
temp = settings.some_config
temp['foo'] = 'bar'
settings.some_config = temp
其次,为了将对代码库的可能更改保持在最低限度,您将settings
变量(最初映射到settings.py
模块对象)重新分配给代理。在上面的代码中,您在__main__
块中执行此操作(因此设置正在全局更改)。因此,主进程对settings
所做的任何更改都将自动反映在访问代理的其他进程中。这也被部分复制到运行函数run
的子进程中。从run
内部访问settings
与访问代理相同。但是,如果您正在调用run
内部的某个其他函数(比如run2
),它不将settings
作为参数,并且它试图访问settings
,那么它将访问导入的模块而不是代理。示例:
def run2():
print("Inside subprocess run2, the value is", settings.some_config)
def run(settings):
for _ in range(2):
print("Inside subprocess run, the value is", settings.some_config)
time.sleep(3)
run2()
输出
Inside subprocess run, the value is {'foo': 'bar'}
Inside subprocess run, the value is {'changed': 'value'}
Inside subprocess run2, the value is {'foo': 'bar'}
如果您不希望这样,那么只需将参数指定为全局变量settings
:的值
def run2():
print("Inside subprocess run2, the value is", settings.some_config)
def run(shared_settings):
global settings
settings = shared_settings
for _ in range(2):
print("Inside subprocess run, the value is", settings.some_config)
time.sleep(3)
run2()
现在访问settings
的任何函数(子进程内)都将访问代理。
输出
Inside subprocess run, the value is {'foo': 'bar'}
Inside subprocess run, the value is {'changed': 'value'}
Inside subprocess run2, the value is {'changed': 'value'}
最后,如果有许多子流程在运行,则速度可能会变慢(与管理器的连接越多,速度就越慢)。如果这让你感到困扰,那么我建议你按照你在描述中所说的方式来做——即";将队列或管道传递到每个子进程";。为了确保子进程在您将值传递到队列中后尽快更新其值,您可以在子进程中生成一个线程,该线程不断轮询队列中是否存在值,如果存在,则会将进程的设置值更新为队列中提供的值。只需确保将线程作为守护进程运行,或者明确同意退出条件即可。
更新
是否应该将整个模块(设置)作为代理对象的目标?这样做标准吗?
如果你的问题是这样做是否安全,那么是的,请记住我在这个答案中概述的内容。归根结底,模块只是另一个对象,在这里共享它更有意义。
这里有很多神奇之处,例如,对于它的工作原理,简单的答案是现在模块设置是一个共享代理对象吗?所以,当一个子进程读取settings.some_config时,它实际上会从管理器读取值?
您需要在run
函数中添加几行。如果是这种情况,请检查上一节中的第二点。
有什么副作用我应该注意吗?
查看上一节。
当我在主进程中更改设置中的任何值时,我应该使用锁吗?
此处无需
Charchit创建专用托管对象的解决方案比实际情况更复杂。如果假设配置存储为字典,那么只需使用multiprocessor.Manager().dict
方法返回的'multiprocessing.managers.DictProxy'
实例。这也允许您更新单个密钥,而不必通过设置一个全新的字典值来进行更新:
from multiprocessing import Process, Manager
import time
def get_settings(manager):
return manager.dict({'foo': 'bar', 'x': 17})
def run(settings):
for _ in range(2):
print("Inside subprocess, the value is", settings)
time.sleep(3)
if __name__ == '__main__':
manager = Manager()
settings = get_settings(manager)
p = Process(target=run, args=(settings, ))
p.start()
time.sleep(1)
settings['foo'] = 'changed bar'
p.join()
打印:
Inside subprocess, the value is {'foo': 'bar', 'x': 17}
Inside subprocess, the value is {'foo': 'changed bar', 'x': 17}