Django 芹菜 - 完成后重新启动任务



@app.task
def Task1():
print("this is task 1")
return  "Task-1 Done"

举个例子我想在任务完成后重新启动

手动连续呼叫

如果你想多次调用任务,并且每次都使用相同的任务id,你可以使用apply_async的task_id参数。

请注意,这不适用于文档中的延迟:

延迟(* args, * * kwargs)

  • apply_async()的星形参数版本。

  • 不支持apply_async()启用的额外选项。

@app.task(bind=True)
def Task1(self):
print(f"this is task 1 {self.request.id}")
>>> from tasks import Task1
>>> result = Task1.apply_async()
>>> result
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> result.id
'ba488582-9d7d-4bda-a19d-a2b0bf9b503f'
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> Task1.apply_async(task_id=result.id)
<AsyncResult: ba488582-9d7d-4bda-a19d-a2b0bf9b503f>
>>> 
[2021-08-12 08:24:31,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:24:31,538: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:24:31,539: WARNING/ForkPoolWorker-4] 
[2021-08-12 08:24:31,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.00041928999962692615s: None
[2021-08-12 08:25:00,608: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:00,609: WARNING/ForkPoolWorker-4] 
[2021-08-12 08:25:00,609: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0002528750001147273s: None
[2021-08-12 08:25:06,137: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:06,139: WARNING/ForkPoolWorker-4] 
[2021-08-12 08:25:06,139: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0003467680007815943s: None
[2021-08-12 08:25:10,537: INFO/MainProcess] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] received
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4] this is task 1 ba488582-9d7d-4bda-a19d-a2b0bf9b503f
[2021-08-12 08:25:10,539: WARNING/ForkPoolWorker-4] 
[2021-08-12 08:25:10,539: INFO/ForkPoolWorker-4] Task tasks.Task1[ba488582-9d7d-4bda-a19d-a2b0bf9b503f] succeeded in 0.0006299719998423825s: None
  • 所有执行的任务id是相同的,这里是ba488582-9d7d-4bda-a19d-a2b0bf9b503f(在AsyncResult中也可见)

自动连续呼叫

如果您想继续重新启动任务,这里有一些选项。下面的所有选项都是无限递归的。您可能希望在任务中添加一些条件,说明何时终止无限循环,例如向任务添加输入,并在执行必须已经停止时将其用作基础。

选项1:在同一任务中异步调用任务本身。这有点像递归。这将使用与手动连续调用中相同的任务id。(见上图)。

@app.task(bind=True)
def Task1(self):
print(f"this is task 1 {self.request.id}")
time.sleep(2)
print("re-trigger task 1")
Task1.apply_async(task_id=self.request.id)

选项2:触发芹菜提供的重试机制。这将在该链接中记录的相同队列上使用相同的任务id:

当你调用retry时,它会发送一个新的消息,使用相同的task-id,它会确保信息传递给相同的人队列作为起始任务。

我们可以通过self.request.id显示任务id来验证。

@app.task(
bind=True,
default_retry_delay=0.1,
retry_backoff=False,
max_retries=None,
)
def Task1(self):
print(f"this is task 1 {self.request.id}")
time.sleep(2)
print("re-trigger task 1")
raise self.retry()

选项3:只对特定场景重试(这里是RestartTaskNeeded)。同选项2,这也将在同一队列上使用相同的任务id。

class RestartTaskNeeded(Exception):
pass

@app.task(
bind=True,
autoretry_for=(RestartTaskNeeded,),
default_retry_delay=0.1,
retry_backoff=False,
max_retries=None,
)
def Task1(self):
print(f"this is task 1 {self.request.id}")
time.sleep(2)
print("re-trigger task 1")
raise RestartTaskNeeded

输出:

>>> from tasks import Task1
>>> Task1.apply_async()
<AsyncResult: 999e9de0-292f-412d-a9a8-b5c0013bdab3>
[2021-08-12 07:51:29,783: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:29,785: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:29,785: WARNING/ForkPoolWorker-1] 
[2021-08-12 07:51:31,796: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:31,797: WARNING/ForkPoolWorker-1] 
[2021-08-12 07:51:31,820: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:31,820: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:32,020: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:32,020: WARNING/ForkPoolWorker-1] 
[2021-08-12 07:51:34,023: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:34,023: WARNING/ForkPoolWorker-1] 
[2021-08-12 07:51:34,028: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:34,028: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:36,031: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:36,031: WARNING/ForkPoolWorker-1] 
[2021-08-12 07:51:38,034: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:38,034: WARNING/ForkPoolWorker-1] 
[2021-08-12 07:51:38,038: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:38,039: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:40,041: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:40,042: WARNING/ForkPoolWorker-1] 
[2021-08-12 07:51:42,044: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:42,045: WARNING/ForkPoolWorker-1] 
[2021-08-12 07:51:42,049: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:42,051: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:44,050: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:44,051: WARNING/ForkPoolWorker-1] 
[2021-08-12 07:51:46,052: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:46,052: WARNING/ForkPoolWorker-1] 
[2021-08-12 07:51:46,057: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:46,058: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
[2021-08-12 07:51:46,681: WARNING/ForkPoolWorker-1] this is task 1 999e9de0-292f-412d-a9a8-b5c0013bdab3
[2021-08-12 07:51:46,681: WARNING/ForkPoolWorker-1] 
[2021-08-12 07:51:48,682: WARNING/ForkPoolWorker-1] re-trigger task 1
[2021-08-12 07:51:48,683: WARNING/ForkPoolWorker-1] 
[2021-08-12 07:51:48,687: INFO/ForkPoolWorker-1] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] retry: Retry in 0.1s: RestartTaskNeeded()
[2021-08-12 07:51:48,688: INFO/MainProcess] Task tasks.Task1[999e9de0-292f-412d-a9a8-b5c0013bdab3] received
... and so on ...
  • 任务总是(自动)"重新启动";完成后
  • 所有执行的任务id是相同的,这里是999e9de0-292f-412d-a9a8-b5c0013bdab3(在AsyncResult中也可见)

进一步阅读

根据你对这个问题的确切目的,你可能也对芹菜画布感兴趣,例如任务链(在完成另一个任务后调用一个任务,任务可能不同或相同)。

<<ul>
  • https://docs.celeryproject.org/en/latest/getting-started/next-steps.html链/gh>
  • https://docs.celeryproject.org/en/latest/userguide/canvas.html canvas-chain
  • 相关内容

    • 没有找到相关文章

    最新更新