的简单示例
import asyncio
import logging
from aiogram import Bot, Dispatcher, types
# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
# NOTE(review): 'token' looks like a placeholder for a real bot token — confirm before running.
token = 'token'
# Module-level bot and dispatcher; the handler decorators in this file register on `dp`.
bot = Bot(token=token)
dp = Dispatcher(bot=bot)
@dp.callback_query_handler(text='stoploop')
async def stop_loop(query: types.CallbackQuery):
    """Handle the 'stoploop' button by replacing the message text with 'stop'.

    This handler only edits the message. It holds no reference to the
    countdown started by the 'test' button, so that countdown is not
    interrupted from here.

    Args:
        query: The callback query produced by pressing the inline button.
    """
    # TODO how to stop test loop?
    # Acknowledge the callback so the Telegram client stops showing the
    # button's progress indicator.
    await query.answer()
    await query.message.edit_text('stop')
@dp.callback_query_handler(text='test')
async def start_loop(query: types.CallbackQuery):
    """Show a once-per-second countdown in the message that owns the button.

    The counter starts at 100 and is decremented before each edit, so the
    first value displayed is 99. The loop has no exit condition: it keeps
    editing the message (and keeps counting below zero) until the task
    running this handler ends.

    Args:
        query: The callback query produced by pressing the inline button.
    """
    # Acknowledge the callback so the Telegram client stops showing the
    # button's progress indicator.
    await query.answer()

    # The keyboard is the same on every tick, so build it once up front.
    markup = types.InlineKeyboardMarkup()
    markup.add(types.InlineKeyboardButton('<<<Stop And Back To Home', callback_data='stoploop'))

    a = 100
    while True:
        a -= 1
        await query.message.edit_text(str(a), reply_markup=markup)
        await asyncio.sleep(1)
@dp.message_handler(commands='start')
async def start_cmd_handler(message: types.Message):
    """Reply to the /start command with a single 'start loop' inline button.

    Pressing the button sends the callback data 'test'.

    Args:
        message: The incoming command message; the reply is attached to it.
    """
    markup = types.InlineKeyboardMarkup()
    markup.add(
        types.InlineKeyboardButton('start loop', callback_data='test')
    )
    await message.reply('test', reply_markup=markup)
async def main():
    """Run the dispatcher's polling and close the bot afterwards.

    `bot.close()` sits in a `finally` clause, so it runs whether polling
    returns normally or raises.
    """
    try:
        await dp.start_polling()
    finally:
        # NOTE(review): in newer aiogram 2.x releases `Bot.close()` appears to
        # map to the Bot API `close` method rather than closing the HTTP
        # session — confirm against the installed aiogram version.
        await bot.close()
if __name__ == '__main__':
    # NOTE(review): calling asyncio.get_event_loop() with no running loop is
    # deprecated on recent Python versions. Before switching to
    # asyncio.run(main()), confirm that `bot`/`dp` (created at import time)
    # are not bound to the loop that existed when they were constructed.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
当我点击 start loop 按钮时,Telegram 消息开始显示倒计时。当我点击停止按钮时,如何停止之前的倒计时?我使用 id(query) 确认了两次收到的 query 实例并不相同。在执行 stop_loop 函数之后,start_loop 仍会继续执行并修改消息内容。有人能告诉我如何停止它吗?
我使用 redis 解决了这个问题,但我不知道这是否是最合适的方法。如果有更合适的方式,请告诉我。
要管理循环,您应该把它移到处理程序之外,然后通过某种存储(例如 dict)来获取它。
循环的基本示例:
# Registry of Loop instances, keyed by Telegram user id.
loops = {}


class Loop:
    """A per-user background loop that handlers can start and stop.

    Creating an instance registers it in the module-level ``loops`` dict
    under its ``user_id``. Use :meth:`get_loop` to obtain the instance for a
    user, so the same object is seen by the handler that starts the loop and
    the handler that stops it.
    """

    def __init__(self, user_id):
        """Create an idle loop for *user_id* and register it in ``loops``."""
        self.user_id = user_id
        # Cooperative stop flag: the worker keeps iterating while this is True.
        self._active = False
        # True whenever no worker is executing.
        self._stopped = True
        # Strong reference to the worker task; the event loop itself keeps
        # only weak references to tasks.
        self._task = None
        loops[self.user_id] = self

    @classmethod
    def get_loop(cls, user_id):
        """Return the loop registered for *user_id*, creating it if needed.

        The instance is only constructed on a miss. Passing ``cls(user_id)``
        as the ``dict.get`` default would build (and register) a new instance
        on every call, replacing the one that is actually running.
        """
        existing = loops.get(user_id)
        if existing is None:
            existing = cls(user_id)
        return existing

    @property
    def is_running(self):
        """Whether a worker is currently executing for this loop."""
        return not self._stopped

    async def start(self):
        """Schedule the worker as a background task and return immediately.

        Does nothing if a worker is already running, so repeated calls never
        produce more than one worker per user.
        """
        if self.is_running:
            return
        self._active = True
        self._stopped = False
        self._task = asyncio.create_task(self._run_loop())

    async def _run_loop(self):
        """Send 'loop is running' every 5 seconds until asked to stop."""
        try:
            while self._active:
                await bot.send_message(self.user_id, 'loop is running')
                await asyncio.sleep(5)
        finally:
            # Mark the loop as stopped even if sending fails or the task is
            # cancelled; otherwise stop() would wait forever.
            self._stopped = True

    async def stop(self):
        """Ask the worker to stop and wait until it has finished.

        The worker notices the request after its current sleep, so this may
        take several seconds. Returns immediately if nothing is running.
        """
        self._active = False
        while not self._stopped:
            await asyncio.sleep(1)
那么:
@dp.callback_query_handler(text='start')
async def start_loop(query: types.CallbackQuery):
    """Start the pressing user's background loop unless it is already running.

    Args:
        query: The callback query produced by pressing the inline button.
    """
    user = query.from_user
    loop = Loop.get_loop(user.id)
    if loop.is_running:
        return await query.answer('Loop is already running')
    # Loop.start is a coroutine function: without `await` its body never
    # executes and the loop is never scheduled.
    await loop.start()
    await query.answer('Started!')
@dp.callback_query_handler(text='stop')
async def stop_loop(query: types.CallbackQuery):
    """Stop the pressing user's background loop and confirm once it has ended.

    Args:
        query: The callback query produced by pressing the inline button.
    """
    user = query.from_user
    loop = Loop.get_loop(user.id)
    # Answer right away: waiting for the loop to wind down can take a while,
    # and the client shows a progress indicator until the query is answered.
    await query.answer('Stopping...')
    await loop.stop()
    await bot.send_message(user.id, 'Loop successfully stopped.')