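I have a distributed setup in which I use RabbitMQ as the message broker and Celery as the distributed task queue. A RabbitMQ Java client pushes JSON data into the message broker, for example:
{"id":"95a67132-a47-4d44-80bd-7a8725528254","args":[],"task":"app.task.add","kwargs":[]}
Content type: application/json, content encoding: utf-8 (set using a custom MessagePostProcessor implementation).
On the Celery side, I have a few tasks. For example: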
from celery import Celery

app = Celery('tasks', broker='amqp://<broker credentials>@<ip>//')
app.config_from_object('celeryconfig')


@app.task
def add():
    print("done")


@app.task
def addX(x):
    print("done addx")
and a celeryconfig.py like this:
from kombu import Exchange, Queue
from kombu.serialization import registry
CELERY_IGNORE_RESULT = False
BROKER_HOST = <ip address of broker> #IP address of the server running RabbitMQ and Celery
BROKER_URL='amqp://<broker credentials>@<ip>'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_IMPORTS=("tasks",)
CELERY_DEFAULT_QUEUE = <queue name>
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_QUEUES = (
    Queue('default', Exchange('exchange'), routing_key='default'),
)
registry.enable('application/json')
However, when the Celery worker tries to decode the JSON object pushed into the message broker, it gives me this error:
[2016-01-12 17:47:56,602: CRITICAL/MainProcess] Can't decode message body: ContentDisallowed('Refusing to deserialize untrusted content of type application/json; charset=utf-8 (application/json; charset=utf-8)',) [type:u'application/json; charset=utf-8' encoding:u'utf-8' headers:{}]
body: u'{"id":"78094130-9e45-4b0e-9418-e249219e8e65","args":[],"task":"app.task.add","kwargs":[]}' (89b)
Traceback (most recent call last):
  File "/Users/tech/messaging/venv/lib/python2.7/site-packages/kombu/messaging.py", line 592, in _receive_callback
    decoded = None if on_m else message.decode()
  File "/Users/tech/messaging/venv/lib/python2.7/site-packages/kombu/message.py", line 142, in decode
    self.content_encoding, accept=self.accept)
  File "/Users/tech/messaging/venv/lib/python2.7/site-packages/kombu/serialization.py", line 174, in loads
    raise self._for_untrusted_content(content_type, 'untrusted')
ContentDisallowed: Refusing to deserialize untrusted content of type application/json; charset=utf-8 (application/json; charset=utf-8)
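Can someone help me figure out what I am doing wrong?
Edit: regarding scytale's suggestion
I tried setting the content type to "application/data" and "application/text", and the error changed from "Refusing to deserialize" to: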
[2016-01-12 20:59:34,903: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: u'{"id":"03339160-3c43-4327-99ce-a733e58b16b2","args":[],"task":"app.task.add","kwargs":[]}' (89b)
{content_type:u'application/data' content_encoding:u'utf-8'
delivery_info:{'consumer_tag': u'None5', 'redelivered': False, 'routing_key': u'exportToExcelQueue', 'delivery_tag': 1, 'exchange': u'exchange'} headers={}}
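I looked at the code in celery/worker/consumer.py and found that it looks for "task" in the message headers and raises the "UNKNOWN_FORMAT" error when it is not found. So I set the message header in the Java client like this: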
message.getMessageProperties().setHeader("task","app.task.add");
Still the same error.
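One possible explanation for the "unknown message" warning, sketched under the assumption that the worker first decodes the body according to its content type and then reads `body['task']` (this is a simplified model, not code taken from Celery): with "application/data" the body stays a raw string, so the lookup fails even though the JSON text contains a "task" key. The function name `extract_task_name` is hypothetical.

```python
import json

RAW_BODY = ('{"id":"03339160-3c43-4327-99ce-a733e58b16b2","args":[],'
            '"task":"app.task.add","kwargs":[]}')


def extract_task_name(body):
    """Return the task name, or None if the body is not a decoded mapping.

    Simplified model of the worker's lookup (an assumption, not Celery code).
    """
    try:
        return body['task']
    except (KeyError, TypeError):
        return None


# Treated as raw data: the body is still a string, so indexing by 'task' fails.
as_raw = extract_task_name(RAW_BODY)

# Treated as JSON: the body is parsed into a dict before the lookup.
as_json = extract_task_name(json.loads(RAW_BODY))
```

If this model is right, setting a "task" header on the Java side would not help, because the name is read from the decoded body rather than from the headers.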
I solved the problem by changing
CELERY_ACCEPT_CONTENT=['json','application/text']
and adding
registry.enable('json')
registry.enable('application/text')
to celeryconfig.py. The final file looks like this:
from kombu import Exchange, Queue
from kombu.serialization import registry
CELERY_IMPORTS = ('tasks')
CELERY_IGNORE_RESULT = False
BROKER_HOST = <IP address of the server running RabbitMQ and Celery>
BROKER_URL='amqp://<broker credentials>@<ip>'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_IMPORTS=("tasks",)
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json','application/text']
CELERY_QUEUES = (
    Queue('exportToExcelQueue', Exchange('exchange'), routing_key='default'),
)
registry.enable('json')
registry.enable('application/text')
I also changed the message content type back to "application/json" and changed the format of the Celery message to:
{
"id": "a72d85f4-499a-4428-a5ab-29290788d826",
"args": [],
"task": "tasks.add"
}
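For reference, here is a minimal sketch of building a body in that shape, written in Python only to illustrate the structure the Java client has to produce. The helper name `build_task_message` is hypothetical. The task name "tasks.add" follows Celery's default naming of module name plus function name, assuming the tasks live in tasks.py. Sending "kwargs" as a JSON object rather than a list is my assumption about what the worker expects for keyword arguments.

```python
import json
import uuid


def build_task_message(task_name, args=None, kwargs=None):
    """Serialize a minimal task message (id, task, args, kwargs) to JSON text."""
    message = {
        "id": str(uuid.uuid4()),       # unique task id
        "task": task_name,             # registered task name, e.g. "tasks.add"
        "args": list(args or []),      # positional arguments
        "kwargs": dict(kwargs or {}),  # keyword arguments as a mapping
    }
    return json.dumps(message)


# Body for the no-argument task and for the one-argument task.
body = build_task_message("tasks.add")
addx_body = build_task_message("tasks.addX", args=[1])
```

The resulting string is what goes into the AMQP message body, with the content type and content encoding set as message properties.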
Check the code in kombu/serialization.py:
TRUSTED_CONTENT = frozenset(['application/data', 'application/text'])
...
class SerializerRegistry(object):
    """The registry keeps track of serialization methods."""

    def loads(self, data, content_type, content_encoding,
              accept=None, force=False, _trusted_content=TRUSTED_CONTENT):
        content_type = content_type or 'application/data'
        if accept is not None:
            # Reject anything that is neither trusted nor explicitly accepted.
            if content_type not in _trusted_content \
                    and content_type not in accept:
                raise self._for_untrusted_content(content_type, 'untrusted')
That is, the Java client needs to use content-type: application/data
or content-type: application/text
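The check quoted above can be modeled in a few lines to show why the original message was rejected. This is a simplified sketch, not kombu's implementation. It assumes the accept list ends up as the set {'application/json'}, and it uses the content type string from the error log. `is_allowed` and `strip_parameters` are hypothetical names. Because the membership test compares exact strings, a type carrying a "; charset=utf-8" suffix does not match plain "application/json".

```python
TRUSTED_CONTENT = frozenset(['application/data', 'application/text'])


def is_allowed(content_type, accept, trusted=TRUSTED_CONTENT):
    """Model of the check above: exact string membership, no parameter parsing."""
    content_type = content_type or 'application/data'
    if accept is None:
        return True
    return content_type in trusted or content_type in accept


def strip_parameters(content_type):
    """Hypothetical helper: drop '; charset=...' style parameters."""
    return content_type.split(';', 1)[0].strip()


accept = {'application/json'}
sent = 'application/json; charset=utf-8'  # content type shown in the error log

# The type as sent is neither trusted nor in the accept set.
rejected = not is_allowed(sent, accept)

# Without the charset parameter, the same type passes the check.
allowed_after_strip = is_allowed(strip_parameters(sent), accept)

# Trusted types pass regardless of the accept set.
allowed_as_text = is_allowed('application/text', accept)
```

Under this reading there are two ways out: have the Java client send a bare "application/json" content type with the charset carried only in the content encoding property, or send one of the trusted types. Note that a trusted type passes this check but may not be parsed as JSON afterwards.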