Python使用队列进行多处理(动态拆分加载)



我正在尝试使用多处理来处理大量文件。我尝试将文件列表放入队列中,并使 3 个工作人员使用通用的 Queue 数据类型拆分负载。然而,这似乎不起作用。可能我对多处理包中的队列有误解。下面是示例源代码:

import multiprocessing
from multiprocessing import Queue
def worker(i, qu):
    """worker function"""
    while ~qu.empty():
        val=qu.get()
        print 'Worker:',i, ' start with file:',val
        j=1
        for k in range(i*10000,(i+1)*10000): # some time consuming process
            for j in range(i*10000,(i+1)*10000):
                j=j+k
        print 'Worker:',i, ' end with file:',val

if __name__ == '__main__':
    jobs = []
    qu=Queue()
    for j in range(100,110): # files numbers are from 100 to 110
        qu.put(j)
    for i in range(3): # 3 multiprocess
        p = multiprocessing.Process(target=worker, args=(i,qu))
        jobs.append(p)
        p.start()
    p.join()

感谢您的评论。我知道使用游泳池是最好的解决方案。

import multiprocessing
import time
def worker(val):
    """worker function"""
    print 'Worker: start with file:',val
    time.sleep(1.1)
    print 'Worker: end with file:',val

if __name__ == '__main__':
    file_list=range(100,110)
    p = multiprocessing.Pool(2)
    p.map(worker, file_list)

两个问题:

1(您仅在第 3 个流程中加入

2(为什么不使用多处理。池?

3( qu.get(( 上的竞争条件

1 & 3(

import multiprocessing
from multiprocessing import Queue
def worker(i, qu):
    """worker function"""
    while 1:
        try:
            val=qu.get(timeout)
        except  Queue.Empty: break# Yay no race condition
        print 'Worker:',i, ' start with file:',val
        j=1
        for k in range(i*10000,(i+1)*10000): # some time consuming process
            for j in range(i*10000,(i+1)*10000):
                j=j+k
        print 'Worker:',i, ' end with file:',val

if __name__ == '__main__':
    jobs = []
    qu=Queue()
    for j in range(100,110): # files numbers are from 100 to 110
        qu.put(j)
    for i in range(3): # 3 multiprocess
        p = multiprocessing.Process(target=worker, args=(i,qu))
        jobs.append(p)
        p.start()
    for p in jobs: #<--- join on all processes ...
        p.join()

2(

有关如何使用池,请参阅:

https://docs.python.org/2/library/multiprocessing.html

您只加入最后一个创建的流程。这意味着,如果第一个或第二个进程仍在工作,而第三个进程已完成,则您的主进程正在关闭并在剩余进程完成之前杀死它们。

您应该将它们全部加入,以便等到它们完成:

    for p in jobs:
        p.join()

另一件事是您应该考虑使用qu.get_nowait()以摆脱qu.empty()qu.get()之间的竞争条件。

例如:

    try:
        while 1:
            message = self.queue.get_nowait()
            """ do something fancy here """ 
    except Queue.Empty:
        pass

我希望这有所帮助

最新更新