This document shows an example of sharing state between processes using Value and Array from the multiprocessing library:

from multiprocessing import Process, Value, Array
def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])
It will print
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
My questions are:

How do I keep passing information to the other processes after they have been created, rather than only at worker-process creation time?
How can a worker process block (or suspend) through this mechanism while waiting for an event from the parent process?
My platform is Windows 10. Shared memory can be shared between processes, but fork()-ed or spawn()-ed processes cannot inherit semaphores, locks, queues, etc.

Thanks.
[Update 1]

The demo given by @Manu-Valdés works, but I wrote an example that does not; perhaps you can help spot the problem.
%%file ./examples/multiprocessing_pool5.py
# This code definitely will not work in Windows as queue object is not fork() along.
import multiprocessing
import os

def f1(q):
    x = q.get(True)  # Block until something is in the queue
    if x == 55:
        raise Exception('I do not like 55!')
    elif x == 100:
        return
    else:
        print(f'f1({x}) -> {x*x}')

def f2(q):
    x = q.get(True)  # Block until something is in the queue
    if x == 55:
        raise Exception('I do not like 55!')
    elif x == 100:
        return
    else:
        print(f'f2({x}) -> {x*x}')

def wp_init(q):
    #global queue
    #queue = q  # Point to the global queue in each process
    print(f'I am initialized')

def success_cb(result):
    print(f'Success returns = {result}')

def failure_cb(result):
    print(f'Failure returns = {result}')

if __name__ == '__main__':
    np = os.cpu_count()  # Number of cores per CPU
    queue = multiprocessing.Queue()
    pool = multiprocessing.Pool(np, initializer=wp_init, initargs=(queue,))
    for x in range(100):
        if x % 2 == 0:
            f = f1
        else:
            f = f2
        pool.apply_async(f, args=(queue,), callback=success_cb, error_callback=failure_cb)
    for x in range(100):
        queue.put(x)
    # Terminate them but I do not know how to loop through the processes
    for _ in range(100):
        queue.put(100)  # Terminate it
    pool.close()
    pool.join()
The error is
I am initialized
I am initialized
I am initialized
I am initialized
Failure returns = Queue objects should only be shared between processes through inheritance
To communicate in a thread-safe way you can use a Queue. The get() method blocks if the queue is empty, and waits until a new element is put():
from multiprocessing import Process, Queue

def f(q):
    while True:
        element = q.get()
        print(element)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    q.put([42, None, 'hello'])
    p.join()
Let me answer my own question. Here is some of my understanding:
a) apply_async() returns immediately. I use multiprocessing.Manager() when creating the Queue, Value and Array to avoid the errors Synchronized objects should only be shared between processes through inheritance and xxx objects should only be shared between processes through inheritance.
b) Use a multiprocessing.Queue to signal, stop and terminate worker processes from their parent process.
c) It is not possible to pass different messages to different worker processes waiting on the same queue. Use a different queue for each instead.
d) Pool.apply_async() only allows the main entry function of a worker process to accept one argument. In that case, put the arguments in a list ([]).
e) We can use multiprocessing.sharedctypes.RawValue(), multiprocessing.sharedctypes.RawArray(), multiprocessing.sharedctypes.Value() and multiprocessing.sharedctypes.Array() to create, in shared memory, a ctypes value, a ctypes array, a ctypes value with an optional lock and a ctypes array with an optional lock. Shareable objects can be passed to worker processes through the initializer and initargs keyword arguments when creating the Pool object with multiprocessing.Pool(). These shareable objects cannot be passed with the Pool.apply_async() or Pool.map() methods.
f) The standard Python documentation on multiprocessing needs to be updated. For example, class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) should be written as class multiprocessing.pool.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None).
import multiprocessing as mp
import time

# Worker process 1
def f1(q):
    while True:
        x = queue.get(True)  # Block until there is a message
        if x >= 20:
            raise Exception(f'f1: I do not like {x}!')
        elif x == -1:
            print(f'f1: Quit')
            return "f1"
        else:
            time.sleep(0.5)
            v = q[0]
            a = q[1]
            print(f'f1({x}, {v}, {a})')

# Worker process 2
def f2(q):
    while True:
        x = queue.get(True)  # Block until there is a message
        if x >= 20:
            raise Exception(f'f2: I do not like {x}!')
        elif x == -1:
            print(f'f2: Quit')
            return "f2"
        else:
            time.sleep(0.5)
            v = q[0]
            a = q[1]
            print(f'f1({x}, {v}, {a})')

def pInit(q, poolstr):
    '''
    Initialize global shared variables among processes.
    Could possibly share queue and lock here
    '''
    global queue
    queue = q  # Point to the global queue in each process
    print(f'{poolstr} is initialized')

def succCB(result):
    print(f'Success returns = {result}')

def failCB(result):
    print(f'Failure returns = {result}')

if __name__ == '__main__':
    # Create shared memory to pass data to worker processes
    # lock=True for multiple worker processes on the same queue
    v1 = mp.Manager().Value('i', 0, lock=True)
    a1 = mp.Manager().Array('i', range(20), lock=True)
    # lock=False for 1 worker process on the queue
    v2 = mp.Manager().Value('i', 0, lock=False)
    a2 = mp.Manager().Array('i', range(20), lock=False)

    # Create queues for signaling worker processes
    queue1 = mp.Manager().Queue()
    queue2 = mp.Manager().Queue()

    # Creating pool of processes now - fork here
    pool1 = mp.Pool(2, initializer=pInit, initargs=(queue1, "pool1"))
    pool2 = mp.Pool(1, initializer=pInit, initargs=(queue2, "pool2"))

    # Assign entry function for each pool
    pool1.apply_async(f1, args=[(v1, a1)], callback=succCB, error_callback=failCB)
    pool1.apply_async(f1, args=[(v1, a1)], callback=succCB, error_callback=failCB)
    pool2.apply_async(f2, args=[(v2, a2)], callback=succCB, error_callback=failCB)

    # Parent process, worker processes do not see this anymore
    # Parent process notifies the worker processes
    for x in range(20):
        a1[x] = x
        a2[x] = x + 10
    v1.value = 2
    v2.value = 18
    queue1.put(1)
    queue1.put(1)
    queue2.put(18)

    # Parent process terminates or quits the worker processes
    queue1.put(-1)  # Quit properly
    queue1.put(20)  # Raise exception
    queue2.put(-1)  # Quit properly
    pool1.close()
    pool2.close()
    pool1.join()
    pool2.join()
The output is
pool1 is initialized
f1(1, Value('i', 2), array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]))
pool2 is initialized
f1(18, Value('i', 18), array('i', [10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]))
f2: Quit
pool1 is initialized
f1(1, Value('i', 2), array('i', [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]))
f1: Quit
Success returns = f1
Success returns = f2
Failure returns = f1: I do not like 20!