Following an answer on Celery chords: a group of chains with an exception in one chain
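I am getting an error: Celery seems to convert the chain of task signatures into a dict, which causes the error shown below.
Celery 4.4, Redis 3.3.11
Code (from @bigzbig):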
from celery import chain, chord

# celery_app is the project's Celery application instance, defined elsewhere.

@celery_app.task
def task_one():
    return 'OKIDOKI'

@celery_app.task
def task_two(str):
    return f'{str} YOUPI'

@celery_app.task
def task_three(str):
    return f'{str} MAKAPAKA'

@celery_app.task
def task_exception(str):
    raise KeyError(f'{str} Ups')

@celery_app.task(ignore_result=True)
def task_wrapper(*args, **kwargs):
    # 'job' arrives here as a plain dict, not as a chain signature
    if 'job' in kwargs:
        kwargs['job'].apply()

@celery_app.task(ignore_result=True)
def callback_task(*args, **kwargs):
    return (args, kwargs, 'Yeah')

def test():
    chains = []
    # First chain: contains the task that raises
    tasks = [
        task_one.s(),
        task_two.s(),
        task_exception.s(),
        task_three.s(),
    ]
    chains.append(task_wrapper.s(job=chain(*tasks)))
    # Second chain: all tasks succeed
    tasks = [
        task_one.s(),
        task_two.s(),
        task_three.s(),
    ]
    chains.append(task_wrapper.s(job=chain(*tasks)))
    chord(chains, callback_task.s()).apply_async()
Printing kwargs['job']:
celeryworker2_1 | [2020-03-23 22:31:01,646: WARNING/ForkPoolWorker-1] {'task': 'celery.chain', 'args': [], 'kwargs': {'tasks': [{'task': 'portfolio.tasks.task_one', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_two', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_exception', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_three', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': False, 'chord_size': None}]}, 'options': {}, 'subtask_type': 'chain', 'immutable': False, 'chord_size': None}
celeryworker_1 | [2020-03-23 22:31:01,650: WARNING/ForkPoolWorker-1] {'task': 'celery.chain', 'args': [], 'kwargs': {'tasks': [{'task': 'portfolio.tasks.task_one', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_two', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'chord_size': None, 'immutable': False}, {'task': 'portfolio.tasks.task_three', 'args': [], 'kwargs': {}, 'options': {}, 'subtask_type': None, 'immutable': False, 'chord_size': None}]}, 'options': {}, 'subtask_type': 'chain', 'immutable': False, 'chord_size': None}
Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/app/trace.py", line 650, in __protected_call__
    return self.run(*args, **kwargs)
  File "/app/portfolio/tasks.py", line 241, in task_wrapper
    kwargs['job'].apply()
AttributeError: 'dict' object has no attribute 'apply'
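You are trying to pass a signature to another task, so Celery converts it to a dict when the arguments are serialized. You can build a Signature back from that dict.
task_always_eager=True is set so that the job runs in the same process instead of as a separate Celery task, since executing the chain is itself a separate task. This way you retain any link or link_error that was given.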
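from celery.canvas import Signature

# Rebuild a Signature from the dict that the wrapper task received
callback = Signature(kwargs['job'])
# Note: delay() forwards keyword arguments to the task itself;
# task_always_eager is also a Celery configuration setting.
callback.delay(task_always_eager=True)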
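To see why the wrapper task receives a dict, here is a minimal standard-library sketch of the same effect. It assumes the arguments travel as JSON (the default serializer in Celery 4). `MiniSignature` and the task name `demo.task_one` are hypothetical stand-ins made up for illustration, not part of Celery; the only thing borrowed from Celery is the idea that a signature is a dict subclass.

```python
import json


class MiniSignature(dict):
    """Hypothetical stand-in for a dict-based task signature."""

    def apply(self):
        # Pretend to run the task and report which one ran.
        return f"ran {self['task']}"


# On the sending side the object has an .apply() method.
sig = MiniSignature(task="demo.task_one", args=[], kwargs={})

# Passing it as a task argument means serializing it (JSON here).
received = json.loads(json.dumps({"job": sig}))["job"]

# The receiving side gets a plain dict, so .apply() is gone.
has_apply_before = hasattr(received, "apply")

# Wrapping the dict in the class again restores the behaviour.
rebuilt = MiniSignature(received)
result = rebuilt.apply()
```

The same reasoning applies inside task_wrapper: the value under 'job' has to be turned back into a signature object before any of its methods can be called.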