I wrote a generic multiprocessing wrapper function that
- takes a function and its inputs
- splits those inputs along their first dimension
- feeds the split inputs to multiple processes
- collects the results in a queue and finally combines them again (in the correct order)
The last point is where I am having trouble. I use a multiprocessing.Queue, and the order in which results are put into this queue does not depend on the order in which I started the processes; instead, the fastest process writes first. Is there a way around this other than adding the index of the current process to the queue and sorting the results afterwards?
I looked at multiprocessing.Pool, but that approach has considerable overhead and does not work if the function is more complex ("cannot pickle xxx object"). multiprocessing.Array might be an option, but I do not know how to initialize it so that I can put arbitrary elements (tuples of numpy arrays) in each slot and use it essentially as a list. I read that the elements of an Array must be c_types...
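(For comparison, a minimal sketch of the Pool approach mentioned above: `Pool.map` already returns results in input order, regardless of which worker finishes first. The names `square` and `pool_wrapper` are illustrative, not from the code below; the caveats from the question still apply, i.e. the function and its arguments must be picklable and there is some per-task overhead.)

```python
import multiprocessing as mp
import numpy as np

def square(chunk):
    return chunk * chunk

def pool_wrapper(x, n_processes):
    chunks = np.array_split(x, n_processes)  # split along the first axis
    with mp.Pool(n_processes) as pool:
        results = pool.map(square, chunks)   # order matches `chunks`, not completion order
    return np.concatenate(results, axis=0)

if __name__ == "__main__":
    print(pool_wrapper(np.arange(10), 4))    # squares of 0..9, in input order
```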
Code:
import multiprocessing as mp
import numpy as np
import time


def change_tuple_order(t):
    return tuple(map(lambda *tt: tuple(tt), *t))


def mp_wrapper(*args, fun, n_processes):
    if n_processes == 1:
        return fun(*args)

    n_samples = np.shape(args[0])[0]
    samples_per_process = n_samples // n_processes

    def __wrapper_fun(i_process, queue):
        queue.put(fun(*map(lambda a: a[samples_per_process*i_process:samples_per_process*(i_process+1)],
                           args)))
        # Solution idea
        # queue.put((i_process,
        #            fun(*map(lambda a: a[samples_per_process*i_process:samples_per_process*(i_process+1)],
        #                     args))))

    # Start the processes and save their results in a queue
    result_queue = mp.Queue(n_processes)
    process_list = []
    for i in range(n_processes):
        p = mp.Process(target=__wrapper_fun, args=[i, result_queue], name=str(i))
        p.start()
        process_list.append(p)

    # Make sure the queue is full before joining the processes with a small
    # timeout; otherwise errors occurred because a process that had filled
    # the queue was still alive
    while True:
        time.sleep(0.001)
        if result_queue.full():
            break

    # Wait for the processes to finish
    for i in range(n_processes):
        process_list[i].join(timeout=0.001)

    # Combine and return the results
    results = [result_queue.get() for _ in range(n_processes)]

    # Solution idea
    # results = change_tuple_order(results)
    # idx = np.argsort(results[0])
    # results = [results[1][i] for i in idx]

    if isinstance(results[0], tuple):
        results = change_tuple_order(results)
        return tuple([np.concatenate(r, axis=0) for r in results])
    else:
        return np.concatenate(results, axis=0)


def f_test(x):
    time.sleep(np.random.random()*10)
    return x*x


def f_test2(x, y):
    return x**2, y**3, x+y


n_processes = 5
print(mp_wrapper(np.repeat(np.arange(n_processes), 3),
                 fun=f_test, n_processes=n_processes))
# [ 4  4  4  9  9  9  0  0  0  1  1  1 16 16 16]

print(mp_wrapper(np.repeat(np.arange(n_processes), 3), np.repeat(np.arange(n_processes), 3),
                 fun=f_test2, n_processes=n_processes))
# (array([ 0,  0,  0,  1,  1,  1,  4,  4,  4,  9,  9,  9, 16, 16, 16]),
#  array([ 0,  0,  0,  1,  1,  1,  8,  8,  8, 27, 27, 27, 64, 64, 64]),
#  array([0, 0, 0, 2, 2, 2, 4, 4, 4, 6, 6, 6, 8, 8, 8]))
Most of the time the order of the results is fine, but obviously which process is fastest is not deterministic. Is there a way to use a Queue or an Array like a list, where I can write arbitrary elements at a specific index, or do I have to keep track of the order of inputs and outputs myself?
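(One way to get exactly the "write arbitrary elements at a specific index" behavior asked about, as a hedged sketch under my own naming: a `multiprocessing.Manager().list()` is shared between processes and accepts any picklable object per slot, so each worker can write its result into its own position. The helper names `worker` and `mp_with_manager_list` are illustrative; note that proxy access goes through the manager process, so it is slower per operation than a plain Queue.)

```python
import multiprocessing as mp
import numpy as np

def worker(i_process, chunk, shared_results):
    shared_results[i_process] = chunk * chunk  # write to a fixed slot by index

def mp_with_manager_list(x, n_processes):
    chunks = np.array_split(x, n_processes)    # split along the first axis
    with mp.Manager() as manager:
        shared_results = manager.list([None] * n_processes)
        procs = [mp.Process(target=worker, args=(i, c, shared_results))
                 for i, c in enumerate(chunks)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        results = list(shared_results)         # copy out before the manager shuts down
    return np.concatenate(results, axis=0)

if __name__ == "__main__":
    print(mp_with_manager_list(np.arange(10), 4))  # squares of 0..9, in input order
```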
As described in a similar thread, you can use a SeqQueue to keep indexed items in order.
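The "Solution idea" already sketched in the question's comments also works with a plain mp.Queue: tag each result with its process index and sort before concatenating, so the final order no longer depends on which process finishes first. A minimal self-contained version (the names `worker` and `ordered_mp` are mine, not from the post):

```python
import multiprocessing as mp
import numpy as np

def worker(i_process, chunk, queue):
    queue.put((i_process, chunk * chunk))  # tag the result with its index

def ordered_mp(x, n_processes):
    chunks = np.array_split(x, n_processes)
    queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(i, c, queue))
             for i, c in enumerate(chunks)]
    for p in procs:
        p.start()
    # Drain the queue before joining; items arrive in completion order
    tagged = [queue.get() for _ in procs]
    for p in procs:
        p.join()
    tagged.sort(key=lambda t: t[0])        # restore submission order
    return np.concatenate([r for _, r in tagged], axis=0)

if __name__ == "__main__":
    print(ordered_mp(np.arange(10), 4))    # squares of 0..9, in input order
```

Draining the queue before joining also sidesteps the timeout workaround in the question: a process may not exit until its queued data has been consumed, so `get` first, then `join`.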