使用内置 sched 模块的非阻塞调度程序.run() 方法?



我想了解如何在方法scheduler.run(blocking=True)中使用可选参数blocking。任何实际的/真实的例子都会非常有帮助。

根据我迄今为止所做的研究,blocking可选参数的意图是用于非阻塞或异步应用程序[1][2]。下面是schduler的主运行循环(来自python 3.6库sched.py)。通过代码,我注意到每当blocking设置为False时,都会立即返回目标时间和当前时间之间的时间差,除非目标时间已经过去,在这种情况下会执行该操作。

while True:
with lock:
if not q:
break
time, priority, action, argument, kwargs = q[0]
now = timefunc()
if time > now:
delay = True
else:
delay = False
pop(q)
if delay:
if not blocking:
return time - now
delayfunc(time - now)
else:
action(*argument, **kwargs)
delayfunc(0)   # Let other threads run

在我看来,非阻塞设计要求我继续运行调度程序,直到队列干净为止。因此,我正在考虑自己维护一个任务队列,并不断将scheduler.run任务推送到队列中(就像下面的代码一样)。这是一个理想的设计吗?使用非阻塞调度器的正确方式是什么?

def action():
print('action at: ', datetime.now())
if __name__ == '__main__':
s = sched.scheduler(time.time)
target_time = datetime.now() + timedelta(seconds=5)
s.enterabs(target_time.timestamp(), 1, action)
run = functools.partial(s.run, blocking=False)
taskq = deque()
taskq.append(run)
while taskq:
task = taskq.popleft()
result = task()
print(result)
if result:
taskq.append(run)
time.sleep(1)
print('end tasks')

[1] Python 3.3 的新增功能

[2] Issue13449:sched-为run()方法提供一个"async"参数

老问题,但我刚刚实现了一些非常有效地使用非阻塞版本的东西。

sched.scheduler.run中的blocking = True时,它将调用delayfunc以获取到下一个事件的时间差。

如果您的应用程序在t = 0t = 10调度事件A,而另一个线程在t = 1t = 5调度事件B,则这可能是不可取的。在这种情况下,

s = sched.scheduler(time.time)
# Spawn threads which enter A and B into s
while True:
s.run(True)

如果主线程只是在循环中调用sched.scheduler.run(blocking=True),那么在t = 0,它将调用delayfunc(10),因为它只看到在A之前还有10个时间单位。主线程要到t=10才会唤醒,这时它会发现它错过了B,延迟5个时间单位运行B,然后在B之后运行A

为了解决这个问题,您可以将主线程更改为如下所示:

s = sched.scheduler(time.time)
# Spawn threads which enter A and B into s
while True:
next_ev = s.run(False)
if next_ev is not None:
time.sleep(min(1, next_ev))
else:
time.sleep(1)

此代码将跟踪所有当前事件,然后睡眠到下一个事件,或者如果没有下一个活动,或者如果下一个项目太超前,将睡眠1秒。

理想情况下,如果一个新事件到达优先级队列的前面,那么调度器将使用一个条件变量来实现,并且它可以等待该变量,而不是仅仅休眠到下一个事件。这将是最有效和最准确的时间。

最新更新