我的代码如下所示。在proc.join()
循环中似乎"挂起"。如果我创建包含 10 条记录的数据帧df
,整个过程会快速完成,但从 10000 条记录开始(如图所示(,则程序似乎只是挂起。我正在使用htop
来查看 CPU 核心使用情况,我确实看到它们都飙升到 100%,但在它们回落到 0% 后很长时间,程序似乎没有继续。对我做错了什么有什么想法吗?
import pandas as pd
import numpy as np
import multiprocessing
from multiprocessing import Process, Queue
def do_something(df, partition, q):
for index in partition:
q.put([v for v in df.iloc[index]])
def start_parallel_processing(df, partitions):
q = Queue()
procs = []
results = []
for partition in partitions:
proc = Process(target=do_something, args=(df, partition, q))
proc.start()
procs.extend([proc])
for i in range(len(partitions)):
results.append(q.get(True))
for proc in procs:
proc.join()
return results
num_cpus = multiprocessing.cpu_count()
df = pd.DataFrame([(x, x+1) for x in range(10000)], columns=['x','y'])
partitions = np.array_split(df.index, num_cpus)
results = start_parallel_processing(df, partitions)
len(results)
看起来Queue.Queue
的行为不符合您的要求,也不是为在多个进程之间共享而设计的,而是您必须使用Manager.Queue()
我添加了一些打印来理解您的代码流,
您仍然可以完善代码以使用Pool()
而不是num_cpus
import pandas as pd
import numpy as np
import multiprocessing
import pprint
from multiprocessing import Process, Queue, Manager
def do_something(df, partition, q):
# print "do_something " + str(len(partition)) + " times"
for index in partition:
# print index
for v in df.iloc[index]:
#print "sending v to queue: " + str(len(df.iloc[index]))
q.put(v, False)
print "task_done(), qsize is "+ str(q.qsize())
def start_parallel_processing(df, partitions):
m = Manager()
q = m.Queue()
procs = []
results = []
print "START: launching "+ str(len(partitions)) + " process(es)"
index = 0
for partition in partitions:
print "launching "+ str(len(partitions)) + " process"
proc = Process(target=do_something, args=(df, partition, q))
procs.extend([proc])
proc.start()
index += 1
print "launched "+ str(index) + "/" + str(len(partitions)) + " process(es)"
while True:
try:
results.append(q.get( block=False ))
except:
print "QUEUE END"
break
print pprint.pformat(results)
process_count = 0
for proc in procs:
process_count += 1
print "joining "+ str(process_count) + "/" + str(len(procs)) + " process(es)"
proc.join()
return results
num_cpus = multiprocessing.cpu_count()
df = pd.DataFrame([(x, x+1) for x in range(10000)], columns=['x','y'])
partitions = np.array_split(df.index, num_cpus)
results = start_parallel_processing(df, partitions)
print "len(results) is: "+ str(len(results))