How can I add a task to FastAPI's background tasks from a function that is not a decorated route function?



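In the FastAPI documentation, background tasks are added inside the function decorated with the route decorator. I assume this works through dependency injection (DI)?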

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

My question is: is it possible to add the task from a function that is not decorated, for example:

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


def some_function():
    background_tasks.add_task(write_notification, email, message="some notification")


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    some_function()
    return {"message": "Notification sent in the background"}

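If you want to add the task outside the decorated function, you can pass the BackgroundTasks object (and the email) to your function as arguments: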

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    print('TASK RUNNING..')
    # with open("log.txt", mode="w") as email_file:
    #     content = f"notification for {email}: {message}"
    #     email_file.write(content)
    print('TASK DONE!')


def some_function(background_tasks: BackgroundTasks, email):
    # background_tasks is injected into the route function and passed in here
    background_tasks.add_task(write_notification, email, message="some notification")


@app.get("/send-notification/")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    some_function(background_tasks, email)
    return {"message": "Notification sent in the background"}

That said, I don't see a reason to create some_function(). What is your reason for wanting some_function()?