多重处理后,我的队列为空.流程实例完成



我有一个python脚本,在文件的顶部有:

result_queue = Queue.Queue()
key_list = *a large list of small items* #(actually from bucket.list() via boto)

我了解到队列是过程安全的数据结构。我有一个方法:

def enqueue_tasks(keys):
    for key in keys:
        try:
            result = perform_scan.delay(key)
            result_queue.put(result)
        except:
           print "failed"

这里的perform_scan.delay()函数实际上调用了一个芹菜工作者,但我认为这与此无关(它是一个异步进程调用)。

我还有:

def grouper(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return izip_longest(fillvalue=fillvalue, *args)

最后,我有一个main()函数:

def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
    concurrent.futures.wait(futures)
    print len(result_queue)

print语句的结果为0。然而,如果我在enqueue_tasks中包含一个result_queue大小的print语句,当程序运行时,我可以看到大小正在增加,并且正在向队列中添加内容。

对正在发生的事情有什么想法?

这个问题似乎有一个更简单的解决方案。

你正在建立一个未来清单。期货的全部意义在于,它们是未来的结果。特别是,无论每个函数返回什么,都是未来的(最终)价值。因此,根本不要做整个"将结果推送到队列"的事情,只需从任务函数返回结果,然后从未来中提取结果。


最简单的方法是打破这个循环,使每个键都是一个单独的任务,具有单独的未来。我不知道这是否适合你的真实代码,但如果是:

def do_task(key):
    try:
        return perform_scan.delay(key)
    except:
        print "failed"
def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(do_task, key) for key in key_list]
    # If you want to do anything with these results, you probably want
    # a loop around concurrent.futures.as_completed or similar here,
    # rather than waiting for them all to finish, ignoring the results,
    # and printing the number of them.
    concurrent.futures.wait(futures)
    print len(futures)

当然,这不能分组。但是你需要它吗?

分组是必要的最有可能的原因是任务太小,以至于调度它们(以及酸洗输入和输出)的开销会淹没实际工作。如果这是真的,那么几乎可以肯定的是,您可以等到整个批次完成后再返回任何结果。尤其是考虑到你甚至都不看结果,直到它们都完成了。(这种"分成小组,处理每组,合并在一起"的模型在数值工作等情况下很常见,其中每个元素可能很小,或者元素可能彼此不独立,但有些小组足够大,或者与其他工作独立。)

无论如何,这几乎同样简单:

def do_tasks(keys):
    results = []
    for key in keys:
        try:
            result = perform_scan.delay(key)
            results.append(result)
        except:
           print "failed"
    return results
def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
    print sum(len(results) for results in concurrent.futures.as_completed(futures))

或者,如果你喜欢先等待,然后计算:

def main():
    executor = concurrent.futures.ProcessPoolExecutor(10)
    futures = [executor.submit(enqueue_tasks, group) for group in grouper(key_list, 40)]
    concurrent.futures.wait(futures)
    print sum(len(future.result()) for future in futures)

但我再次怀疑你是否需要这个。

您需要使用multiprocessing.Queue,而不是Queue.QueueQueue.Queue线程安全的,而不是进程安全的,因此您在一个进程中对其所做的更改不会反映在任何其他进程中。

相关内容

  • 没有找到相关文章

最新更新