使用sender筛选器的celenite task_成功



我正在尝试让发送者过滤器工作,例如

@celery.task
def run_timer(crawl_start_time):
    return crawl_start_time
@task_success.connect
def run_timer_success_handler(sender, result, **kwargs):
    print '##################################'
    print 'in run_timer_success_handler'

上面的工作原理很好,但如果我尝试按发件人过滤,它永远不会起作用:

@task_success.connect(sender='tasks.run_timer')
def run_timer_success_handler(sender, result, **kwargs):
    print '##################################'
    print 'in run_timer_success_handler'

我还尝试过:@task_success.connect(sender='run_timer')@task_success.connect(sender=run_timer)@task_success.connect(sender=globals()['run_timer'])

它们都不起作用。

如何有效地使用sender过滤器来确保对run_timer任务而不是其他任务调用by回调。

在这种情况下,现在最好在函数内部过滤sender。类似:

@task_success.connect
def ...
    if sender == '...':
        ...

因为当前的芹菜信号实现在任务发送方和工作进程是不同的python进程时存在问题。因为它将您的发送者转换为标识符并使用它进行过滤,但celeron按字符串名称发送任务。这是问题代码(celener.utils.dispatch.signals):

def _make_id(target):  # pragma: no cover
    if hasattr(target, 'im_func'):
        return (id(target.im_self), id(target.im_func))
    return id(target)

id('tasks.run_timer')与工作进程的id('asks.run_ttimer')不同。如果你愿意,你可以破解它,并通过hash函数

重新设置id

http://docs.celeryproject.org/en/latest/userguide/signals.html#task-成功。。。Sender是执行的任务对象。(与after_task_publish.sesender不同)。。。所以,你应该

@task_success.connect(sender=run_timer)
def ...

它对我有用。祝你好运。

相关内容

  • 没有找到相关文章

最新更新