如何使用python执行调度



我正试图在我的python中安排一些作业。据推测,日志记录中的文本应该每1分钟出现一次,每5分钟从docker容器中的jobs.py文件中出现一次。但是,文本每隔2分钟就会出现在docker容器中。python时间表和cronjobs之间是否存在冲突?

码头集装箱内的当前输出

13:05:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:05:00] "GET /reminder/send_reminders HTTP/1.1" 200 -
13:06:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:06:00] "GET /feeds/update_feeds HTTP/1.1" 200 -
13:07:00 [D] schedule Running job Job(interval=1, unit=minutes, do=job_feeds_update, args=(), kwargs={})
13:07:00 [I] jobs job_feeds_update
13:07:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:07:00] "GET /feeds/update_feeds HTTP/1.1" 200 -
13:08:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:08:00] "GET /feeds/update_feeds HTTP/1.1" 200 -
13:09:00 [D] schedule Running job Job(interval=1, unit=minutes, do=job_feeds_update, args=(), kwargs={})
13:09:00 [I] jobs job_feeds_update
13:09:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:09:00] "GET /feeds/update_feeds HTTP/1.1" 200 -
13:10:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:10:00] "GET /feeds/update_feeds HTTP/1.1" 200 -
13:10:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:10:00] "GET /reminder/send_reminders HTTP/1.1" 200 -
13:11:00 [D] schedule Running job Job(interval=1, unit=minutes, do=job_feeds_update, args=(), kwargs={})
13:11:00 [I] jobs job_feeds_update
13:11:00 [D] schedule Running job Job(interval=5, unit=minutes, do=job_send_reminders, args=(), kwargs={})
13:11:00 [I] jobs job_send_reminders

服务器.py

#Cron Job
@app.route('/feeds/update_feeds')
def update_feeds():
schedule.run_pending()
return 'OK UPDATED FEED!'

@app.route('/reminder/send_reminders')
def send_reminders():
schedule.run_pending()
return 'OK UPDATED STATUS!'

作业.py

def job_feeds_update():
update_feed()
update_feed_eng()
logger.info("job_feeds_update")

schedule.every(1).minutes.do(job_feeds_update)
# send email reminders
def job_send_reminders():
send_reminders()
logger.info("job_send_reminders")
schedule.every(5).minutes.do(job_send_reminders) 

Docker文件

FROM alpine:latest
# Install curlt 
RUN apk add --no-cache curl
# Copy Scripts to Docker Image
COPY reminders.sh /usr/local/bin/reminders.sh
COPY feeds.sh /usr/local/bin/feeds.sh
RUN echo ' */5  *  *  *  * /usr/local/bin/reminders.sh' >> /etc/crontabs/root
RUN echo ' *  *  *  *  * /usr/local/bin/feeds.sh' >> /etc/crontabs/root
# Run crond  -f for Foreground 
CMD ["/usr/sbin/crond", "-f"]

我认为您遇到了几个问题:

  1. 正如您所怀疑的,您的schedule与您的cron作业的时间表/间隔不同。它们不同步(你永远不会因为下一个原因而期望它们同步)。从执行jobs.py脚本的那一刻起,这就是计划计算间隔的起点

即,如果您每分钟都在运行一些东西,但jobs.py脚本在当前分钟后30秒开始(即30秒后01:00:30-凌晨1:00),则调度程序将在1:01:30、1:02:30、1:03:30运行作业,依此类推。

  1. Schedule不能保证精确的频率执行。调度程序运行作业时,不考虑作业执行时间。因此,如果你安排一些类似提要/提醒工作的事情,可能需要一点时间来处理。一旦它完成运行,调度程序就会决定下一个作业将在上一个作业结束后仅运行1分钟。这意味着你的执行时间可能会打乱计划

试着在python脚本中运行这个例子,看看我在说什么

# Schedule Library imported
import schedule
import time
from datetime import datetime

