带队列的多处理



这是我下面的代码,我把字符串放在队列中,希望dowork2做一些工作,并在shared_queue中返回char

但我总是一无所获while not shared_queue.empty()

请给我一些点,谢谢。

import time
import multiprocessing as mp

class Test(mp.Process):
    def __init__(self, **kwargs):
        mp.Process.__init__(self)
        self.daemon = False
        print('dosomething')
    def run(self):
        manager = mp.Manager()
        queue = manager.Queue()
        shared_queue = manager.Queue()
        # shared_list = manager.list()
        pool = mp.Pool()
        results = []
        results.append(pool.apply_async(self.dowork2,(queue,shared_queue)))
        while True:
            time.sleep(0.2)
            t =time.time()
            queue.put('abc')
            queue.put('def')
            l = ''
            while not shared_queue.empty():
                l = l + shared_queue.get()
            print(l)
            print( '%.4f' %(time.time()-t))
        pool.close()
        pool.join()
    def dowork2(queue,shared_queue):
        while True:
            path = queue.get()
            shared_queue.put(path[-1:])
if __name__ == '__main__':
    t = Test()
    t.start()
    # t.join()
    # t.run()

我设法通过将你的 dowork2 移到类外来让它工作。如果在测试类之前将 dowork2 声明为函数并将其调用为

results.append(pool.apply_async(dowork2, (queue, shared_queue)))

它按预期工作。 我不是 100% 确定,但它可能会出错,因为您的测试类已经在子类化进程。现在,当您的池创建一个子进程并在子进程中初始化相同的类时,某些内容会在某处被覆盖。

总的来说,我想知道游泳池是否真的是你想在这里使用的。您的工作线程似乎处于无限循环中,指示您不需要来自工作线程的返回值,而只需要返回队列中的结果。如果是这种情况,您可以删除池。

当我废弃池并替换为另一个子流程时,我还设法让它在类中保持您的工作线程功能:

foo = mp.Process(group=None, target=self.dowork2, args=(queue, shared_queue))
foo.start()
# results.append(pool.apply_async(Test.dowork2, (queue, shared_queue)))
while True:
    ....

(不过,您需要将 self 添加到您的 worker 中,或将其声明为静态方法:(

def dowork2(self, queue, shared_queue):

相关内容

  • 没有找到相关文章

最新更新