Pyrogram:如何在FastAPI路由器中运行异步方法



如何在FastApi路由中运行pyrogram方法

我使用Pyrogram,我有一些使用Telegram的方法。

class UserBot: 
def __init__(self, username: str, debug: Optional[bool] = False) -> None:
self.username = username
self.app = Client(f"sessions/{username}")
# For example get chat history (https://docs.pyrogram.org/api/methods/get_chat_history)
async def get_chat_history(self, chat_id: str) -> "List of chat messages":
try:
messages = list()
async with self.app as app:
async for message in app.get_chat_history(chat_id):
print(message)
messages.append(message)
logging.INFO()
return messages
except Exception as e:
return e

为了运行这些方法,我使用了pyrogram Client 中的内置运行功能

async def loop_methods(self, fn):
try:
self.app.run(fn)
logging.INFO()
except Exception as e:
return e

运行方法示例:

ubot = UserBot(username="donqhomo", debug=False)
ubot.loop_methods(ubot.get_chat_history(chat_id="@CryptoVedma"))

我想在fastapi路由器中运行我的pyrogram方法,我该怎么做?我尝试了一下,但没有消息打印到终端:

from fastapi import FastAPI, HTTPException
import asyncio
app = FastAPI()
@app.get("/test/")
async def test():
ubot = UserBot(username="donqhomo", debug=False)
loop = asyncio.get_running_loop()
task1 = loop.create_task(ubot.get_chat_history(chat_id="@CryptoVedma"))
await task1
return task1

我如何在fastapi中从pyrogram方法获得输出?

尝试tris代码

from fastapi import FastAPI, HTTPException
import asyncio
app = FastAPI()
@app.get("/test/")
async def test():
ubot = UserBot(username="donqhomo", debug=False)
return await bot.get_chat_history(chat_id="@CryptoVedma")

最新更新