我的要求类似于多个生产者,单个消费者除了我需要在python
我创建了一个产生5个并发进程的应用程序(我使用的是多进程库)。这5道工序独立输出dict格式。
之前我将输出输出到控制台,但现在我想将其输出到文件。
我正在寻找一个模式,其中所有我的5生产者写一个共享队列,支持并发写。
单个消费者进程也可以访问这个队列,并从队列中消费数据,如果没有数据可写,则可以等待,并在生产者完成任务时终止。
谢谢阿
既然您已经使用了多进程,那么您所需要的就是队列类
和一个示例(从Queue文档修改)
from multiprocessing import Process, Queue
def child(q, url):
result = my_process(url)
q.put(result)
if __name__ == '__main__':
q = Queue()
urls = [...]
children = []
for url in urls:
p = Process(target=child, args=(q,url))
p.start()
children.append(p)
for p in children:
p.join()
print q.get() #or write to file (might not be the answer from this child)
编辑:对于每个子节点都有多个答案,将最后一个For循环替换为:
while 0 != multiprocessing.active_children():
print q.get()
我已经在Python中实现了这种模式,其中一个监督进程生成一堆进程,然后从所有进程中消费日志消息,并将这些日志消息写入单个日志文件。
基本上,我使用execve来生成进程,并为每个进程指定连接到PTY的stderr。然后我的主管打开了所有的主pty,并使用select
在一个循环中读取它们。PTY是由tty行规则行缓冲的,您可以对它们使用readline进行非阻塞读取。我相信我也在游戏中使用了fcnl来设置os。O_NONBLOCK也是。
伟大的工作。唯一的问题是,当您从select轮询返回时,每个线程需要读取不止一行,否则可能会丢失输出(假设您有一些东西正在收集子进程并重新启动)。通过读取每个PTY上可用的所有行,还可以避免回溯与其他消息交织在一起。
如果您确实需要发送对象而不是文本行,那么您最好使用真正的发布-订阅消息传递系统,如AMQP或ZeroMQ。AMQP是一个比你需要的更大的锤子,所以只有当你希望构建许多类似的应用程序时才检查它。否则,请尝试更简单的0MQ http://www.zeromq.org/intro:read-the-manual,它只是一个消息传递库,使套接字更容易使用。