我在玩芹菜,我正在尝试用CELERYBEAT_SCHEDULER
做一个周期性的任务。这是我的配置:
CELERY_TIMEZONE = 'Europe/Kiev'
CELERYBEAT_SCHEDULE = {
'run-task-every-5-seconds': {
'task': 'tasks.run_every_five_seconds',
'schedule': timedelta(seconds=5),
'options': {
'expires': 10,
}
},
}
# the task
@app.task()
def run_every_five_seconds():
return '5 seconds passed'
使用 celery -A celery_app beat
运行节拍时,任务似乎不会过期。然后我读到节拍可能存在一些问题,因此它没有考虑过期选项。
然后我尝试执行一个任务,因此手动调用它。
@app.task()
def print_hello():
while True:
print datetime.datetime.now()
sleep(1)
我以这种方式调用任务:
print_hello.apply_async(args=[], expires=5)
工作人员控制台告诉我我的任务将过期,但它也不会过期。它被无限执行。
Received task: tasks.print_hello[05ee0175-cf3a-492b-9601-1450eaaf8ef7] expires:[2016-01-15 00:08:03.707062+02:00]
我做错了什么吗?
我认为你已经错误地理解了expires
论点。
文档说:"过期时间后任务将不会执行。这意味着如果过期时间已过,执行将不会开始。如果执行已开始,则执行将运行完成。
您的配置每 5 秒向任务队列添加一个任务。如果从将任务添加到任务队列后的 10 秒内未开始执行,则会丢弃该任务。但是,任务会立即执行,因为有一个免费的芹菜工人可用。
您的代码示例添加一个任务,如果执行在 5 秒内未启动,则会放弃该任务。
要获得所需的功能,您可以将'expires': 10,
替换为 'expires': datetime.datetime.now() + timedelta(seconds=10),
。这会将expires
设置为绝对时间。
为了补充上一个答案,过期参数的用途在以下位置捕获: https://github.com/celery/celery/issues/591
让我用一个例子来解释,
假设您计划每 5 分钟执行一次任务。 因此,Celery Beat 每 5 分钟将任务添加到任务队列中。 现在,由于某种原因,如果工作线程不工作,它不会从任务队列中选择任何任务。 任务队列会随着时间的推移而增长,其中包含许多重复性任务。 一旦工人开始工作,它就会有大量积压工作,浪费时间做旧任务。
溶液? expires
参数。
现在每个任务都有 1 分钟的过期时间。因此,当工作人员再次联机时,它会丢弃所有已过期的旧任务,只处理最新的未过期任务。多亏了这一点,工人不必在旧的重复性任务上浪费时间。
最佳实践
当您不知道要设置什么过期时间时,最好将其设置为等于计划/间隔。
前面的答案似乎只适用于芹菜,而不适用于芹菜节拍。
使用芹菜节拍的正确方法是使用 expire_seconds
选项,因为expires
选项只接受日期时间:
CELERYBEAT_SCHEDULE = {
'run-task-every-5-seconds': {
'task': 'tasks.run_every_five_seconds',
'schedule': timedelta(seconds=5),
'options': {
'expire_seconds': 10,
}
},
}
这在芹菜节拍文档中不是很清楚,但提到了这里。