制作计时器:线程超时不准确.Event.wait - Python 3.6.



首先,我是Python的新手,不熟悉它的功能。我一直在使用 MATLAB。

电脑简要规格: 视窗 10, 英特尔 i7

我正在尝试创建一个计时器类来定期执行诸如 MATLAB 之类的函数,这显然是从 Java 计时器借来的。MATLAB 计时器的分辨率约为 1 毫秒,我从未见过它在任何情况下超过 2 毫秒。事实上,对于我的项目来说,它足够准确。

最近,我计划迁移到Python,因为MATLAB的并行计算和Web访问功能很差。然而,不幸的是,Python 的标准包提供了一些低级别的计时器(线程。计时器)与 MATLAB 相比,我必须制作自己的计时器类。首先,我提到了 QnA 在 Python 中执行定期操作 [重复]。迈克尔·安德森提出的解决方案给出了漂移校正的简单概念。他使用 time.sleep() 来保持周期。该方法非常精确,有时比 MATLAB 计时器具有更高的精度。约 0.5 ms 分辨率。但是,计时器在 time.sleep() 中捕获期间不能中断(暂停或恢复)。但是我有时必须立即停止,无论它是否处于睡眠状态()。

我发现的问题的解决方案是利用线程包中的 Event 类。请参阅 Python threading.timer - 每 'n' 秒重复一次函数 .使用 Event.wait() 的超时功能,我可以在执行之间留出一个时间间隔,它用于保持周期。也就是说,事件通常会被清除,以便 wait(timeout) 可以像 time.sleep(interval) 一样工作,并且我可以在需要时通过设置事件立即退出 wait()。

当时一切似乎都很好,但 Event.wait() 中存在一个关键问题。时间延迟从 1 ~ 15 ms 变化太大。我认为它来自 Event.wait() 的开销。

我制作了一个示例代码,显示了 time.sleep() 和 Event.wait() 之间的准确性比较。这总共有 1000 次 1 毫秒 sleep() 和 wait() 的迭代,以查看累积的时间误差。预期结果约为 1.000。

import time
from threading import Event
time.sleep(3)  # to relax
# time.sleep()
tspan = 1
N = 1000
t1 = time.perf_counter()
for _ in range(N):
time.sleep(tspan/N)
t2 = time.perf_counter()
print(t2-t1)
time.sleep(3)  # to relax
# Event.wait()    
tspan = 1
event = Event()
t1 = time.perf_counter()
for _ in range(N):
event.wait(tspan/N)
t2 = time.perf_counter()
print(t2-t1)

结果:

1.1379848184879964
15.614547161211096

结果表明 time.sleep() 的准确性要好得多。但是我不能完全依赖前面提到的time.sleep()。

综上所述,

  • time.sleep():准确但不可中断
  • 线程。Event.wait():不准确但可中断

我目前正在考虑一个折衷方案:就像在示例中一样,创建一个微小的 time.sleep()(间隔为 0.5 毫秒)的循环,并使用 if 语句退出循环并在需要时中断。据我所知,该方法在Python 2.x Python time.sleep() vs event.wait()中使用。

