芹菜的过期选项不起作用



我在玩芹菜,我正在尝试用CELERYBEAT_SCHEDULER做一个周期性的任务。这是我的配置:

CELERY_TIMEZONE = 'Europe/Kiev'
CELERYBEAT_SCHEDULE = {
    'run-task-every-5-seconds': {
        'task': 'tasks.run_every_five_seconds',
        'schedule': timedelta(seconds=5),
        'options': {
            'expires': 10,
        }
    },
}
# the task
@app.task()
def run_every_five_seconds():
   return '5 seconds passed'

使用 celery -A celery_app beat 运行节拍时,任务似乎不会过期。然后我读到节拍可能存在一些问题,因此它没有考虑过期选项。

然后我尝试执行一个任务,因此手动调用它。

@app.task()
def print_hello():
    while True:
        print datetime.datetime.now()
        sleep(1)

我以这种方式调用任务:

print_hello.apply_async(args=[], expires=5)

工作人员控制台告诉我我的任务将过期,但它也不会过期。它被无限执行。

Received task: tasks.print_hello[05ee0175-cf3a-492b-9601-1450eaaf8ef7] expires:[2016-01-15 00:08:03.707062+02:00]

我做错了什么吗?

我认为你已经错误地理解了expires论点。

文档说:"过期时间后任务将不会执行。这意味着如果过期时间已过,执行将不会开始。如果执行已开始,则执行将运行完成。

您的配置每 5 秒向任务队列添加一个任务。如果从将任务添加到任务队列后的 10 秒内未开始执行,则会丢弃该任务。但是,任务会立即执行,因为有一个免费的芹菜工人可用。

您的代码示例添加一个任务,如果执行在 5 秒内未启动,则会放弃该任务。

要获得所需的功能,您可以将'expires': 10,替换为 'expires': datetime.datetime.now() + timedelta(seconds=10), 。这会将expires设置为绝对时间。

为了补充上一个答案,过期参数的用途在以下位置捕获: https://github.com/celery/celery/issues/591

让我用一个例子来解释,

假设您计划每 5 分钟执行一次任务。 因此,Celery Beat 每 5 分钟将任务添加到任务队列中。 现在,由于某种原因,如果工作线程不工作,它不会从任务队列中选择任何任务。 任务队列会随着时间的推移而增长,其中包含许多重复性任务。 一旦工人开始工作,它就会有大量积压工作,浪费时间做旧任务。

溶液? expires参数。

现在每个任务都有 1 分钟的过期时间。因此,当工作人员再次联机时,它会丢弃所有已过期的旧任务,只处理最新的未过期任务。多亏了这一点,工人不必在旧的重复性任务上浪费时间。

最佳实践

当您不知道要设置什么过期时间时,最好将其设置为等于计划/间隔。

前面的答案似乎只适用于芹菜,而不适用于芹菜节拍。

使用芹菜节拍的正确方法是使用 expire_seconds 选项,因为expires选项只接受日期时间:

CELERYBEAT_SCHEDULE = {
    'run-task-every-5-seconds': {
        'task': 'tasks.run_every_five_seconds',
        'schedule': timedelta(seconds=5),
        'options': {
            'expire_seconds': 10,
        }
    },
}

这在芹菜节拍文档中不是很清楚,但提到了这里。

相关内容

  • 没有找到相关文章

最新更新