如何在 multiprocessing 中与另一个脚本共享全局变量



问题:如何在 script2 中使用变量 x?我有两个脚本,第一个包含 2 个以多进程方式运行的函数,第二个包含 1 个。如何让这 3 个多进程函数共享同一个变量?

script1.py

from script2 import function3
# Module-level value that function1 writes and function2 reads.
# NOTE: a plain module global is not shared between processes; each
# multiprocessing.Process works on its own copy (this is the problem the
# question asks about).
x = None
def function1():
    """Repeatedly read a line from stdin and store it in the global ``x``.

    NOTE(review): multiprocessing redirects a child process's stdin to
    os.devnull, so when this runs as a Process target ``input()`` raises
    EOFError right away. Read input in the parent and hand it to the workers
    through a shared object instead.
    """
    global x
    while True:
        x = input()  # rebinds only this process's copy of x
def function2():
    """Print the current value of the global ``x`` forever.

    Only this process's copy of ``x`` is visible here, so values assigned by
    function1 in another process are never seen.
    """
    # No ``global`` statement needed: x is only read, never assigned.
    while True:
        print(x)
if __name__ == '__main__':
    # The snippet uses multiprocessing but never imported it.
    import multiprocessing

    # The __main__ guard keeps spawn-based platforms (Windows, macOS) from
    # re-running this code in every child that re-imports the module.
    processes = [
        multiprocessing.Process(target=function1),
        multiprocessing.Process(target=function2),
        multiprocessing.Process(target=function3),
    ]
    for process in processes:
        process.start()
    # TODO: all three targets loop forever, so these joins never return.
    # Some condition to stop all processes is still needed (e.g. a shared
    # multiprocessing.Event checked inside each loop).
    for process in processes:
        process.join()

script2.py

def function3():
    """Print twice the global ``x`` forever.

    NOTE: ``x`` is not defined in script2. Globals of script1 are not visible
    here, so this raises NameError on the first iteration.
    """
    while True:
        print(x * 2)

以下示例根据 @martineau 在评论中的建议,创建了一个由 Manager 托管的共享字符串值。

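在 Linux 这样使用 fork 创建新进程的平台上,可以这样编写(注意:macOS 自 Python 3.8 起默认使用 spawn,Linux 自 Python 3.14 起默认使用 forkserver;如果代码依赖 fork 的继承行为,应显式调用 multiprocessing.set_start_method('fork')):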

import multiprocessing
from ctypes import c_char_p
# This example relies on the children inheriting the module-level ``s`` and
# ``event`` through fork. Fork is not the default everywhere (macOS uses spawn
# since Python 3.8, Linux uses forkserver since Python 3.14), so request it
# explicitly before any multiprocessing object is created.
multiprocessing.set_start_method('fork')
# Keep an explicit reference to the manager; its server process holds the
# shared value. Manager.Value stores any picklable object; the c_char_p
# typecode is only recorded, not enforced (a str is stored below).
manager = multiprocessing.Manager()
s = manager.Value(c_char_p, '')
event = multiprocessing.Event()
def function1():
    """Store a new string in the shared ``s`` and set ``event`` to announce it."""
    # s is a Manager proxy, so the assignment is forwarded to the manager
    # process and seen by every process holding the proxy.
    s.value = 'New value'  # updates global variable s
    event.set()  # show we have a new value
def function2():
    """Block until ``event`` is set, then print the shared string ``s``."""
    event.wait()  # wait for new s value
    print(s.value)
# Launch one process per worker function, then wait for all of them.
workers = [multiprocessing.Process(target=fn) for fn in (function1, function2)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()

打印:

New value

在使用spawn创建新进程的Windows等平台上,共享字符串将作为参数传递给进程,以确保只创建字符串的一个实例。

import multiprocessing
from ctypes import c_char_p
def function1(s, event):
    """Store a new string in *s* and set *event* to announce it.

    :param s: shared value holder exposing a writable ``value`` attribute
        (a Manager ``Value`` proxy in this example)
    :param event: event set once the new value has been written
    """
    s.value = 'New value'
    event.set()  # show we have a new value
def function2(s, event):
    """Block until *event* is set, then print ``s.value``.

    :param s: shared value holder exposing a ``value`` attribute
    :param event: event that signals a new value is available
    """
    event.wait()  # wait for new s value
    print(s.value)
# I need this for Windows: under "spawn" every child re-imports this module,
# so process creation must only happen in the original parent.
if __name__ == '__main__':
    # The with-block shuts the manager's server process down once the
    # workers have been joined.
    with multiprocessing.Manager() as manager:
        s = manager.Value(c_char_p, '')
        event = multiprocessing.Event()
        # Nothing is inherited under spawn, so the proxy and the event are
        # handed to each worker explicitly.
        p1 = multiprocessing.Process(target=function1, args=(s, event))
        p2 = multiprocessing.Process(target=function2, args=(s, event))
        p1.start()
        p2.start()
        p1.join()
        p2.join()

打印:

New value

需要上面的 if __name__ == '__main__': 检查,否则会陷入递归:新创建的进程会从头开始执行源文件,如果没有该检查,就会不断创建新进程。因此,s 和 event 的定义不能放在该检查之外,否则每个新创建的进程都会创建这些变量各自的实例。但这意味着我们现在必须将这些变量作为参数传递,而在 fork 示例中,它们可以直接被子进程继承。

更新:在 Linux/Unix 上(使用 fork 启动方式)创建共享 numpy 数组

import multiprocessing
import ctypes
import numpy as np
def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.

    No data is copied: the result is a view of *shared_array*, so writes
    through it land in the shared buffer.

    :param shared_array: ctypes array, e.g. from multiprocessing.Array(..., lock=False)
    :param shape: target shape; its product must equal len(shared_array)
    :return: numpy view of *shared_array* with the given shape
    '''
    return np.ctypeslib.as_array(shared_array).reshape(shape)
def to_shared_array(arr, ctype=None):
    '''Copy *arr* into a new lock-free shared-memory multiprocessing.Array.

    :param arr: numpy array to copy; any shape, copied in C (row-major) order
    :param ctype: ctypes element type of the shared array; must have the same
        size as ``arr.dtype``. When omitted it is derived from ``arr.dtype``.
    :return: flat ctypes array of length ``arr.size``
    :raises ValueError: if *ctype* and ``arr.dtype`` differ in size
    '''
    if ctype is None:
        ctype = np.ctypeslib.as_ctypes_type(arr.dtype)
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    # Reinterpret the shared buffer with arr's dtype so the copy is a single
    # vectorized assignment.
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    if temp.size != arr.size:
        raise ValueError(
            f'ctype {ctype.__name__} does not match dtype {arr.dtype} in size')
    # ravel only copies when arr is not already C-contiguous.
    temp[:] = arr.ravel(order='C')
    return shared_array
# The workers below rely on inheriting these module-level objects through
# fork, which is not the default on macOS (Python 3.8+) or Linux (3.14+), so
# request it explicitly before creating any multiprocessing object.
multiprocessing.set_start_method('fork')
arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# You have to now use the shared array as the base:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()
def function1():
    '''Set every element of the shared ``arr`` to 1, then signal ``event``.

    ``arr`` is a view of the fork-inherited shared buffer, so the write is
    visible to function2 and to the parent.
    '''
    arr[...] = 1  # one vectorized write instead of a nested index loop
    event.set()  # show we have a new value
def function2():
    '''Wait until function1 has filled ``arr``, then print it.'''
    event.wait()  # wait for new arr value
    print('arr =', arr)
# Run both workers to completion.
workers = [multiprocessing.Process(target=fn) for fn in (function1, function2)]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
# The parent sees the children's writes because arr is backed by shared memory.
print('arr =', arr)

打印:

arr = [[1 1 1]
[1 1 1]]
arr = [[1 1 1]
[1 1 1]]

在 Windows 上创建共享 numpy 数组

import multiprocessing
import ctypes
import numpy as np
def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.

    No data is copied: the result is a view of *shared_array*, so writes
    through it land in the shared buffer.

    :param shared_array: ctypes array, e.g. from multiprocessing.Array(..., lock=False)
    :param shape: target shape; its product must equal len(shared_array)
    :return: numpy view of *shared_array* with the given shape
    '''
    return np.ctypeslib.as_array(shared_array).reshape(shape)
def to_shared_array(arr, ctype=None):
    '''Copy *arr* into a new lock-free shared-memory multiprocessing.Array.

    :param arr: numpy array to copy; any shape, copied in C (row-major) order
    :param ctype: ctypes element type of the shared array; must have the same
        size as ``arr.dtype``. When omitted it is derived from ``arr.dtype``.
    :return: flat ctypes array of length ``arr.size``
    :raises ValueError: if *ctype* and ``arr.dtype`` differ in size
    '''
    if ctype is None:
        ctype = np.ctypeslib.as_ctypes_type(arr.dtype)
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    # Reinterpret the shared buffer with arr's dtype so the copy is a single
    # vectorized assignment.
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    if temp.size != arr.size:
        raise ValueError(
            f'ctype {ctype.__name__} does not match dtype {arr.dtype} in size')
    # ravel only copies when arr is not already C-contiguous.
    temp[:] = arr.ravel(order='C')
    return shared_array
def function1(arr, event):
    '''Set every element of *arr* to 1, then signal *event*.

    :param arr: numpy array filled in place (any shape)
    :param event: event set once the new values are written
    '''
    arr[...] = 1  # one vectorized write instead of a nested index loop
    event.set()  # show we have a new value


def function2(arr, event):
    '''Wait for *event*, then print *arr*.

    :param arr: numpy array to print
    :param event: event that signals the new values are available
    '''
    event.wait()  # wait for new arr value
    print('arr =', arr)


def _run_on_shared(func, shared_array, shape, event):
    '''Process target: rebuild the numpy view of *shared_array*, then call *func*.

    Under spawn, a numpy array passed in Process ``args`` is pickled by value,
    so the child would only modify a private copy. The shared ctypes array is
    re-attached to the same memory when passed at process creation, so the
    numpy view has to be rebuilt inside the child.
    '''
    func(to_numpy_array(shared_array, shape), event)


if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    # Hand the children the shared ctypes array, not the numpy view.
    p1 = multiprocessing.Process(
        target=_run_on_shared, args=(function1, shared_array, shape, event))
    p2 = multiprocessing.Process(
        target=_run_on_shared, args=(function2, shared_array, shape, event))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print('arr =', arr)

在 Windows 上将共享 numpy 数组与多进程池(multiprocessing.Pool)一起使用

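在使用多进程池时,无论是将数组作为参数传递给工作函数,还是像本例这样用它为池中的每个进程初始化全局变量,都必须把共享数组(而不是 numpy 数组,numpy 数组会按值被 pickle 复制)传递给每个进程,并在进程中从它重新创建 numpy 数组。注意:共享 ctypes 数组只能在创建进程时传递(例如通过 initializer/initargs),不能作为 apply_async/map 的任务参数传递。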

import multiprocessing
import ctypes
import numpy as np
def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.

    No data is copied: the result is a view of *shared_array*, so writes
    through it land in the shared buffer.

    :param shared_array: ctypes array, e.g. from multiprocessing.Array(..., lock=False)
    :param shape: target shape; its product must equal len(shared_array)
    :return: numpy view of *shared_array* with the given shape
    '''
    return np.ctypeslib.as_array(shared_array).reshape(shape)
def to_shared_array(arr, ctype=None):
    '''Copy *arr* into a new lock-free shared-memory multiprocessing.Array.

    :param arr: numpy array to copy; any shape, copied in C (row-major) order
    :param ctype: ctypes element type of the shared array; must have the same
        size as ``arr.dtype``. When omitted it is derived from ``arr.dtype``.
    :return: flat ctypes array of length ``arr.size``
    :raises ValueError: if *ctype* and ``arr.dtype`` differ in size
    '''
    if ctype is None:
        ctype = np.ctypeslib.as_ctypes_type(arr.dtype)
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    # Reinterpret the shared buffer with arr's dtype so the copy is a single
    # vectorized assignment.
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    if temp.size != arr.size:
        raise ValueError(
            f'ctype {ctype.__name__} does not match dtype {arr.dtype} in size')
    # ravel only copies when arr is not already C-contiguous.
    temp[:] = arr.ravel(order='C')
    return shared_array
def init_pool(shared_array, the_shape, the_event):
    '''Pool initializer: publish the shared state as globals in a worker.

    Runs once in each pool process (it is passed as ``initializer``). The
    numpy view is rebuilt here from the shared ctypes array, because that
    array, not a numpy array, is what refers to the shared memory.

    :param shared_array: shared ctypes array created by to_shared_array
    :param the_shape: shape used to view *shared_array* as a numpy array
    :param the_event: event used by function1/function2 to synchronize
    '''
    global arr, shape, event
    shape = the_shape
    event = the_event
    # recreate the numpy array from the shared array:
    arr = to_numpy_array(shared_array, shape)
def function1():
    '''Set every element of the shared ``arr`` to 1, then signal ``event``.

    ``arr`` and ``event`` are the globals installed by init_pool.
    '''
    arr[...] = 1  # one vectorized write instead of a nested index loop
    event.set()  # show we have a new value
def function2():
    '''Wait until function1 has filled ``arr``, then print it.'''
    event.wait()  # wait for new arr value
    print('arr =', arr)
if __name__ == '__main__':
    arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
    shape = arr.shape
    shared_array = to_shared_array(arr, ctypes.c_int32)
    # You have to now use the shared array as the base:
    arr = to_numpy_array(shared_array, shape)
    event = multiprocessing.Event()
    # The shared objects reach each worker once, at start-up, via initargs.
    pool = multiprocessing.Pool(2, initializer=init_pool,
                                initargs=(shared_array, shape, event))
    results = [pool.apply_async(function1), pool.apply_async(function2)]
    pool.close()
    # get() waits for each task and re-raises any exception it raised;
    # without it, worker failures would be silently discarded.
    for result in results:
        result.get()
    pool.join()
    print('arr =', arr)

在 Linux/Unix 上(使用 fork 启动方式)将共享 numpy 数组与多进程池(multiprocessing.Pool)一起使用

import multiprocessing
import ctypes
import numpy as np
def to_numpy_array(shared_array, shape):
    '''Create a numpy array backed by a shared memory Array.

    No data is copied: the result is a view of *shared_array*, so writes
    through it land in the shared buffer.

    :param shared_array: ctypes array, e.g. from multiprocessing.Array(..., lock=False)
    :param shape: target shape; its product must equal len(shared_array)
    :return: numpy view of *shared_array* with the given shape
    '''
    return np.ctypeslib.as_array(shared_array).reshape(shape)
def to_shared_array(arr, ctype=None):
    '''Copy *arr* into a new lock-free shared-memory multiprocessing.Array.

    :param arr: numpy array to copy; any shape, copied in C (row-major) order
    :param ctype: ctypes element type of the shared array; must have the same
        size as ``arr.dtype``. When omitted it is derived from ``arr.dtype``.
    :return: flat ctypes array of length ``arr.size``
    :raises ValueError: if *ctype* and ``arr.dtype`` differ in size
    '''
    if ctype is None:
        ctype = np.ctypeslib.as_ctypes_type(arr.dtype)
    shared_array = multiprocessing.Array(ctype, arr.size, lock=False)
    # Reinterpret the shared buffer with arr's dtype so the copy is a single
    # vectorized assignment.
    temp = np.frombuffer(shared_array, dtype=arr.dtype)
    if temp.size != arr.size:
        raise ValueError(
            f'ctype {ctype.__name__} does not match dtype {arr.dtype} in size')
    # ravel only copies when arr is not already C-contiguous.
    temp[:] = arr.ravel(order='C')
    return shared_array
# The pool workers rely on inheriting these module-level objects through
# fork, which is not the default on macOS (Python 3.8+) or Linux (3.14+), so
# request it explicitly before creating any multiprocessing object.
multiprocessing.set_start_method('fork')
arr = np.array([[0, 0, 0], [0, 0, 0]], dtype=np.int32)
shape = arr.shape
shared_array = to_shared_array(arr, ctypes.c_int32)
# You have to now use the shared array as the base:
arr = to_numpy_array(shared_array, shape)
event = multiprocessing.Event()
def function1():
    '''Set every element of the shared ``arr`` to 1, then signal ``event``.

    ``arr`` is a view of the fork-inherited shared buffer, so the write is
    visible to the other worker and to the parent.
    '''
    arr[...] = 1  # one vectorized write instead of a nested index loop
    event.set()  # show we have a new value
def function2():
    '''Wait until function1 has filled ``arr``, then print it.'''
    event.wait()  # wait for new arr value
    print('arr =', arr)
# Workers are forked here, after the shared globals above already exist.
pool = multiprocessing.Pool(2)
results = [pool.apply_async(function1), pool.apply_async(function2)]
pool.close()
# get() waits for each task and re-raises any exception it raised; without
# it, worker failures would be silently discarded.
for result in results:
    result.get()
pool.join()
print('arr =', arr)

最新更新