我为我的项目复制了以下代码,它对我来说工作得很好,但我真的不明白下面的代码是如何运行我的blocking_function的:
@client.event
async def on_message(message):
loop = asyncio.get_event_loop()
block_response = await loop.run_in_executor(ThreadPoolExecutor(), blocking_function)
每次收到消息时调用on_message。如果我接收多个消息,它们将被异步处理。
blocking_function是一个同步函数,我不想在另一个blocking_function运行时运行它。然后在blocking_function中,我应该使用threading.Lock()还是asyncio.lock()?
正如dirn在评论中指出的那样,在blocking_function
中你不能使用asyncio.Lock
,因为它不是异步的。(反过来也适用:你不能从异步函数中锁定threading.Lock
,因为尝试这样做会阻塞事件循环。)如果您需要保护blocking_function
的其他实例访问的数据,您应该使用threading.Lock
。
但是我真的不明白下面的代码是如何运行我的
blocking_function
它将blocking_function
交给您创建的线程池来运行它。线程池排队并运行函数(在后台进行)。从您的角度来看),并且run_in_executor
安排在函数完成时通知事件循环,并将其返回值作为await
表达式的结果传递。
注意,您应该使用None
作为run_in_executor
的第一个参数。如果您使用ThreadPoolExecutor()
,您将为每条消息创建一个全新的线程池,并且永远不会丢弃它。线程池通常只创建一次,然后在后续工作中重用固定数量的线程("池")。None
告诉asyncio使用它为此目的创建的线程池。
通过确保使用单个线程,您可以轻松实现所需的目标。
一个简单的解决方案是确保所有对blocking_function
的调用都在单个线程上运行。这可以通过在async函数之外创建一个具有1个worker的ThreadPoolExecutor
对象轻松实现。然后对阻塞函数的每个后续调用都将在该线程上运行
thread_pool = ThreadPoolExecutor(max_workers=1)
@client.event
async def on_message(message):
loop = asyncio.get_event_loop()
block_response = await loop.run_in_executor(thread_pool, blocking_function)
不要忘记关闭线程。