def geeks():
now = datetime.now() # current date and time
date_time = now.strftime("%m/%d/%Y, %H:%M:%S")
time.sleep(5)
print(date_time + "- Look at the timestamp")

geeks();
# Task scheduling
# After every 10mins geeks() is called.
schedule.every(1).seconds.do(geeks)

# Loop so that the scheduling task
# keeps on running all time.
while True:

# Checks whether a scheduled task
# is pending to run or not
schedule.run_pending()
time.sleep(0.1)

我们已将geeks函数安排为每秒运行一次。但如果你看一下geeks函数,我添加了一个time.sleep(5)来假设这里可能有一些阻塞API调用,这可能需要5秒。然后观察记录的时间戳——你会发现它们并不总是与我们最初想要的时间表一致!

现在了解您的cron作业和调度程序如何不同步

查看以下日志:

13:07:00 [D] schedule Running job Job(interval=1, unit=minutes, do=job_feeds_update, args=(), kwargs={})
13:07:00 [I] jobs job_feeds_update
13:07:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:07:00] "GET /feeds/update_feeds HTTP/1.1" 200 -
# minute 8 doesn't trigger the schedule for feeds
13:09:00 [D] schedule Running job Job(interval=1, unit=minutes, do=job_feeds_update, args=(), kwargs={})
13:09:00 [I] jobs job_feeds_update
13:09:00 [I] werkzeug 172.20.0.2 - - [08/May/2022 13:09:00] "GET /feeds/update_feeds HTTP/1.1" 200 -

这里可能发生的情况如下:

  • 在13:07:00,您的cron发送馈送项目的请求

  • 在13:07:00,作业计划有一个用于馈送项目的挂起作业

  • 13:07:00:,作业完成,计划决定下一个作业只能在1分钟后运行,大约是13:08:01(注意01,这是为了说明作业执行的毫秒/时间,假设运行提要项更新需要1秒)

  • 在13:08:00,您的cron作业触发请求schedulerun_pending作业。

  • 13:08:00但是没有要运行的挂起作业,因为下一次可以运行提要项目的时间是13:08:01(现在不是)。

  • 13:09:00,您的cron选项卡再次触发请求

  • 在13:09:00,有一个挂起的作业可用,该作业本应在13:08:01运行,因此现在执行。

我希望这能说明cron和schedule之间不同步的问题。在生产环境中,此问题会变得更糟。您可以阅读更多关于schedule的并行执行的信息,这是一种将事情从主线程中分离出来的方法,但仅限于此。让我们谈谈。。。

可能的解决方案

  1. 使用计划中的run_all而不是run_pending来强制触发作业,无论作业实际计划在何时

但仔细想想,这与直接从API路由本身调用job_feeds_update没有什么不同。这本身并不是一个坏主意,但它仍然不是超级干净的,因为它会阻塞API服务器的主线程,直到job_feeds_update完成,如果您有用户需要的其他路由,这可能并不理想。

你可以将此与下一个建议结合起来:

  1. 使用作业队列和线程查看计划文档的并行执行页面上的第二个示例。它向您展示了如何使用作业队列和线程来卸载作业

因为运行schedule.run_pending(),所以在作业运行之前,服务器中的主线程将被阻止。通过使用线程(+作业队列),您可以在队列中安排作业,避免用作业阻塞主服务器。这应该会让工作继续被安排,从而进一步优化您的工作。

  1. 请改用ischedule,因为它考虑了作业执行时间并提供了精确的时间表:https://pypi.org/project/ischedule/.这可能是最简单的解决方案,以防1+2最终令人头疼!

  2. 不要使用schedule,只需让您的cron作业到达一个只运行实际函数的路径(因此基本上与使用上面的1+2的建议相反)。这样做的问题是,如果您的函数运行提要更新的时间超过一分钟,则可能会有多个重叠的cron作业同时运行,以执行提要更新。因此,我建议不要这样做,而是依靠一种机制来使用线程和作业对请求进行排队/调度。只提到这是一个潜在的场景,你还可以做什么。