ThreadPoolExecutor与单独的asyncio循环通信



我有一个IO绑定的任务在循环中运行。这个任务要做很多工作,并且经常占用循环(这个词合适吗?)我的计划是在一个单独的进程或线程中运行它,使用run_in_executorProcessPoolExecutorThreadPoolExecutor分别运行它,并允许主循环完成它的工作。目前,对于任务之间的通信,我使用asyncio.PriorityQueue()asyncio.Event()进行通信,并希望重用这些,或者具有相同接口的东西,如果可能的话。

当前代码:

# Getter for events and queues so communication can happen
send, receive, send_event, receive_event = await process_obj.get_queues()
# Creates task based off the process object
future = asyncio.create_task(process_obj.main())

当前进程代码:

async def main():
while True:
#does things that hogs loop

我想做什么:

# Getter for events and queues so communication can happen
send, receive, send_event, receive_event = await process_obj.get_queues()
# I assume I could use Thread or Process executors
pool = concurrent.futures.ThreadPoolExecutor()
result = await loop.run_in_executor(pool, process_obj.run())

新进程代码:

def run():
asyncio.create_task(main())
async def main():
while True:
#does things that hogs loop

我如何像原来那样在这个新线程和原来的循环之间进行通信?

我无法复制您的代码。因此,请考虑这段代码从YouTube下载为例,我希望这将有助于您了解如何从线程函数获得结果:

示例代码:

def on_download(self, is_mp3: bool, is_mp4: bool, url: str) -> None:
if is_mp3 == False and is_mp4 == False:
self.ids.info_lbl.text = 'Please select a type of file to download.'
else:
self.ids.info_lbl.text = 'Downloading...'

self.is_mp3 = is_mp3
self.is_mp4 = is_mp4
self.url = url

Clock.schedule_once(self.schedule_download, 2)
Clock.schedule_interval(self.start_progress_bar, 0.1)

def schedule_download(self, dt: float) -> None:
'''
Callback method for the download.
'''

pool = ThreadPool(processes=1)
_downloader = Downloader(self.d_path)
self.async_result = pool.apply_async(_downloader.download,
(self.is_mp3, self.is_mp4, self.url))
Clock.schedule_interval(self.check_process, 0.1)

def check_process(self, dt: float) -> None:
'''
Check if download is complete.
'''
if self.async_result.ready():
resp = self.async_result.get()
if resp[0] == 'Error. Download failed.':
self.ids.info_lbl.text = resp[0]
# progress bar gray if error
self.stop_progress_bar(value=0)
else:
# progress bar blue if success
self.stop_progress_bar(value=100)
self.ids.file_name.text = resp[0]
self.ids.info_lbl.text = 'Finished downloading.'
self.ids.url_input.text = ''

Clock.unschedule(self.check_process)

我个人更喜欢from multiprocessing.pool import ThreadPool,现在它看起来像你的代码'猪',因为你正在等待结果。所以很明显,在有结果之前,程序将等待(这可能很长)。如果你看一下我的示例代码:

on_download将调度一个事件schedule download,这个将调度另一个事件check process。我不知道你的应用程序是GUI应用程序还是终端,因为你的问题中几乎没有代码,但你必须做什么,在你的循环中,你必须安排一个check process事件。如果你看我的check process:if self.async_result.ready():,只会在我的结果准备好时返回。现在你正在等待结果,这里所有的事情都在后台发生,主循环会时不时地检查结果(它不会霸占,如果没有结果,主循环会继续做它必须做的事情,而不是等待它)。

所以基本上你必须在循环中安排一些事件(尤其是结果事件),而不是一行一行地等待一个。这有意义吗?我的示例代码有帮助吗?对不起,我真的不擅长解释我脑子里的想法;)

-> mainloop
-> new Thread if there is any
-> check for result if there is any Threads
-> if there is a result
-> do something
-> mainloop keeps running
-> back to top

当您在main协程中执行while True时,它不会占用循环,而是阻塞循环,不接受其余任务来完成它们的工作。在基于事件的应用程序中运行进程并不是最好的解决方案,因为进程在数据共享方面不是很友好。

在不使用并行性的情况下,可以同时执行所有操作。您所需要的只是在while True的末尾执行await asyncio.sleep(0)。它返回循环,并允许执行其余任务。所以我们不退出协程。

在下面的示例中,我有一个listener,它使用while True并处理emitter添加到队列中的数据。

import asyncio
from queue import Empty
from queue import Queue
from random import choice
queue = Queue()

async def listener():
while True:
try:
# data polling from the queue
data = queue.get_nowait()
print(data)  # {"type": "event", "data": {...}}
except (Empty, Exception):
pass
finally:
# the magic action
await asyncio.sleep(0)

async def emitter():
# add a data to the queue
queue.put({"type": "event", "data": {...}})

async def main():
# first create a task for listener
running_loop = asyncio.get_running_loop()
running_loop.create_task(listener())
for _ in range(5):
# create tasks for emitter with random intervals to
# demonstrate that the listener is still running in
# the loop and handling the data put into the queue
running_loop.create_task(emitter())
await asyncio.sleep(choice(range(2)))

if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

最新更新