我有一个Django模型,它有一个名为celery_task_id
的列。我使用RabbitMQ
作为经纪人。有一个名为 test_celery
的芹菜函数,它将模型对象作为参数。现在我有以下代码行来创建芹菜任务。
def create_celery_task():
celery_task_id = test_celery.apply_async((model_obj,), eta='Future Datetime Object')
model_obj.celery_task_id = celery_task_id
model_obj.save()
----
----
现在在芹菜函数中,我正在验证任务 ID 是否与存储在数据库中的任务 ID 相同。
@app.task
def test_celery(model_obj):
if model_obj.celery_task_id == test_celery.request.id:
## Do something
我的问题是在很多情况下,我可以看到任务正在接收并在日志中成功,但没有执行 if 条件中的代码。
重新分发后芹菜任务 ID 是否有可能更改。还是有其他原因。
建议之一是不要将数据库/ORM 对象传递到 Celery 任务中,因为 这些对象可能包含过时的数据。尝试将任务重写为:
@app.task
def test_celery(model_obj_id):
model_obj = YourModel.objects.get(id=model_obj_id)
if model_obj:
if model_obj.celery_task_id == test_celery.request.id:
## Do something