这就是我所拥有的:
import youtube_dl # in case this matters
class ErrorCatchingTask(Task):
# Request = CustomRequest
def on_failure(self, exc, task_id, args, kwargs, einfo):
# If I comment this out, all is well
r = requests.post(server + "/error_status/")
....
@app.task(base=ErrorCatchingTask, bind=True, ignore_result=True, max_retires=1)
def process(self, param_1, param_2, param_3):
...
raise IndexError
...
工人将抛出异常,然后似乎带有不同任务ID Received task: process[{task_id}
的新任务这是我尝试过的几件事:
- 导入
from celery.worker.request import Request
和覆盖on_failure
和on_success
在此功能。 -
app.conf.broker_transport_options = {'visibility_timeout': 99999999999}
-
@app.task(base=ErrorCatchingTask, bind=True, ignore_result=True, max_retires=1)
- 关闭
DEBUG
模式 - 设置登录到
info
- 将
CELERY_IGNORE_RESULT
设置为false(我可以用芹菜使用Python请求吗?) -
import requests as apicall
排除名称空间冲突 - 金钱补丁
requests
芹菜 Eventlet 非阻止请求 - 将
ErrorCatchingTask
移至单独的文件
如果我不使用任何挂钩功能,那么工人将只抛出异常并保持空闲状态,直到安排下一个任务为止,即使我使用钩子,我也希望这是我期望的。这是一个错误吗?我遍及GitHub问题,但找不到同样的问题。您如何调试这样的问题?
django 1.11.16芹菜4.2.1
我使用grequests
后解决了我的问题在我的情况下,芹菜工人在requests/adapters.py
中调用Conn.Urlopen()后立即重新安排。我观察到的另一种行为是,如果我在同一台机器中开设了另一个项目的另一个工人,有时无限的重新安排会停止。这可能是一些最初用于其他目的的锁定机制。
因此,这使我怀疑这确实是线程问题,并且在研究requests
库是否安全之后,我发现有些人建议不同的事情。从理论上讲,猴子补丁应该与使用grequests
相似,但是它是不一样的,所以只需使用grequests
或erequests
库即可。
芹菜调试指令在这里