Creating a queue delay in a Python pool without blocking



I have a large program (specifically, one function) that I'm trying to parallelize using JoinableQueue and multiprocessing's map_async method. The function performs several operations on multidimensional arrays, so I break each array into sections and evaluate each section independently; however, I need to stitch one of the arrays back together early on. The "stitching" ends up happening before the "evaluating", so I need to introduce some kind of delay in the JoinableQueue. I've searched all over for a workable solution, but I'm very new to multiprocessing and most of what I've found goes over my head.

This phrasing may be confusing — apologies. Here is an outline of my code (I can't include all of it because it's very long, but I can provide additional details if needed):

import numpy as np
import multiprocessing as mp
from multiprocessing import Pool, Pipe, JoinableQueue

def main_function(section_number):
    #define section sizes
    array_this_section = array[:,start:end+1,:]
    histogram_this_section = np.zeros((3, dataset_size, dataset_size))
    #start and end are defined according to the size of the array
    #dataset_size is to show that the histogram is a different size than the array
    for m in range(1,num_iterations+1):
        #do several operations- each section of the array
        #corresponds to a section on the histogram
        hist_queue.put(histogram_this_section)
        #each process sends their own part of the histogram outside of the pool
        #to be combined with every other part- later operations
        #in this function must use the full histogram
        hist_queue.join()
        full_histogram = full_hist_queue.get()
        full_hist_queue.task_done()
        #do many more operations

hist_queue = JoinableQueue()
full_hist_queue = JoinableQueue()

if __name__ == '__main__':
    pool = mp.Pool(num_sections)
    args = np.arange(num_sections)
    pool.map_async(main_function, args, chunksize=1)
    #I need the map_async because the program is designed to display an output at the
    #end of each iteration, and each output must be a compilation of all processes
    #a few variable definitions go here
    for m in range(1,num_iterations+1):
        for i in range(num_sections):
            temp_hist = hist_queue.get()    #the code hangs here because the queue
                                            #is attempting to get before anything
                                            #has been put
            hist_full += temp_hist
        for i in range(num_sections):
            hist_queue.task_done()
        for i in range(num_sections):
            full_hist_queue.put(hist_full)    #the full histogram is sent back into
                                              #the pool
        full_hist_queue.join()
        #etc etc
    pool.close()
    pool.join()

I'm fairly sure your problem is how you're creating the Queues and trying to share them with the child processes. If you only have them as global variables, they may be re-created in the child processes instead of being inherited (the exact details depend on the OS and/or the start method ("context") you're using for multiprocessing).
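To make that failure mode concrete, here is a minimal, self-contained sketch (not the poster's code) of what happens under the "spawn" start method, which is the default on Windows and macOS: the child re-imports the module and builds its own, unrelated queue, so nothing the worker puts ever reaches the parent.

import multiprocessing as mp
import queue

shared_q = mp.JoinableQueue()     # module-level: re-created from scratch in a spawned child

def put_something(_):
    shared_q.put("hello")         # goes into the child's own copy of the queue

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)   # the default on Windows/macOS
    with mp.Pool(1) as pool:
        pool.map(put_something, [0])
    try:
        print(shared_q.get(timeout=1))         # the parent's queue never sees the item
    except queue.Empty:
        print("queue is empty in the parent")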

A better way to avoid this issue is to stop using multiprocessing.Pool to spawn your processes and instead create the Process instances for your workers explicitly yourself. That way you can pass the Queue instances to the processes that need them without any trouble (it's technically possible to pass queues to Pool workers, but it's awkward; a sketch of that route follows the example below).

I would try something like this:

from multiprocessing import Process, JoinableQueue

def worker_function(section_number, hist_queue, full_hist_queue):  # take queues as arguments
    # ... the rest of the function can work as before
    # note, I renamed this from "main_function" since it's not running in the main process

if __name__ == '__main__':
    hist_queue = JoinableQueue()       # create the queues only in the main process
    full_hist_queue = JoinableQueue()  # the workers don't need to access them as globals
    processes = [Process(target=worker_function, args=(i, hist_queue, full_hist_queue))
                 for i in range(num_sections)]
    for p in processes:
        p.start()
    # ...
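For completeness, the "awkward" route mentioned above — keeping the Pool but handing the queues to each worker through an initializer — looks roughly like this. It is only a sketch built around the poster's existing names (num_sections, main_function); the initializer rebinds the module-level queue names inside every worker, so the body of main_function would not have to change:

import numpy as np
import multiprocessing as mp
from multiprocessing import JoinableQueue

def init_worker(hq, fhq):
    # runs once in each pool worker; rebinds the globals that main_function uses
    global hist_queue, full_hist_queue
    hist_queue = hq
    full_hist_queue = fhq

if __name__ == '__main__':
    hist_queue = JoinableQueue()
    full_hist_queue = JoinableQueue()
    pool = mp.Pool(num_sections,
                   initializer=init_worker,
                   initargs=(hist_queue, full_hist_queue))
    pool.map_async(main_function, np.arange(num_sections), chunksize=1)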

If the different stages of your worker function are more or less independent of one another (that is, if the "do many more operations" step doesn't depend directly on the "do several operations" step above it, only on full_histogram), you may be able to keep the Pool and instead split the different steps into separate functions, which the main process calls via several map calls on the pool. With this approach you don't need your own Queues at all — just the machinery built into the pool. This may well be the best option, especially if the number of "sections" you split the work into doesn't correspond closely to the number of processor cores on your machine: you can let the Pool match the number of cores and have each core work through several sections of the data in turn.

A rough sketch of this might look like:

import itertools
import multiprocessing

def worker_make_hist(section_number):
    # do several operations, get a partial histogram
    return histogram_this_section

def worker_do_more_ops(section_number, full_histogram):
    # whatever...
    return some_result

if __name__ == "__main__":
    pool = multiprocessing.Pool()  # by default the size will be equal to the number of cores

    # hist_full needs to start out as a zero array of the right shape before this loop
    for temp_hist in pool.imap_unordered(worker_make_hist, range(number_of_sections)):
        hist_full += temp_hist

    some_results = pool.starmap(worker_do_more_ops, zip(range(number_of_sections),
                                                        itertools.repeat(hist_full)))
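Since the poster needs an output compiled from all processes at the end of every iteration, the two-phase version above would simply be repeated inside the outer loop. Here is a rough sketch under the same assumptions (num_iterations, num_sections, and dataset_size are the poster's variables; the worker functions are the hypothetical ones sketched above):

import itertools
import multiprocessing
import numpy as np

if __name__ == "__main__":
    pool = multiprocessing.Pool()    # one worker per core by default

    for m in range(1, num_iterations + 1):
        # phase 1: build the partial histograms in parallel and combine them here
        hist_full = np.zeros((3, dataset_size, dataset_size))
        for temp_hist in pool.imap_unordered(worker_make_hist, range(num_sections)):
            hist_full += temp_hist

        # phase 2: hand the combined histogram back to every section's follow-up work
        results = pool.starmap(worker_do_more_ops,
                               zip(range(num_sections),
                                   itertools.repeat(hist_full)))

        # display / compile the per-iteration output from `results` here

    pool.close()
    pool.join()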
