我有一些昂贵的长期运行函数,我想在多个核心上运行。使用多处理很容易做到这一点。但我还需要定期运行一个函数,该函数根据特定进程的状态(全局变量(计算值。我认为这应该通过简单地在子流程上生成一个线程来实现。
下面是一个简化的例子。请建议我如何拨打procces_query_state()
。
import multiprocessing
import time
def process_runner(x: int):
global xx
xx = x
while True:
time.sleep(0.1)
xx += 1 # actually an expensive calculation
def process_query_state() -> int:
y = xx * 2 # actually an expenseive calculation
return y
def main():
processes = {}
for x in range(10):
p = multiprocessing.get_context('spawn').Process(target=process_runner, args=(x,))
p.start()
processes[x] = p
while True:
time.sleep(1)
print(processes[3].process_query_state()) # this doesn't actually work
if __name__ == '__main__':
main()
我看到两个问题:
-
Process
不是RPC
(Remote Procedure Call
(,您不能从主进程执行其他函数process_query_state
。您只能使用queue
向其他进程发送一些信息,但此进程必须定期检查是否有新消息。 -
Process
只能运行一个函数,因此当它收到运行另一个函数的消息时,它会停止一个函数;或者它必须在新的processes
上运行threads
才能同时运行多个函数。
编辑:这可能会带来其他问题-如果两个函数在同一数据上同时工作,那么其中一个可以在另一个使用旧值之前更改值,这可能会产生错误的结果。
我创建了一个使用队列向process_runner
发送消息的示例,它定期检查是否有消息并运行process_query_state
,然后将结果发送回主进程。
主进程等待所选进程的结果——它会阻塞代码——但如果您想处理更多的进程,那么它必须使其更加复杂。
import multiprocessing
import time
def process_query_state():
y = xx * 2 # actually an expenseive calculation
return y
def process_runner(x: int, queue_in, queue_out):
global xx
xx = x
# reverse direction
q_in = queue_out
q_out = queue_in
while True:
time.sleep(0.1)
xx += 1 # actually an expensive calculation
# run other function - it will block main calculations
# but this way it will use correct `xx` (other calculations will not change it)
if not q_in.empty():
if q_in.get() == 'run':
result = process_query_state()
q_out.put(result)
def main():
processes = {}
for x in range(4):
ctx = multiprocessing.get_context('spawn')
q_in = ctx.Queue()
q_out = ctx.Queue()
p = ctx.Process(target=process_runner, args=(x, q_in, q_out))
p.start()
processes[x] = (p, q_in, q_out)
while True:
time.sleep(1)
q_in = processes[3][1]
q_out = processes[3][2]
q_out.put('run')
# non blocking version
#if not q_in.empty():
# print(q_in.get())
# blocking version
print(q_in.get())
if __name__ == '__main__':
main()