Celery 的 app.control.broadcast 期待什么"command"?



我以为app.control.broadcast会采用@task,但是在运行以下内容时:

app.send_task("workerTasks_supervisor.task_supervisor_test", args=[], queue='supervisor')
app.control.broadcast("workerTasks_supervisor.task_supervisor_test", args=[], queue="supervisor")

第一个成功,第二个失败了:

[2019-08-01 12:10:52,260: ERROR/MainProcess] pidbox command error: KeyError('task_supervisor_test',)
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/kombu/pidbox.py", line 104, in dispatch
    reply = handle(method, arguments)
  File "/usr/local/lib/python3.5/dist-packages/kombu/pidbox.py", line 126, in handle_cast
    return self.handle(method, arguments)
  File "/usr/local/lib/python3.5/dist-packages/kombu/pidbox.py", line 120, in handle
    return self.handlers[method](self.state, **arguments)
KeyError: 'task_supervisor_test'

工人是从

开始的
celery worker -A workerTasks_supervisor -n Supervisor --concurrency=1 --loglevel=info -Q supervisor -f /logs/celery_supervisor.log --pidfile=/logs/supervisor_pid.pid

任务本身很简单:

@app.task()
def task_supervisor_test():
    print("working")

我在做什么错?谢谢。

您的假设是错误的。

您的第二行试图广播您未实现的命令,并且自然会引发异常。

芹菜的美(其中包括(是,它允许您实现自己的命令。您可以像上面尝试过的那样,或通过诸如celery -A my.project.app <command> [params...]之类的命令行以编程执行。我建议每个芹菜动力用户都应该学习这是一个非常有力的概念。

好吧,这是一组不同的事物 - 您需要像这样注册它们:

from celery.worker.control import Panel
@Panel.register
def command_supervisor_test():
    print("working")

在控制模块中,您可以找到一些示例。

from celery.app import control

作为示例吊销函数在内部广播:

    def revoke(self, task_id, destination=None, terminate=False,
               signal=TERM_SIGNAME, **kwargs):
        return self.broadcast('revoke', destination=destination, arguments={
            'task_id': task_id,
            'terminate': terminate,
            'signal': signal,
        }, **kwargs)

相关内容

  • 没有找到相关文章

最新更新