对使用Multiprocessing.Queue生成的结果进行排序



我写了一个通用的多处理包装器函数,它

  • 获取函数及其输入
  • 沿其第一维拆分这些输入
  • 将拆分的输入馈送到多个进程
  • 在队列中收集结果,最后将它们重新组合在一起(按正确的顺序(

最后一点是我遇到问题的地方。我使用multiprocessing.Queue结果放入此队列的顺序与我调用进程的顺序无关,而是最快的进程首先写入。 除了将当前进程的索引添加到队列并在之后对结果进行排序之外,还有其他方法吗?

我看了multiprocessing.Pool但这种方法有很大的开销,如果函数更复杂("不能腌制 xxx 对象"(,则不起作用。multiprocessing.Array可能是一个选项,但我不知道如何初始化它,这样我就可以在每个插槽中只使用任意元素(numpy 数组的元组(并将其基本上用作列表。我读到数组的元素必须c_types......

代码:

import multiprocessing as mp
import numpy as np
import time

def change_tuple_order(t):
return tuple(map(lambda *tt: tuple(tt), *t))

def mp_wrapper(*args, fun, n_processes):

if n_processes == 1:
return fun(*args)

n_samples = np.shape(args[0])[0]
samples_per_process = n_samples // n_processes
def __wrapper_fun(i_process, queue):
queue.put(fun(*map(lambda a: a[samples_per_process*i_process:samples_per_process * (i_process+1)], 
args)))
# Solution idea
# queue.put((i_process,
#            fun(*map(lambda a: a[samples_per_process*i_process:samples_per_process * (i_process+1)],
#                args))))

# Start the processes and save their results in a queue
result_queue = mp.Queue(n_processes)
process_list = []
for i in range(n_processes):
p = mp.Process(target=__wrapper_fun, args=[i, result_queue], name=str(i))
p.start()
process_list.append(p)
# Make sure the queue is full before joining the processes with a small timeout, otherwise there occurred
# errors, because a process which filled the queue but was still alive
while True:
time.sleep(0.001)
if result_queue.full():
break
# Wait for the processes to finish
for i in range(n_processes):
process_list[i].join(timeout=0.001)
# Combine and return the results
results = [result_queue.get() for _ in range(n_processes)]
# Solution idea
# results = change_tuple_order(results)
# idx = np.argsort(results[0])
# results = [results[1][i] for i in idx]
if isinstance(results[0], tuple):
results = change_tuple_order(results)
return tuple([np.concatenate(r, axis=0) for r in results])
else:
return np.concatenate(results, axis=0)

def f_test(x):
time.sleep(np.random.random()*10)
return x*x

def f_test2(x, y):
return x**2, y**3, x+y

n_processes = 5
print(mp_wrapper(np.repeat(np.arange(n_processes), 10),
fun=f_test, n_processes=n_processes))
# [ 4  4  4  9  9  9  0  0  0  1  1  1 16 16 16]
print(mp_wrapper(np.repeat(np.arange(n_processes), 3), np.repeat(np.arange(n_processes), 3),
fun=f_test2, n_processes=n_processes))
# (array([0, 0, 0, 1, 1, 1, 4, 4, 4, 9,  9,  9, 16, 16, 16]), 
#  array([0, 0, 0, 1, 1, 1, 8, 8, 8, 27, 27, 27, 64, 64, 64]), 
#  array([0, 0, 0, 2, 2, 2, 4, 4, 4, 6, 6, 6, 8, 8, 8]))

大多数情况下,结果的顺序很好,但显然不确定哪个过程最快。

有没有办法使用QueueArray作为列表,我可以在其中以特定索引编写任意元素,还是我必须自己跟踪输入和输出的顺序?

如类似主题中所述,您可以使用 SeqQueue 来保持索引项的顺序。

相关内容

  • 没有找到相关文章

最新更新