用于迁移现有代码库的线程中的Python异步



我们有一个相当大的项目,它正在进行大量的网络连接(API调用、Websocket消息(,并且还有许多内部作业在线程中的间隔中运行。我们目前的架构涉及生成大量线程,当系统负载很大时,应用程序工作不太好,所以我们决定尝试异步。

我知道最好的方法是将整个代码库迁移到异步代码,但由于代码库的大小和有限的开发资源,这在不久的将来是不现实的。然而,我们希望开始迁移部分代码库以使用异步事件循环,希望在某个时候能够转换整个项目。

到目前为止,我们遇到的问题是,整个代码库都有同步代码,为了在其中添加非阻塞异步代码,代码需要在不同的线程中运行,因为你不能在同一个线程中真正运行异步和同步代码。

为了将异步和同步代码结合起来,我提出了在应用程序启动时创建的单独线程中运行异步代码的方法。代码的其他部分只需调用add_asyncio_task即可将作业添加到此循环中。

import threading
import asyncio
_tasks = []
def threaded_loop(loop):
asyncio.set_event_loop(loop)
global _tasks
while True:
if len(_tasks) > 0:
# create a copy of needed tasks
needed_tasks = _tasks.copy()
# flush current tasks so that next tasks can be easily added
_tasks = []
# run tasks
task_group = asyncio.gather(*needed_tasks)
loop.run_until_complete(task_group)

def add_asyncio_task(task):
_tasks.append(task)
def start_asyncio_loop():
loop = asyncio.get_event_loop()
t = threading.Thread(target=threaded_loop, args=(loop,))
t.start()

以及app.py中的某个位置:

start_asyncio_loop()

以及代码中的其他任何位置:

add_asyncio_task(some_coroutine)

由于我是asyncio的新手,我想知道在我们的情况下,这是否是一种好的方法,或者这种方法是否被认为是一种反模式的方法,并且有一些问题会在以后的道路上遇到?或者asyncio已经有了一些解决方案,而我只是想在这里发明轮子?

感谢您的投入!

这种方法一般来说是好的。不过你有一些问题:

(1(几乎所有异步对象都不是线程安全的

(2( 您的代码本身并不是线程安全的。如果一个任务出现在needed_tasks = _tasks.copy()之后但在_tasks = []之前,该怎么办?你需要一把锁。顺便说一句,复印是没有意义的。简单的needed_tasks = _tasks就可以了。

(3( 一些异步构造是线程安全的。使用它们:

import threading
import asyncio
# asyncio.get_event_loop() creates a new loop per thread. Keep
# a single reference to the main loop. You can even try
#   _loop = asyncio.new_event_loop()
_loop = asyncio.get_event_loop()
def get_app_loop():
return _loop
def asyncio_thread():
loop = get_app_loop()
asyncio.set_event_loop(loop)
loop.run_forever()
def add_asyncio_task(task):
asyncio.run_coroutine_threadsafe(task, get_app_loop())
def start_asyncio_loop():
t = threading.Thread(target=asyncio_thread)
t.start()

最新更新