在异步学习和测试过程中,我用3个并发任务编写了下面的代码。
import asyncio
from time import time
tasks_to_schedule = []
task_queue = []
class Test():
def __init__(self, task_name, repeat_every):
self.name = task_name
self.repeat_every = repeat_every
self.scheduled = 0
def schedule(self, t_now):
self.scheduled = t_now
async def run(self):
print(f'It is {self.name}')
print(f'{self.name} running...')
await asyncio.sleep(2)
print(f'{self.name} finished')
def check_result(self):
pass
async def report(self):
print(f'{self.name} report DONE')
await asyncio.sleep(1)
def prepare_tasks():
task_a = Test('Task A', 2)
task_b = Test('Task B', 4)
tasks_to_schedule.append(task_a)
tasks_to_schedule.append(task_b)
async def scheduler():
turn = 0
while turn < 5:
if tasks_to_schedule:
print(f'***t Turn {turn} t***')
task = tasks_to_schedule.pop(0)
if task.scheduled < time():
task_queue.append(task)
print(f'adding task {task.name} to queue,n queue size = {len(task_queue)}')
turn += 1
else:
tasks_to_schedule.append(task)
await asyncio.sleep(1)
async def worker(name):
while True:
if task_queue:
task = task_queue.pop(0)
print(f'Worker {name} - took task {task.name}')
await task.run()
await task.report()
print(f'Worker {name} - task {task.name} completed, reschedule it')
task.schedule(time())
tasks_to_schedule.append(task)
# await asyncio.sleep(1) #Process stuck without this line
async def main():
task_scheduler = asyncio.create_task(scheduler())
worker1 = asyncio.create_task(worker(1))
worker2 = asyncio.create_task(worker(2))
await asyncio.gather(task_scheduler, worker1, worker2)
if __name__ == '__main__':
prepare_tasks()
asyncio.run(main())
过程中的问题在";任务A正在运行&";,唯一的输出是:
*** Turn 0 ***
adding task Task A to queue,
queue size = 1
Worker 1 - took task Task A
It is Task A
Task A running...
经过几次尝试,我注意到;等待异步睡眠(1(";循环末端的线在"内部";工人;func进程正确运行,没有任何阻塞。
我想知道,原因是什么?
有人能解释一下,为什么这条额外的线路会改变一切吗?
平台:Python 3.9.4,Windows 10 x64,内部供应商。
我在之后添加了一行
async def worker(name):
while True:
print(f'{strftime("%X")}: worker loop') #this line
我可以在输出中看到一个无休止的工作者循环。。。现在我明白了,工人找不到任务。。。
已解决:(