Asyncio.create_subprocess_exec 未实现错误 - 快速 api 后台任务



我正在尝试在 Fastapi 后台任务中调用asyncio.create_subprocess_exec,但它不断引发NotImplementedError。run_subprocess函数在 Fastapi 外部运行时工作正常。我正在使用异步循环而不是 uvloop 在窗口中运行它。

import asyncio
from fastapi import FastAPI, BackgroundTasks
DHCP_SERVER = "1.1.1.1"
app = FastAPI()
@app.get("/")
async def subprocess_test(background_tasks: BackgroundTasks):
background_tasks.add_task(run_subprocess)
async def run_subprocess():
proc = await asyncio.create_subprocess_exec(
'powershell.exe',
f'Get-Dhcp-Serverv4Scope -ComputerName "{DHCP_SERVER}"',
stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
if stderr:
print(stderr)
else:
print(stdout)

File ".subprocess_example.py", line 13 in run_subprocess
proc = await asyncio.create_subprocess_exec(
File "C:PythonPython38-32libasynciosubprocess.py", line 236, in create_subprcess_exec
transport, protocol = await loop.subprocess_exec(
File "C:PythonPython38-32libasynciobase_events.py", line 1615, in subprocess_exec
transport = await self._make_subprocess_transport(
File "C:PythonPython38-32libasynciobase_events.py", line 487, in _make_subprocess_transport
raise NotImplementedError

任何人都可以帮助解决这个问题吗?

谢谢!

我相信触发此错误是因为fastapi使用uvloop,而asyncio在没有设置策略的情况下不知道这一点,有一个答案提供了一些钩子如何实现这一目标;

与 UVloop 等效的异步事件循环

最新更新