loop.run_in_executor函数需要asyncio.lock()或threading.Lock()吗?<



我为我的项目复制了以下代码,它对我来说工作得很好,但我真的不明白下面的代码是如何运行我的blocking_function的:

@client.event
async def on_message(message):  
loop = asyncio.get_event_loop()
block_response = await loop.run_in_executor(ThreadPoolExecutor(), blocking_function)

每次收到消息时调用on_message。如果我接收多个消息,它们将被异步处理。

blocking_function是一个同步函数,我不想在另一个blocking_function运行时运行它。然后在blocking_function中,我应该使用threading.Lock()还是asyncio.lock()?

正如dirn在评论中指出的那样,在blocking_function中你不能使用asyncio.Lock,因为它不是异步的。(反过来也适用:你不能从异步函数中锁定threading.Lock,因为尝试这样做会阻塞事件循环。)如果您需要保护blocking_function的其他实例访问的数据,您应该使用threading.Lock

但是我真的不明白下面的代码是如何运行我的blocking_function

它将blocking_function交给您创建的线程池来运行它。线程池排队并运行函数(在后台进行)。从您的角度来看),并且run_in_executor安排在函数完成时通知事件循环,并将其返回值作为await表达式的结果传递。

注意,您应该使用None作为run_in_executor的第一个参数。如果您使用ThreadPoolExecutor(),您将为每条消息创建一个全新的线程池,并且永远不会丢弃它。线程池通常只创建一次,然后在后续工作中重用固定数量的线程("池")。None告诉asyncio使用它为此目的创建的线程池。

通过确保使用单个线程,您可以轻松实现所需的目标。

一个简单的解决方案是确保所有对blocking_function的调用都在单个线程上运行。这可以通过在async函数之外创建一个具有1个worker的ThreadPoolExecutor对象轻松实现。然后对阻塞函数的每个后续调用都将在该线程上运行

thread_pool = ThreadPoolExecutor(max_workers=1)
@client.event
async def on_message(message):  
loop = asyncio.get_event_loop()
block_response = await loop.run_in_executor(thread_pool, blocking_function)

不要忘记关闭线程。

最新更新