包装python async以进行同步执行



我正试图尽快从本地Postgres数据库加载数据,看来性能最好的python包是asyncpg。我的代码是同步的,我反复需要加载数据块。我对将async关键字传播到我编写的每个函数不感兴趣,所以我尝试将异步代码封装在同步函数中。

下面的代码可以工作,但非常难看:

def connect_to_postgres(user, password, database, host):
async def wrapped():
return await asyncpg.connect(user=keys['user'], password=keys['password'],
database='markets', host='127.0.0.1')
loop = asyncio.get_event_loop()    
db_connection = loop.run_until_complete(wrapped())
return db_connection

db_connection = connect_to_postgres(keys['user'], keys['password'],
'db', '127.0.0.1')
def fetch_from_postgres(query, db_connection):
async def wrapped():
return await db_connection.fetch(query)
loop = asyncio.get_event_loop()    
values = loop.run_until_complete(wrapped())
return values
fetch_from_postgres("SELECT * from db LIMIT 5", db_connection)

在Julia,我会做一些类似的事情

f() = @async 5
g() = fetch(f())
g()

但在Python中,我似乎不得不做相当笨拙的

async def f():
return 5
def g():
loop = asyncio.get_event_loop()    
return loop.run_until_complete(f())

只是想知道是否有更好的方法?

编辑:后一个python示例当然可以使用编写

def fetch(x):
loop = asyncio.get_event_loop()    
return loop.run_until_complete(x)

尽管如此,仍然需要创建一个异步封装的函数,除非我遗漏了什么。

编辑2:我确实关心性能,但希望使用同步编程方法。asyncpg比psycopg2快3倍,因为它的核心实现是在Cython而不是Python中实现的,这在https://magic.io/blog/asyncpg-1m-rows-from-postgres-to-python/.因此,我希望包装这个异步代码。

编辑3:提出这个问题的另一种方式是什么是避免"你的功能是什么颜色;在python中?

如果您在一开始就设置程序结构,这并不难做到。您创建了第二个异步代码将在其中运行的线程,并启动其事件循环。当保持完全同步的主线程想要异步调用(协程(的结果时,可以使用方法asyncio.run_coroutine_threadsafe。该方法返回一个concurrent.futures.Future对象。您可以通过调用其方法result((来获得返回值,该方法会阻塞直到结果可用。

这几乎就像您像调用子例程一样调用异步方法。由于只创建了一个辅助线程,因此开销最小。这里有一个简单的例子:

import asyncio
import threading
from datetime import datetime
async def demo(t):
await asyncio.sleep(t)
print(f"Demo function {t} {datetime.now()}")
return t
def main():
def thr(loop):
asyncio.set_event_loop(loop)
loop.run_forever()

loop = asyncio.new_event_loop()
t = threading.Thread(target=thr, args=(loop, ), daemon=True)
t.start()
print("Main", datetime.now())
t1 = asyncio.run_coroutine_threadsafe(demo(1.0), loop).result()
t2 = asyncio.run_coroutine_threadsafe(demo(2.0), loop).result()
print(t1, t2)
if __name__ == "__main__":
main()
# >>> Main 2021-12-06 19:14:14.135206
# >>> Demo function 1.0 2021-12-06 19:14:15.146803
# >>> Demo function 2.0 2021-12-06 19:14:17.155898
# >>> 1.0 2.0

主程序在第一次调用demo((时会延迟1秒,在第二次调用时会延迟2秒。这是因为主线程没有事件循环,因此无法并行执行这两个延迟。但这正是你所暗示的,当你说你想要一个使用第三方异步包的同步程序时。

这是一个类似的答案,但问题略有不同:

如何使用Python asyncio在asyncpg API上实现同步外观?

最新更新