Python 和 Celery:覆盖硬超时以用于 Gevent 池



有没有办法覆盖芹菜中的硬超时?我知道我可以通过失败作业的任务继承来做到这一点。

class MyTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
@app.task(base=MyTask, soft_time_limit=5, time_limit=10)
def add(x, y):
raise KeyError()

但硬超时并不是失败的工作。我想这样做的原因是因为软超时不适用于 gevent 池,只有硬超时。

我花了一点时间才弄清楚,但这就是你的做法。 从请求继承,然后从任务继承。从 MyTask 调用请求(on_failure方法所在的位置(。

class MyRequest(Request):
def on_timeout(self, soft, timeout):
super(MyRequest, self).on_timeout(soft, timeout)
if not soft:
logger.warning(
'A hard timeout was enforced for task %s',
self.task.name
)

class MyTask(Task):
Request = MyRequest  # you can use a FQN 'my.package:MyRequest'
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))

def run_time_job():
a = random.randrange(0, 20)
print('sleeping for', a)
time.sleep(a)

@app.task(base=MyTask, soft_time_limit=5, time_limit=10)
def add(x, y):
results = None
try:
run_time_job()
results = x + y
except SoftTimeLimitExceeded:
print('time limit exceeded')
redis_db.sadd('failed_jobs', 'failed at {} + {}'.format(x, y))
except TimeLimitExceeded:
raise KeyError()
return results

最新更新