Python - 结合多处理和异步



我正在尝试将多处理与异步相结合。该程序有两个主要组件 - 一个用于流式传输/生成内容,另一个用于消费内容。

我想做的是创建多个进程以利用多个 CPU 内核 - 一个用于流侦听器/生成器,另一个用于消费者,以及一个简单的进程,用于在消费者停止时关闭所有内容。

到目前为止,我的方法是创建流程并启动它们。每个这样的进程都会创建一个异步任务。所有进程启动后,我运行异步任务。到目前为止,我所拥有的(精简(是:

def consume_task(loop, consumer):
loop.create_task(consume_queue(consumer))
def stream_task(loop, listener, consumer):
loop.create_task(create_stream(listener, consumer))
def shutdown_task(loop, listener):
loop.create_task(shutdown(consumer))
async def shutdown(consumer):
print("Shutdown task created")
while not consumer.is_stopped():
print("No activity")
await asyncio.sleep(5)
print("Shutdown initiated")
loop.stop()
async def create_stream(listener, consumer):
stream = Stream(auth, listener)
print("Stream created")
stream.filter(track=KEYWORDS, is_async=True)
await asyncio.sleep(EVENT_DURATION)
print("Stream finished")
consumer.stop()
async def consume_queue(consumer):
await consumer.run()
loop = asyncio.get_event_loop()
p_stream = Process(target=stream_task, args=(loop, listener, consumer, ))
p_consumer = Process(target=consume_task, args=(loop, consumer, ))
p_shutdown = Process(target=shutdown_task, args=(loop, consumer, ))
p_stream.start()
p_consumer.start()
p_shutdown.start()
loop.run_forever()
loop.close()

问题是一切都挂起了(或者它阻塞了?( - 实际上没有任务正在运行。我的解决方案是将前三个函数更改为:

def consume_task(loop, consumer):
loop.create_task(consume_queue(consumer))
loop.run_forever()
def stream_task(loop, listener, consumer):
loop.create_task(create_stream(listener, consumer))
loop.run_forever()
def shutdown_task(loop, listener):
loop.create_task(shutdown(consumer))
loop.run_forever()

这确实运行。但是,consumerlistener对象无法通信。举个简单的例子,当create_stream函数调用consumer.stop()时,消费者不会停止。即使我更改了consumer类变量,也不会进行更改 - 例如,共享队列仍然为空。这就是我创建实例的方式:

queue = Queue()
consumer = PrintConsumer(queue)
listener = QueuedListener(queue, max_time=EVENT_DURATION)

请注意,如果我不使用进程,而只使用异步任务,则一切都按预期工作,因此我认为这不是参考问题:

loop = asyncio.get_event_loop()
stream_task(loop, listener, consumer)
consume_task(loop, consumer)
shutdown_task(loop, listener)
loop.run_forever()
loop.close()

是因为它们在不同的进程上运行吗?请问我应该如何解决这个问题?

发现问题!多处理创建实例的副本。解决方案是创建一个管理器,该管理器本身共享实例。

编辑[11/2/2020]:

import asyncio
from multiprocessing import Process, Manager
"""
These two functions will be created as separate processes.
"""
def task1(loop, shared_list):
output = loop.run_until_complete(asyncio.gather(async1(shared_list)))
def task2(loop, shared_list):
output = loop.run_until_complete(asyncio.gather(async2(shared_list)))
"""
These two functions will be called (in different processes) asynchronously.
"""
async def async1(shared_list):
pass
async def async2(shared_list):
pass
"""
Create the manager and start it up.
From this manager, also create a list that is shared by functions in different threads.
"""
manager = Manager()
manager.start()
shared_list = manager.list()
loop = asyncio.get_event_loop() # the event loop
"""
Create two processes.
"""
process1 = Process(target=task1, args=(loop, shared_list, ))
process2 = Process(target=task2, args=(loop, shared_list, ))
"""
Start the two processes and wait for them to finish.
"""
process1.start()
process2.start()
output1 = process1.join()
output2 = process2.join()
"""
Clean up
"""
loop.close()
manager.shutdown()

最新更新