我有一个持续运行的代码使用计划,并且代码中有一个地方需要每天运行一次。
代码代码定义为嵌套函数。
关于代码的小演示:
def main():
...
...
...
def errorTable():
...
...
file_read.close()
while True:
try:
schedule.run_pending()
schedule.every(1).seconds.do(main)
except:
time.sleep(1)
main
函数大约每2分钟执行一次操作。但是,我希望其中的errorTable
函数每天只运行一次。
问题由于代码有一定的处理时间,我不能根据datetime运行它。因为时间是可以错过的。当我给出一个范围时,代码可以运行不止一次。
除了这些,有没有办法让我每天运行一次代码?
首先,我在你的代码中看到一个问题。似乎每一秒你都创造了一份新的工作。我不认为这是正确的。试着这样做:
def main():
...
...
...
def errorTable():
...
...
file_read.close()
schedule.every(1).seconds.do(main)
while True:
schedule.run_pending()
time.sleep(1)
第二,你似乎想处理异常(如果它们发生在你的函数中)。进度文档建议使用包装器:
import functools
def catch_exceptions(cancel_on_failure=False):
def catch_exceptions_decorator(job_func):
@functools.wraps(job_func)
def wrapper(*args, **kwargs):
try:
return job_func(*args, **kwargs)
except:
import traceback
print(traceback.format_exc())
if cancel_on_failure:
return schedule.CancelJob
return wrapper
return catch_exceptions_decorator
关于你的问题,我认为提取你想每天运行一次的函数并安排它是值得的:
@catch_exceptions(cancel_on_failure=False)
def main():
...
...
file_read.close()
@catch_exceptions(cancel_on_failure=False)
def errorTable():
...
...
schedule.every(1).seconds.do(main)
schedule.every(1).day.at("00:00").do(errorTable)
while True:
schedule.run_pending()
time.sleep(1)