How to raise the asyncio thread limit in existing coroutines



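I am currently developing a program that uses asyncio for concurrency. The code here is heavily simplified to show the problem. It runs without any dependencies if you want to try it.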

There are two levels of tasks:

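  • 1: one that creates the asyncio workers in operation_loop.
  • 2: one inside the worker that calls a synchronous function with blocking points and queries a DB (simulated here with time.sleep). Keep in mind that custom_sleep is really a complex job containing blocking parts: that is why it runs in a thread. It is fed by a queue.
    • A new batch is put on the queue every second.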

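But it seems that asyncio.to_thread can use at most 20 threads. One worker with 20 threads is fine, but with 21 threads the gather takes twice as long. Then at 61… And adding a worker does not help: we are still limited to 20 threads.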
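The doubling is consistent with a fixed pool of 20 threads working through the 1-second jobs in rounds, so that n jobs need ceil(n / 20) rounds; in the log further down, machines 0-19 finish about one second before machines 20-39. A sketch of that arithmetic only, assuming a pool size of 20 and jobs that each block for exactly 1 second (expected_seconds is a hypothetical helper, not part of asyncio):

```python
import math


def expected_seconds(n_jobs: int, pool_size: int = 20, job_seconds: float = 1.0) -> float:
    """Rough lower bound on wall time when n_jobs blocking jobs share a pool of
    pool_size threads, assuming every job blocks for job_seconds."""
    rounds = math.ceil(n_jobs / pool_size)  # number of "waves" through the pool
    return rounds * job_seconds


for n in (20, 21, 40, 41, 61):
    print(n, expected_seconds(n))
```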

import asyncio
import logging
import os
import threading
import time

def perf_counter_ms() -> int:
    return time.perf_counter_ns() // 1000000

def custom_sleep(worker_id, machine_name):
    logging.debug(f"Start sleep for worker_id : {worker_id} for machine : {machine_name}")
    time.sleep(1)
    thread_name = threading.current_thread().name
    logging.debug(f"Thread {thread_name} is running on worker_id : {worker_id} for machine : {machine_name}")
    # return worker_id, 1, 1

async def worker(
    worker_id,
    queue: asyncio.Queue,
):
    logging.info("Starting worker")
    while True:
        logging.debug(f"There are {queue.qsize()} items in the queue")
        machines, now, nominal_intensities = await queue.get()
        tasks = []
        for machine in machines:
            # FIXME: Running only a limited number of tasks in parallel.
            tasks.append(
                asyncio.to_thread(
                    custom_sleep,
                    worker_id,
                    machine
                )
            )
        bench_ms = perf_counter_ms()
        logging.debug(f"PERF: Getting all operations for worker_id : {worker_id}")
        try:
            results = await asyncio.wait_for(asyncio.gather(*tasks), 5)
            logging.debug(f"PERF: Getting all operations succeed in time: {perf_counter_ms() - bench_ms} with worker_id : {worker_id}")
        except asyncio.TimeoutError as e:
            logging.error(f"TimeoutError on operation for worker_id : {worker_id}")
        queue.task_done()

async def operation_loop():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()
    for worker_id in range(1):
        asyncio.create_task(
            worker(
                worker_id,
                queue,
            )
        )
    while True:
        await asyncio.sleep(1)
        try:
            queue.put_nowait(([str(i) for i in range(0, 40)], 1, 2))
        except asyncio.QueueFull:
            logging.warning(
                "Queue full, performance issue... operations might be lost"
            )

def main():
    # Execute when the module is not initialized from an import statement.
    logging.basicConfig(
        level=os.getenv("LOG_LEVEL", "INFO"),
        format="%(levelname)s [%(asctime)s] [%(name)s.%(funcName)s:%(lineno)d] %(message)s",
    )
    asyncio.run(operation_loop())

if __name__ == "__main__":
    main()

Sample output with LOG_LEVEL=DEBUG:
DEBUG [2023-03-28 21:38:10,205] [root.custom_sleep:14] Thread asyncio_0 is running on worker_id : 0 for machine : 0
DEBUG [2023-03-28 21:38:10,206] [root.custom_sleep:14] Thread asyncio_1 is running on worker_id : 0 for machine : 1
DEBUG [2023-03-28 21:38:10,206] [root.custom_sleep:14] Thread asyncio_2 is running on worker_id : 0 for machine : 2
DEBUG [2023-03-28 21:38:10,207] [root.custom_sleep:14] Thread asyncio_3 is running on worker_id : 0 for machine : 3
DEBUG [2023-03-28 21:38:10,207] [root.custom_sleep:14] Thread asyncio_4 is running on worker_id : 0 for machine : 4
DEBUG [2023-03-28 21:38:10,208] [root.custom_sleep:14] Thread asyncio_5 is running on worker_id : 0 for machine : 5
DEBUG [2023-03-28 21:38:10,208] [root.custom_sleep:14] Thread asyncio_6 is running on worker_id : 0 for machine : 6
DEBUG [2023-03-28 21:38:10,209] [root.custom_sleep:14] Thread asyncio_7 is running on worker_id : 0 for machine : 7
DEBUG [2023-03-28 21:38:10,209] [root.custom_sleep:14] Thread asyncio_8 is running on worker_id : 0 for machine : 8
DEBUG [2023-03-28 21:38:10,210] [root.custom_sleep:14] Thread asyncio_9 is running on worker_id : 0 for machine : 9
DEBUG [2023-03-28 21:38:10,211] [root.custom_sleep:14] Thread asyncio_13 is running on worker_id : 0 for machine : 13
DEBUG [2023-03-28 21:38:10,211] [root.custom_sleep:14] Thread asyncio_10 is running on worker_id : 0 for machine : 10
DEBUG [2023-03-28 21:38:10,211] [root.custom_sleep:14] Thread asyncio_11 is running on worker_id : 0 for machine : 11
DEBUG [2023-03-28 21:38:10,211] [root.custom_sleep:14] Thread asyncio_14 is running on worker_id : 0 for machine : 14
DEBUG [2023-03-28 21:38:10,211] [root.custom_sleep:14] Thread asyncio_12 is running on worker_id : 0 for machine : 12
DEBUG [2023-03-28 21:38:10,212] [root.custom_sleep:14] Thread asyncio_15 is running on worker_id : 0 for machine : 15
DEBUG [2023-03-28 21:38:10,213] [root.custom_sleep:14] Thread asyncio_17 is running on worker_id : 0 for machine : 17
DEBUG [2023-03-28 21:38:10,213] [root.custom_sleep:14] Thread asyncio_16 is running on worker_id : 0 for machine : 16
DEBUG [2023-03-28 21:38:10,214] [root.custom_sleep:14] Thread asyncio_18 is running on worker_id : 0 for machine : 18
DEBUG [2023-03-28 21:38:10,214] [root.custom_sleep:14] Thread asyncio_19 is running on worker_id : 0 for machine : 19
DEBUG [2023-03-28 21:38:11,207] [root.custom_sleep:14] Thread asyncio_0 is running on worker_id : 0 for machine : 20
DEBUG [2023-03-28 21:38:11,208] [root.custom_sleep:14] Thread asyncio_2 is running on worker_id : 0 for machine : 21
DEBUG [2023-03-28 21:38:11,208] [root.custom_sleep:14] Thread asyncio_1 is running on worker_id : 0 for machine : 22
DEBUG [2023-03-28 21:38:11,209] [root.custom_sleep:14] Thread asyncio_4 is running on worker_id : 0 for machine : 24
DEBUG [2023-03-28 21:38:11,209] [root.custom_sleep:14] Thread asyncio_5 is running on worker_id : 0 for machine : 25
DEBUG [2023-03-28 21:38:11,210] [root.custom_sleep:14] Thread asyncio_7 is running on worker_id : 0 for machine : 26
DEBUG [2023-03-28 21:38:11,211] [root.custom_sleep:14] Thread asyncio_3 is running on worker_id : 0 for machine : 23
DEBUG [2023-03-28 21:38:11,211] [root.custom_sleep:14] Thread asyncio_6 is running on worker_id : 0 for machine : 27
DEBUG [2023-03-28 21:38:11,212] [root.custom_sleep:14] Thread asyncio_8 is running on worker_id : 0 for machine : 28
DEBUG [2023-03-28 21:38:11,213] [root.custom_sleep:14] Thread asyncio_9 is running on worker_id : 0 for machine : 29
DEBUG [2023-03-28 21:38:11,213] [root.custom_sleep:14] Thread asyncio_13 is running on worker_id : 0 for machine : 30
DEBUG [2023-03-28 21:38:11,213] [root.custom_sleep:14] Thread asyncio_10 is running on worker_id : 0 for machine : 31
DEBUG [2023-03-28 21:38:11,214] [root.custom_sleep:14] Thread asyncio_11 is running on worker_id : 0 for machine : 32
DEBUG [2023-03-28 21:38:11,214] [root.custom_sleep:14] Thread asyncio_12 is running on worker_id : 0 for machine : 33
DEBUG [2023-03-28 21:38:11,215] [root.custom_sleep:14] Thread asyncio_14 is running on worker_id : 0 for machine : 34
DEBUG [2023-03-28 21:38:11,215] [root.custom_sleep:14] Thread asyncio_17 is running on worker_id : 0 for machine : 37
DEBUG [2023-03-28 21:38:11,216] [root.custom_sleep:14] Thread asyncio_18 is running on worker_id : 0 for machine : 39
DEBUG [2023-03-28 21:38:11,216] [root.custom_sleep:14] Thread asyncio_19 is running on worker_id : 0 for machine : 38
DEBUG [2023-03-28 21:38:11,216] [root.custom_sleep:14] Thread asyncio_15 is running on worker_id : 0 for machine : 35
DEBUG [2023-03-28 21:38:11,216] [root.custom_sleep:14] Thread asyncio_16 is running on worker_id : 0 for machine : 36

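I have seen that I could use a concurrent.futures.ThreadPoolExecutor context manager with a max_workers value, but I would like to stick with asyncio as much as possible (unless there is no other option).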
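A ThreadPoolExecutor with an explicit max_workers does not have to replace asyncio: it can be handed directly to loop.run_in_executor instead of going through to_thread. A minimal sketch, assuming blocking_job and run_batch as hypothetical stand-ins for custom_sleep and the worker's gather logic, with the sleep shortened to 0.1 s:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_job(worker_id: int, machine: str) -> str:
    # Hypothetical stand-in for custom_sleep: blocks the calling thread.
    time.sleep(0.1)
    return f"{worker_id}:{machine}"


async def run_batch(executor: ThreadPoolExecutor, worker_id: int, machines: list[str]) -> list[str]:
    loop = asyncio.get_running_loop()
    # run_in_executor forwards positional arguments only
    # (wrap the call in functools.partial if keyword arguments are needed).
    futures = [
        loop.run_in_executor(executor, blocking_job, worker_id, machine)
        for machine in machines
    ]
    return await asyncio.gather(*futures)


async def main() -> list[str]:
    # This executor's max_workers, not the default pool size, bounds how many
    # jobs run at the same time.
    with ThreadPoolExecutor(max_workers=40) as executor:
        return await run_batch(executor, 0, [str(i) for i in range(40)])


results = asyncio.run(main())
print(len(results), results[:3])
```

In a long-running program the executor would typically be created once (for example in operation_loop) and shared by all workers, rather than per batch.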

Another option would be to convert the code entirely to asyncio, but that would take some time.

Another option could be multiprocessing, but for now that seems like overkill.

Do you know why the number of threads is limited? And how can I raise this limit?

Thanks in advance.

The asyncio function to_thread is actually a wrapper around a ThreadPoolExecutor. Here is its code, with some details irrelevant to this question removed (from asyncio/threads.py, Python 3.11):

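async def to_thread(func, /, *args, **kwargs):
    loop = events.get_running_loop()
    # ... (func_call is func bound to *args, **kwargs and the current contextvars context)
    return await loop.run_in_executor(None, func_call)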

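Here func_call is essentially the function you passed to to_thread. When None is the first argument of run_in_executor, the loop's default ThreadPoolExecutor is used. So even though you say you don't want to use a ThreadPoolExecutor, you actually are; you are just using the default one.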
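One way to check that to_thread really goes through the loop's default executor is to install a default executor whose threads have a recognizable name and compare where to_thread and run_in_executor(None, ...) run. A small sketch; the prefix "default-pool" is an arbitrary choice:

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


def thread_name() -> str:
    return threading.current_thread().name


async def main() -> tuple[str, str]:
    loop = asyncio.get_running_loop()
    # Install a default executor whose worker threads are easy to recognize.
    loop.set_default_executor(
        ThreadPoolExecutor(max_workers=2, thread_name_prefix="default-pool")
    )
    via_to_thread = await asyncio.to_thread(thread_name)
    via_executor = await loop.run_in_executor(None, thread_name)  # None = default executor
    return via_to_thread, via_executor


a, b = asyncio.run(main())
print(a, b)  # both names start with "default-pool"
```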

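Here is a snippet from the ThreadPoolExecutor documentation:

"If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5."

(That is the wording for Python 3.5 to 3.7; since Python 3.8 the documented default is min(32, os.cpu_count() + 4).)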

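So you can see that this limits the number of threads, to 20 in your example. You can create a ThreadPoolExecutor with a higher thread limit and install it as the loop's default executor with asyncio's loop.set_default_executor method: https://docs.python.org/3/library/asyncio-eventloop.html?highlight=threadpoolexecutor#asyncio.loop.set_default_executor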
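For reference, the Python 3.8+ default can be written out as a small function. default_max_workers is a hypothetical name (the real computation happens inside ThreadPoolExecutor.__init__), and very recent Python versions may count CPUs slightly differently. Under this formula a cap of 20 corresponds to 16 logical CPUs; under the older cpu_count * 5 rule, 4 CPUs would also give 20.

```python
import os
from typing import Optional


def default_max_workers(cpu_count: Optional[int]) -> int:
    # Documented default since Python 3.8: min(32, cpu_count + 4).
    # os.cpu_count() may return None, hence the "or 1" fallback.
    return min(32, (cpu_count or 1) + 4)


print(default_max_workers(os.cpu_count()))  # depends on the machine
print(default_max_workers(16))
print(default_max_workers(4))
```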

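Try adding these two lines at the start of operation_loop (you also need to import ThreadPoolExecutor from concurrent.futures):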

from concurrent.futures import ThreadPoolExecutor

async def operation_loop():
    loop = asyncio.get_running_loop()
    loop.set_default_executor(ThreadPoolExecutor(max_workers=40))
    # ... rest of operation_loop unchanged

Subsequent calls to to_thread will now use the executor limited to 40 threads.
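A self-contained sketch to see the effect, with the jobs shortened to 0.3 s (timings are approximate and machine-dependent): 40 blocking jobs go through to_thread once with a 20-thread default executor and once with a 40-thread one. blocking_job and run_batch are hypothetical names used only for this illustration.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

JOB_SECONDS = 0.3


def blocking_job(i: int) -> int:
    time.sleep(JOB_SECONDS)  # stands in for the blocking DB call
    return i


async def run_batch(max_workers: int, n_jobs: int = 40) -> float:
    loop = asyncio.get_running_loop()
    # Each asyncio.run() creates a fresh loop, so each call installs its own
    # default executor; asyncio.run() shuts it down on exit.
    loop.set_default_executor(ThreadPoolExecutor(max_workers=max_workers))
    start = time.perf_counter()
    await asyncio.gather(*(asyncio.to_thread(blocking_job, i) for i in range(n_jobs)))
    return time.perf_counter() - start


slow = asyncio.run(run_batch(20))  # two rounds of 20 jobs
fast = asyncio.run(run_batch(40))  # one round of 40 jobs
print(f"20 threads: {slow:.2f}s, 40 threads: {fast:.2f}s")
```

With 20 threads the batch takes two job durations, and with 40 threads roughly one, which mirrors the doubling described in the question.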

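I hope you realize that Python threads do not really run in parallel. In CPython, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, and the interpreter switches between threads quickly to give the appearance of parallelism. Blocking calls such as time.sleep release the GIL while they wait, so your sleep-based test code does not show this limitation. In the real program, if custom_sleep spends much of its time running Python code rather than waiting, it can matter a lot.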
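A small sketch that makes the difference visible on a standard CPython build (with the GIL); timings depend on the interpreter and machine, and the loop size is arbitrary. Four sleeping threads overlap almost perfectly, while four threads running a pure-Python loop typically take roughly as long as running the loops one after another.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def sleepy(_: int) -> None:
    time.sleep(0.2)  # releases the GIL while waiting


def busy(n: int) -> int:
    total = 0
    for i in range(n):  # pure Python bytecode: one thread at a time under the GIL
        total += i
    return total


def timed(fn, arg, n_threads: int = 4) -> float:
    # Run fn(arg) once per thread and return the wall time for all of them.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        list(pool.map(fn, [arg] * n_threads))
    return time.perf_counter() - start


sleep_elapsed = timed(sleepy, 0)
busy_one = timed(busy, 2_000_000, n_threads=1)
busy_four = timed(busy, 2_000_000, n_threads=4)
print(f"4 sleeps in 4 threads: {sleep_elapsed:.2f}s")
print(f"1 busy loop: {busy_one:.2f}s, 4 busy loops in 4 threads: {busy_four:.2f}s")
```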

If you need true parallel processing, you have to create separate Python processes.