在Django中使用Celery收到未注册的任务



我在Django项目中设置了芹菜,似乎无法正常运行我的任务。我在Ubuntu 13.04上使用Django 1.4.3、cerele 3.0.1、Django-celele 3.0.17和Python 2.7。

我已经验证了我的rabbitmq服务器正在运行:

sudo service rabbitmq-server status
Status of node rabbit@fenster ...
[{pid,1667},
 {running_applications,[{rabbit,"RabbitMQ","3.0.2"},
                        {os_mon,"CPO  CXC 138 46","2.2.9"},
                        {mnesia,"MNESIA  CXC 138 12","4.7"},
                        {sasl,"SASL  CXC 138 11","2.2.1"},
                        {stdlib,"ERTS  CXC 138 10","1.18.1"},
                        {kernel,"ERTS  CXC 138 10","2.15.1"}]},
 {os,{unix,linux}},
 {erlang_version,"Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:8:8] [async-threads:30] [kernel-poll:true]n"},
 {memory,[{total,28112760},
          {connection_procs,93432},
          {queue_procs,42680},
          {plugins,0},
          {other_proc,9331824},
          {mnesia,65288},
          {mgmt_db,0},
          {msg_index,86656},
          {other_ets,764048},
          {binary,1343288},
          {code,12372068},
          {atom,512601},
          {other_system,3500875}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,6719406080},
 {disk_free_limit,1000000000},
 {disk_free,122925531136},
 {file_descriptors,[{total_limit,924},
                    {total_used,7},
                    {sockets_limit,829},
                    {sockets_used,3}]},
 {processes,[{limit,1048576},{used,151}]},
 {run_queue,0},
 {uptime,275789}]
...done.

我在虚拟环境中设置了Django项目。我在settings.py文件中将芹菜配置为:

INSTALLED_APPS = [
    # django apps
    # our apps ...
    # third party modules ...
    'djcelery',
]
import djcelery
djcelery.setup_loader()
CELERY_IMPORTS = ('my.task')

RabbitMQ代理程序设置正确——我们有其他芹菜任务正常运行。

我的任务:

import celery
class MyClass(object):
    @celery.task()
    def my_task(self, new, old):
        # do stuff
        return
    def kick_off_tasks(self):
        # do stuff
        new = 'something'
        old = 'something else'
        self.my_task.apply_async(args(new, old), kwarg1='new', kwarg2='old)
        # do more stuff

当我运行调用kick_off_tasks的视图时,我在celeryd信息日志中收到以下消息:

./manage.py celeryd --loglevel=info
[Tasks]
  . my.task
  . other.tasks
  . more.tasks
  . etc.etc.etc
[2013-08-23 14:18:40,772: WARNING/MainProcess] celery@fenster has started.
[2013-08-23 14:20:40,757: ERROR/MainProcess] Received unregistered task of type 'my.task'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.
The full contents of the message body was:
{'retries': 0, 'task': 'my.task', 'eta': None, 'args': ('something, 'something else'), 'expires': None, 'callbacks': None, 'errbacks': None, 'kwargs': {}, 'id': 'bf4d5ee7-9701-42b6-a887-c6b6470d9810', 'utc': True} (2297b)
Traceback (most recent call last):
  File "/home/ricomoss/.virtualenvs/proton/local/lib/python2.7/site-packages/celery/worker/consumer.py", line 394, in on_task_received
    strategies[name](message, body, message.ack_log_error)
KeyError: 'my.task'

有什么建议吗?

我能够通过重构来实现这一点,从而使任务不是类方法。我把它们放在我们的tasks.py文件中,一切都很好。我还没有时间研究为什么会出现这种情况,我计划这样做,但在那之前,如果有人遇到同样的问题,你会有一个快速的解决方案。

相关内容

  • 没有找到相关文章