Python:如何在多处理情况下返回值



假设我有一个Process-es的集合,a[0]a[m]

然后,这些进程将通过队列将作业发送到另一个Process -es 集合,b[0]通过 b[n] ,其中m > n

或者,要示意图:

a[0], a[1], ..., a[m] ---Queue---> b[0], b[1], ..., b[n]

现在,如何将b过程的结果返回到相关的a过程?

我的第一个猜测是使用multiprocessing.Pipe()

因此,我尝试执行以下操作:

## On the 'a' side
pipe = multiprocessing.Pipe()
job['pipe'] = pipe
queue.put(job)
rslt = pipe[0].recv()
## On the 'b' side
job = queue.get()
... process the job ...
pipe = job['pipe']
pipe.send(result)

并且它不适用于错误:Required argument 'handle' (pos 1) not found

阅读了许多文档,我想出了:

## On the 'a' side
pipe = multiprocessing.Pipe()
job['pipe'] = multiprocessing.reduction.reduce_connection(pipe[1])
queue.put(job)
rslt = pipe[0].recv()
## On the 'b' side
job = queue.get()
... process the job ...
pipe = multiprocessing.reduction.rebuild_connection(job['pipe'], True, True)
pipe.send(result)

现在我得到一个不同的错误:ValueError: need more than 2 values to unpack .

我尝试了搜索和搜索,但仍然找不到如何正确使用reduce_rebuild_方法。

请帮忙,以便我可以将值从b返回到a

我建议避免使用管道和文件描述符的这种移动(上次我尝试时,它不是很标准,也没有很好的文档记录)。不得不处理它很痛苦,我不推荐它:-/

我建议一种不同的方法:让main管理连接。保留工作队列,但以不同的路径发送响应。这意味着您需要线程的某种标识符。我将提供一个玩具实现来说明我的建议:

#!/usr/bin/env python
import multiprocessing
import random
def fib(n):
    "Slow fibonacci implementation because why not"
    if n < 2:
        return n
    return fib(n-2) + fib(n-1)

def process_b(queue_in, queue_out):
    print "Starting process B"
    while True:
        j = queue_in.get()
        print "Job: %d" % j["val"]
        j["result"] = fib(j["val"])
        queue_out.put(j)

def process_a(index, pipe_end, queue):
    print "Starting process A"
    value = random.randint(5, 50)
    j = {
        "a_id": index,
        "val": value,
    }
    queue.put(j)
    r = pipe_end.recv()
    print "Process A sent value %d and received: %s" % (value, r)

def main():
    print "Starting main"
    a_pipes = list()
    jobs = multiprocessing.Queue()
    done_jobs = multiprocessing.Queue()
    for i in range(5):
        multiprocessing.Process(target=process_b, args=(jobs, done_jobs,)).start()
    for i in range(10):
        receiver, sender = multiprocessing.Pipe(duplex=False)
        a_pipes.append(sender)
        multiprocessing.Process(target=process_a, args=(i, receiver, jobs)).start()
    while True:
        j = done_jobs.get()
        a_pipes[j["a_id"]].send(j["result"])
if __name__ == "__main__":
    main()

请注意,作业队列直接连接在ab进程之间。 a进程负责放置它们的标识符("主人"应该知道)。b使用不同的队列来完成工作。我使用相同的作业字典,但典型的实现应该使用一些更量身定制的数据结构。此响应应具有 a 标识符,以便主服务器将其发送到特定进程。

假设有某种方法可以将其与您的方法一起使用,我一点也不讨厌(这将是我的第一个方法)。但是必须处理文件描述符以及reduce_rebuild_方法并不好。一点也不。

因此,正如@MariusSiuram在这篇文章中所解释的那样,试图传递一个Connection对象是一种沮丧的练习。

我终于求助于使用 DictProxy 将值从 B 返回到 A

这是概念:

### This is in the main process
...
jobs_queue = multiprocessing.Queue()
manager = multiprocessing.Manager()
ret_dict = manager.dict()
...
# Somewhere during Process initialization, jobs_queue and ret_dict got passed to
# the workers' constructor
...
### This is in the "A" (left-side) workers
...
self.ret_dict.pop(self.pid, None)  # Remove our identifier if exist
self.jobs_queue.put({
    'request': parameters_to_be_used_by_B,
    'requester': self.pid
})
while self.pid not in self.ret_dict:
    time.sleep(0.1)  # Or any sane value
result = self.ret_dict[self.pid]    
...
### This is in the "B" (right-side) workers
...
while True:
    job = self.jobs_queue.get()
    if job is None:
        break
    result = self.do_something(job['request'])
    self.ret_dict[job['requester']] = result
...

相关内容

最新更新