I want to use Celery/RabbitMQ to run tasks at regular intervals in a Flask application on Windows 7 with Python 3.3. Installed with pip:
billiard==3.3.0.10
celery==3.1.6
Code:
### celeryapp.py ###
from celery import Celery
from twend import config, app
def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery
celery_pipe = make_celery(app)
celery_pipe.config_from_object(config)
### tasks.py ###
from celery import task
from twend.celeryapp import celery_pipe as celery
@celery.task()
def add_together(a, b):
    return a + b
### config.py ###
CELERY_BROKER_URL = 'amqp://'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_ACCEPT_CONTENT = ['json']
When I try to start the worker, I get:
ERROR/MainProcess] Unrecoverable error: PicklingError("Can't pickle <class 'module'>: attribute lookup builtins.module failed",)
Traceback (most recent call last):
  File "c:\Python33\twend\lib\site-packages\celery\worker\__init__.py", line 212, in start
    self.blueprint.start(self)
  File "c:\Python33\twend\lib\site-packages\celery\bootsteps.py", line 123, in start
    step.start(parent)
  File "c:\Python33\twend\lib\site-packages\celery\bootsteps.py", line 373, in start
    return self.obj.start()
  File "c:\Python33\twend\lib\site-packages\celery\concurrency\base.py", line 127, in start
    self.on_start()
  File "c:\Python33\twend\lib\site-packages\celery\concurrency\prefork.py", line 112, in on_start
    **self.options)
  File "c:\Python33\twend\lib\site-packages\billiard\pool.py", line 966, in __init__
    self._create_worker_process(i)
  File "c:\Python33\twend\lib\site-packages\billiard\pool.py", line 1059, in _create_worker_process
    w.start()
  File "c:\Python33\twend\lib\site-packages\billiard\process.py", line 139, in start
    self._popen = Popen(self)
  File "c:\Python33\twend\lib\site-packages\billiard\forking.py", line 263, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "c:\Python33\twend\lib\site-packages\billiard\_reduction3.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class 'module'>: attribute lookup builtins.module failed

(twend) c:\Python33\twend>Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "c:\Python33\twend\lib\site-packages\billiard\forking.py", line 457, in main
    self = load(from_parent)
EOFError
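This issue, https://github.com/celery/django-celery/issues/228, looks similar, but it was already fixed in the version of Celery I'm using.
I have also read that pickling has limitations on Windows, but I'm not sure what is actually being pickled here.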
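Celery uses pickle to serialize tasks and results. I see you have already configured it to accept only json content, so you may be able to solve the problem by adding a couple more configuration lines: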
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
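That way pickle is not used at all, and all of Celery's serialization is done with json. In theory this could cost some performance, but I solved a similar problem this way in a Flask application and did not notice any performance issues. YMMV.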
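As for what the error message itself means, here is a minimal sketch using only the standard library (no Celery involved). The traceback shows billiard calling `dump(process_obj, ...)` when it starts a worker process, and the message says a module object was reached while pickling. The helper `can_pickle` and the use of `sys` as a stand-in module are my own illustration, not anything taken from your app:

```python
import json
import pickle
import sys


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        # The exact exception type differs between Python versions.
        return False


# Plain task arguments, like those for add_together(a, b),
# serialize fine with either pickle or json.
task_args = {"args": [2, 3], "kwargs": {}}
print(can_pickle(task_args))                            # True
print(json.loads(json.dumps(task_args)) == task_args)   # True

# A module object cannot be pickled at all. `sys` is only a stand-in
# for whatever module ends up reachable from the object being dumped.
print(can_pickle(sys))                                  # False
```

So it may be worth checking whether anything you hand to Celery holds a reference to a module object, but I have not verified that against your setup.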