多个生产者和单个消费者写入文件Python



我的要求类似于多个生产者,单个消费者除了我需要在python

我创建了一个产生5个并发进程的应用程序(我使用的是多进程库)。这5道工序独立输出dict格式。

之前我将输出输出到控制台,但现在我想将其输出到文件。

我正在寻找一个模式,其中所有我的5生产者写一个共享队列,支持并发写。

单个消费者进程也可以访问这个队列,并从队列中消费数据,如果没有数据可写,则可以等待,并在生产者完成任务时终止。

谢谢阿

既然您已经使用了多进程,那么您所需要的就是队列类

和一个示例(从Queue文档修改)

from multiprocessing import Process, Queue
def child(q, url):
    result = my_process(url)
    q.put(result)
if __name__ == '__main__':
    q = Queue()
    urls = [...]
    children = []
    for url in urls:
       p = Process(target=child, args=(q,url))
       p.start()
       children.append(p)
    for p in children:
       p.join()
       print q.get() #or write to file (might not be the answer from this child)
编辑:

对于每个子节点都有多个答案,将最后一个For循环替换为:

while 0 != multiprocessing.active_children():
    print q.get()

我已经在Python中实现了这种模式,其中一个监督进程生成一堆进程,然后从所有进程中消费日志消息,并将这些日志消息写入单个日志文件。

基本上,我使用execve来生成进程,并为每个进程指定连接到PTY的stderr。然后我的主管打开了所有的主pty,并使用select在一个循环中读取它们。PTY是由tty行规则行缓冲的,您可以对它们使用readline进行非阻塞读取。我相信我也在游戏中使用了fcnl来设置os。O_NONBLOCK也是。

伟大的工作。唯一的问题是,当您从select轮询返回时,每个线程需要读取不止一行,否则可能会丢失输出(假设您有一些东西正在收集子进程并重新启动)。通过读取每个PTY上可用的所有行,还可以避免回溯与其他消息交织在一起。

如果您确实需要发送对象而不是文本行,那么您最好使用真正的发布-订阅消息传递系统,如AMQP或ZeroMQ。AMQP是一个比你需要的更大的锤子,所以只有当你希望构建许多类似的应用程序时才检查它。否则,请尝试更简单的0MQ http://www.zeromq.org/intro:read-the-manual,它只是一个消息传递库,使套接字更容易使用。

最新更新