我们有一个Django应用,用芹菜来处理异步任务。我们使用AWS SQS作为任务代理。
我们最终得到了一个要处理的坏任务(删除了任务实现,但没有删除celery-beat条目)。这导致错误:
Received unregistered task of type KeyError('some_deleted_task').
The message has been ignored and discarded.
一旦我们清理了celery-beat条目,我们继续得到错误~2分钟(SQS上的可见性超时设置为2分钟)。
行为似乎是:
- 任务添加到队列中,在SQS中为'Available'
- Worker拾取任务,将SQS消息移动到'in flight'
- Worker由于缺少实现而立即失败。
- 两分钟后,SQS将消息从"飞行中"移回"可用">
- Goto 2
为了清除这些错误,我们清除了SQS队列,但这可能会导致丢失其他任务。
我想配置芹菜,这样它就不会无限期地尝试这些丢失的任务。
芹菜没有做任何事情来主动重试任务。正如消息所述,任务将被忽略。任务已经进入队列,因此,除非实际执行与消息关联的任务,否则芹菜不会将其从队列中删除。你所观察到的行为是正确和可取的。
您不希望收到未注册的任务导致消息从队列中删除,因为这可能导致消息丢失。例如,如果你用一个新任务部署了一个新版本的应用程序,但是一个旧的worker仍然是活跃的(这种情况发生在几乎所有的零停机部署中),旧版本可以从新版本接收消息,它会认为它已经接收了一个未注册的任务。如果您在遇到未注册的任务时删除消息,这可能会导致这些消息丢失,以及其他类似的情况。
芹菜框架中的一些可能选项:
钩子入任务拒绝信号
当任务被拒绝时,消费者将触发任务拒绝信号。您可以使用它来挂钩到事件并做一些事情(可能使用SQS API来移动消息)。
自定义消费者
在本例中,芹菜使用者在on_unknown_task方法中控制行为。你可以子类化和扩展消费者来控制行为。