这是一个冗长的介绍,但我的问题可以总结如下。

  1. 我可以通过外部信号或事件强制线程进程从 time.sleep() 中断吗?(这似乎是最有效的???。

  2. 使 Event.wait() 更准确或减少开销时间。

  3. 除了 sleep() 和 Event.wait() 方法之外,还有没有更好的方法来提高计时精度。

谢谢。

我在Event.wait()上遇到了同样的计时问题。 我想出的解决方案是创建一个模仿threading.Event的类。 在内部,它使用time.sleep()循环和繁忙循环的组合,以大大提高精度。 睡眠循环在单独的线程中运行,因此仍然可以立即中断主线程中的阻塞wait()调用。 调用set()方法时,睡眠线程应在不久之后终止。 此外,为了最小化 CPU 利用率,我确保繁忙循环的运行时间永远不会超过 3 毫秒。

这是我的自定义Event类以及最后的计时演示(演示的打印执行时间将以纳秒为单位):

import time
import _thread
import datetime

class Event:
__slots__ = (
"_flag", "_lock", "_nl",
"_pc", "_waiters"
)
_lock_type = _thread.LockType
_timedelta = datetime.timedelta
_perf_counter = time.perf_counter
_new_lock = _thread.allocate_lock
class _switch:
__slots__ = ("_on",)
def __call__(self, on: bool = None):
if on is None:
return self._on
self._on = on
def __bool__(self):
return self._on
def __init__(self):
self._on = False
def clear(self):
with self._lock:
self._flag(False)
def is_set(self) -> bool:
return self._flag()
def set(self):
with self._lock:
self._flag(True)
waiters = self._waiters
for waiter in waiters:
waiter.release()
waiters.clear()
def wait(
self,
timeout: float = None
) -> bool:
with self._lock:
return self._wait(self._pc(), timeout)
def _new_waiter(self) -> _lock_type:
waiter = self._nl()
waiter.acquire()
self._waiters.append(waiter)
return waiter
def _wait(
self,
start: float,
timeout: float,
td=_timedelta,
pc=_perf_counter,
end: _timedelta = None,
waiter: _lock_type = None,
new_thread=_thread.start_new_thread,
thread_delay=_timedelta(milliseconds=3)
) -> bool:
flag = self._flag
if flag:
return True
elif timeout is None:
waiter = self._new_waiter()
elif timeout <= 0:
return False
else:
delay = td(seconds=timeout)
end = td(seconds=start) + delay
if delay > thread_delay:
mark = end - thread_delay
waiter = self._new_waiter()
new_thread(
self._wait_thread,
(flag, mark, waiter)
)
lock = self._lock
lock.release()
try:
if waiter:
waiter.acquire()
if end:
while (
not flag and
td(seconds=pc()) < end
):
pass
finally:
lock.acquire()
if waiter and not flag:
self._waiters.remove(waiter)
return flag()
@staticmethod
def _wait_thread(
flag: _switch,
mark: _timedelta,
waiter: _lock_type,
td=_timedelta,
pc=_perf_counter,
sleep=time.sleep
):
while not flag and td(seconds=pc()) < mark:
sleep(0.001)
if waiter.locked():
waiter.release()
def __new__(cls):
_new_lock = cls._new_lock
_self = object.__new__(cls)
_self._waiters = []
_self._nl = _new_lock
_self._lock = _new_lock()
_self._flag = cls._switch()
_self._pc = cls._perf_counter
return _self

if __name__ == "__main__":
def test_wait_time():
wait_time = datetime.timedelta(microseconds=1)
wait_time = wait_time.total_seconds()
def test(
event=Event(),
delay=wait_time,
pc=time.perf_counter
):
pc1 = pc()
event.wait(delay)
pc2 = pc()
pc1, pc2 = [
int(nbr * 1000000000)
for nbr in (pc1, pc2)
]
return pc2 - pc1
lst = [
f"{i}.tt{test()}"
for i in range(1, 11)
]
print("n".join(lst))
test_wait_time()
del test_wait_time

Chris D 的自定义事件类效果非常好!出于实际目的,我已将其包含在可安装的软件包(https://github.com/ovinc/oclock,使用pip install oclock安装)中,该软件包还包括其他计时工具。从oclock的 1.3.0 版本开始,可以使用 Chris D 答案中讨论的自定义Event类,例如

from oclock import Event
event = Event()
event.wait(1)

使用通常的set()clear()is_set()wait()Event类的方法。

定时精度比threading.Event要好得多,特别是在Windows中。例如,在具有 1000 个重复循环的 Windows 机器上,我得到的循环持续时间的标准偏差为threading.Event7 毫秒,oclock.Event小于 0.01 毫秒。道具给克里斯·

注意oclock软件包采用 GPLv3 许可证,以便与 StackOverflow 的 CC BY-SA 4.0 兼容。

感谢您提供此主题和所有答案。我还遇到了一些计时不准确的麻烦(Windows 10 + Python 3.9 +线程)。

解决方案是使用oclock包,并通过wres包更改Windows系统计时器的(临时)分辨率。此软件包使用未记录的 Windows API 函数 NtSetTimerResolution (警告:分辨率在系统范围内已更改)。

仅应用oclock包并不能解决问题。

应用了两个 python 包后,下面的代码可以正确且精确地调度定期事件。如果终止,将恢复原始计时器分辨率。

import threading
import datetime
import time
import oclock
import wres
class Job(threading.Thread):
def __init__(self, interval, *args, **kwargs):
threading.Thread.__init__(self)
# use oclock.Event() instead of threading.Event()
self.stopped = oclock.Event()
self.interval = interval.total_seconds()
self.args = args
self.kwargs = kwargs
def stop(self):
self.stopped.set()
self.join()
def run(self):
prevTime = time.time()
while not self.stopped.wait(self.interval):
now = time.time()
print(now - prevTime)
prevTime = now
# Set system timer resolution to 1 ms
# Automatically restore previous resolution when exit with statement
with wres.set_resolution(10000):
# Create thread with periodic task called every 10 ms
job = Job(interval=datetime.timedelta(seconds=0.010))
job.start()
try:
while True:
time.sleep(1)
# Hit Ctrl+C to terminate main loop and spawned thread
except KeyboardInterrupt:
job.stop()

最新更新