如何使用Uvicorn与asyncio.create_task()把任务在后台?



假设我有一个由Uvicorn服务器驱动的web应用程序,该应用程序实现了GraphQL API,该API具有在服务器端启动长计算的突变和检查服务器状态的查询端点。假设我们想知道有多少任务在后台运行。我有一个简化的代码,它不起作用:

import asyncio
import logging
import time
import ariadne
import ariadne.asgi
import uvicorn
import starlette as sl
import starlette.applications
query_t = ariadne.QueryType()
mutation_t = ariadne.MutationType()
FIFO = []
async def long_task():
print('Starting long task...')
global FIFO
FIFO = [1, *FIFO]
# mock long calc with 5sec sleep
time.sleep(5)
FIFO.pop()
print('Long task finished!')

@mutation_t.field('longTask')
async def resolve_long_task(_parent, info):
print('Start to resolve long task...')
asyncio.create_task(long_task())
print('Resolve finished!')
return {}
@query_t.field('ping')
async def resolve_ping(_parent, info):
return f'FIFO has {len(FIFO)} elements'

def main():
schema_str = ariadne.gql('''
type Mutation{
longTask: longTaskResponse
}
type longTaskResponse {
message: String
}
type Query {
ping: String
}
''')
schema = ariadne.make_executable_schema(schema_str, query_t, mutation_t)
gql_app = ariadne.asgi.GraphQL(schema)
app = sl.applications.Starlette(routes=[sl.routing.Mount('/graphql', gql_app)])
uvicorn.run(app,
host='0.0.0.0',
port=9002,
log_level='error')

if __name__ == '__main__':
main()

跑后

$ python main.py

我在第一个选项卡中的GraphQL GUI中发送了一个突变:

mutation longTaskQueue{
longTask {
message
}
}

在第二个选项卡中,我尝试检索FIFO的长度:

query ping {
ping
}

似乎可以运行2个long_task,但ping正在等待所有long_task完成。我的一般问题是如何在后台运行繁重的代码,不阻止GQL API?

经过多次尝试,现在我可以在后台放置许多任务并跟踪它们的数量(API不会在一个长任务上冻结)。现在的情况是阻塞计算在一个池中运行:

import asyncio
import logging
import time
import ariadne
import ariadne.asgi
import uvicorn
import starlette as sl
import starlette.applications
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
query_t = ariadne.QueryType()
mutation_t = ariadne.MutationType()
FIFO = []
async def long_task():
print('Starting long task...')
global FIFO
FIFO = [1, *FIFO]
# mock long calc with 5sec sleep
time.sleep(5)
FIFO.pop()
print('Long task finished!')

def run(corofn, *args):
loop = asyncio.new_event_loop()
try:
coro = corofn(*args)
asyncio.set_event_loop(loop)
return loop.run_until_complete(coro)
finally:
loop.close()

@mutation_t.field('longTask')
async def resolve_long_task(_parent, info):
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=5)
loop.set_default_executor(ProcessPoolExecutor())
print('Start to resolve long task...')
loop.run_in_executor(executor, run, long_task)
print('Resolve finished!')
return {}
@query_t.field('ping')
async def resolve_ping(_parent, info):
return f'FIFO has {len(FIFO)} elements'

def main():
schema_str = ariadne.gql('''
type Mutation{
longTask: longTaskResponse
}
type longTaskResponse {
message: String
}
type Query {
ping: String
}
''')
schema = ariadne.make_executable_schema(schema_str, query_t, mutation_t)
gql_app = ariadne.asgi.GraphQL(schema)
app = sl.applications.Starlette(routes=[sl.routing.Mount('/graphql', gql_app)])
uvicorn.run(app,
host='0.0.0.0',
port=9002,
log_level='error')

if __name__ == '__main__':
main()

从这个答案中得到灵感的解决方案。

相关内容

  • 没有找到相关文章

最新更新