我正在尝试让发送者过滤器工作,例如
@celery.task
def run_timer(crawl_start_time):
return crawl_start_time
@task_success.connect
def run_timer_success_handler(sender, result, **kwargs):
print '##################################'
print 'in run_timer_success_handler'
上面的工作原理很好,但如果我尝试按发件人过滤,它永远不会起作用:
@task_success.connect(sender='tasks.run_timer')
def run_timer_success_handler(sender, result, **kwargs):
print '##################################'
print 'in run_timer_success_handler'
我还尝试过:@task_success.connect(sender='run_timer')@task_success.connect(sender=run_timer)@task_success.connect(sender=globals()['run_timer'])
它们都不起作用。
如何有效地使用sender过滤器来确保对run_timer任务而不是其他任务调用by回调。
在这种情况下,现在最好在函数内部过滤sender。类似:
@task_success.connect
def ...
if sender == '...':
...
因为当前的芹菜信号实现在任务发送方和工作进程是不同的python进程时存在问题。因为它将您的发送者转换为标识符并使用它进行过滤,但celeron按字符串名称发送任务。这是问题代码(celener.utils.dispatch.signals):
def _make_id(target): # pragma: no cover
if hasattr(target, 'im_func'):
return (id(target.im_self), id(target.im_func))
return id(target)
id('tasks.run_timer')与工作进程的id('asks.run_ttimer')不同。如果你愿意,你可以破解它,并通过hash函数
http://docs.celeryproject.org/en/latest/userguide/signals.html#task-成功。。。Sender是执行的任务对象。(与after_task_publish.sesender不同)。。。所以,你应该
@task_success.connect(sender=run_timer)
def ...
它对我有用。祝你好运。