I am trying to parallelize a rather time-consuming task, but I can't get it to work and can't really figure out why.
My code:
from scipy.spatial import distance
import numpy as np
from scipy import linalg
import os
from multiprocessing import Queue
from multiprocessing import Process
from typing import Tuple
num_cpu = os.cpu_count()
arr_1 = np.random.random((20000, 1000))
arr_2 = np.random.random((20000, 1000))
def cosine_similarity(a: np.ndarray, b: np.ndarray):
    assert len(a) == len(b)
    len_a = linalg.norm(a)
    len_b = linalg.norm(b)
    # Check if one vector is all zeros. Possible, not probable
    if len_a == 0 or len_b == 0:
        return 0
    return a.dot(b) / (len_a * len_b)


def cosine_distance(a: np.ndarray, b: np.ndarray):
    assert len(a) == len(b)
    return 1 - cosine_similarity(a, b)


def run_task(start: int, end: int, queue_out: Queue):
    print("Called with " + str(start) + " to " + str(end))
    if end > len(arr_1):
        end = len(arr_1)
    for i in range(start, end):
        min_dist = 2.0
        labeled_data = arr_1[i]
        for unlabeled_data in arr_2:
            min_dist = min(min_dist, cosine_distance(labeled_data, unlabeled_data))
        queue_out.put((i, min_dist))


step = len(arr_1) // num_cpu
t_s = []
queue = Queue()
for i in range(0, len(arr_1), step):
    print("Calling with " + str(i) + " to " + str(i + step))
    p = Process(target=run_task, args=(i, i + step, queue))
    p.start()
    t_s.append(p)

for p in t_s:
    p.join()
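For context, all each worker does is take a slice of arr_1 and, for every row in it, find the minimum cosine distance to any row of arr_2. Here is a single-process sketch of that same computation, written with the scipy.spatial.distance import that my code above never actually uses (ignoring the all-zeros special case):

# Equivalent of run_task(0, 1000, queue), but without processes or a queue:
# cdist(..., metric="cosine") is 1 - cosine similarity, like cosine_distance above.
d = distance.cdist(arr_1[0:1000], arr_2, metric="cosine")  # shape (1000, 20000)
min_dists = d.min(axis=1)                                  # one minimum per row of the slice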
When I call queue.qsize() it is 0. The code also finishes pretty much immediately, and I only get the "Calling with" output, but not the "Called with" output.
If I call run_task(0, 1000, queue) manually, it runs for a few minutes (about 3) and queue.qsize() is 1000.
When I look at t_s, it shows 20 entries like <Process name='Process-21' pid=17948 parent=4044 stopped exitcode=1> (with different process names and pids).
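In case it matters, this is roughly how I inspected them (just iterating and printing; the exitcode attribute is where the 1 shows up):

# Sketch of the inspection: repr(p) prints the <Process ...> line above,
# and p.exitcode is 1 for every worker, i.e. they all died early.
for p in t_s:
    print(repr(p), p.exitcode)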
What am I doing wrong here?
EDIT:
Tried it with Pool.map:
def run_task(index):
    print("Called index " + str(index))
    min_dist = 2.0
    labeled_data = arr_1[index]
    for unlabeled_data in arr_2:
        min_dist = min(min_dist, cosine_distance(labeled_data, unlabeled_data))
    (index, min_dist)


a = 0
with Pool(processes=num_cpu) as pool:
    a = pool.imap(run_task, range(len(arr_1)))
    for i in a:
        print(f"showing the result as it is ready {i}")
Same behaviour, it seems like nothing gets called at all.
Ok, for whatever reason it was related to my imports. After I changed it from Process to multiprocessing.Process it worked, so I guess some namespace shadowing was going on.
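For completeness, a minimal sketch of the change that made it work (run_task, the queue and the chunking are unchanged; only the Process reference is fully qualified):

import multiprocessing

t_s = []
for i in range(0, len(arr_1), step):
    print("Calling with " + str(i) + " to " + str(i + step))
    # Use the fully qualified name instead of the bare (apparently shadowed) Process
    p = multiprocessing.Process(target=run_task, args=(i, i + step, queue))
    p.start()
    t_s.append(p)

for p in t_s:
    p.join()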