Python每5分钟做一次



我需要检查API上的数据。API每5分钟刷新一次新数据(10:00、10:05、10:10等(

我不想使用时间.sleep(300(,因为我希望我的脚本在10:05:03、10:05:0等时间做一些事情,而不是在脚本启动后5分钟(可能是在10h12开始我该如何构建?

谢谢大家。

更新:

只是想消除递归错误的可能性,所以我重写了代码:

from threading import Thread
from time import sleep
import datetime
def check_api():
# ... your code here ...
pass
def schedule_api():
while datetime.datetime.now().minute % 5 != 0:
sleep(1)
check_api()
while True:
sleep(300)
check_api()
thread = Thread(target=schedule_api)
thread.start()

此外,如果你想让你的线程在主程序退出时退出,你可以在线程上设置守护进程为True,比如:

thread.daemon = True

但这并不能强制这个线程干净地终止,所以你也可以尝试下面的方法:

# ...
RUNNING = True
# ...
thread = Thread(target=schedule_api)
thread.start()
#...
def main():
# ... all main code ...
pass
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
RUNNING = False


您可以使用以下代码:

import threading
def check_api():
pass
timer_thread = threading.Timer(300, check_api)
timer_thread.start()
# call timer_thread.cancel() when you need it to stop

这将每5分钟调用一次check_api函数,并且不会阻止主代码的执行。

正如@scotyyy3785所提到的,上面的代码只会运行一次,但我意识到你想要什么,并为此编写了代码:

from threading import Thread
from time import sleep
import datetime

def check_api():
# ... your code here ...
pass

def caller(callback_func, first=True):
if first:
while not datetime.datetime.now().minute % 5 == 0:
sleep(1)
callback_func()
sleep(300)
caller(callback_func, False)

thread = Thread(target=caller, args=(check_api,))
thread.start()
# you'll have to handle the still running thread on exit

上述代码将在00、05、10、15…等分钟调用check_api

在循环中定期检查时间,并在特定的分钟标记处做一些事情:

import time
# returns the next 5 minute mark
# e.g. at minute 2 return 5
def get_next_time():
minute = time.localtime().tm_min
result = 5 - (minute % 5) + minute
if result == 60:
result = 0
return result
next_run = get_next_time()
while True:
now = time.localtime()
# at minute 0, 5, 10... + 3 seconds:
if next_run == now.tm_min and now.tm_sec >= 3:
print("checking api")
next_run = get_next_time()
time.sleep(1)

最新更新