celery-worker refuses to deserialize untrusted content of type application/json



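I have a distributed setup in which I use RabbitMQ as the message broker and Celery as the distributed task queue. A RabbitMQ Java client pushes JSON data to the message broker, for example: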

{"id":"95a67132-a47-4d44-80bd-7a8725528254","args":[],"task":"app.task.add","kwargs":[]}

Content type: application/json, content encoding: utf-8 (set using a custom MessagePostProcessor implementation).

On the Celery side, I have a few tasks. For example:

from celery import Celery
app = Celery('tasks', broker='amqp://<broker credentials>@<ip>//')
app.config_from_object('celeryconfig')
@app.task
def add():
    print "done"
@app.task
def addX(x):
    print "done addx"

And a celeryconfig.py like this:

from kombu import Exchange, Queue
from kombu.serialization import registry
CELERY_IGNORE_RESULT = False
BROKER_HOST = <ip address of broker> #IP address of the server running RabbitMQ and Celery
BROKER_URL='amqp://<broker credentials>@<ip>'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_IMPORTS=("tasks",)
CELERY_DEFAULT_QUEUE = <queue name>
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_QUEUES = (
   Queue('default', Exchange('exchange'), routing_key='default'),
)
registry.enable('application/json')

However, when the Celery worker tries to decode the JSON object pushed to the message broker, it gives me this error:

[2016-01-12 17:47:56,602: CRITICAL/MainProcess] Can't decode message body: ContentDisallowed('Refusing to deserialize untrusted content of type application/json; charset=utf-8 (application/json; charset=utf-8)',) [type:u'application/json; charset=utf-8' encoding:u'utf-8' headers:{}]
body: u'{"id":"78094130-9e45-4b0e-9418-e249219e8e65","args":[],"task":"app.task.add","kwargs":[]}' (89b)
Traceback (most recent call last):
 File "/Users/tech/messaging/venv/lib/python2.7/site-packages/kombu/messaging.py", line 592, in _receive_callback
   decoded = None if on_m else message.decode()
 File "/Users/tech/messaging/venv/lib/python2.7/site-packages/kombu/message.py", line 142, in decode
   self.content_encoding, accept=self.accept)
 File "/Users/tech/messaging/venv/lib/python2.7/site-packages/kombu/serialization.py", line 174, in loads
   raise self._for_untrusted_content(content_type, 'untrusted')
ContentDisallowed: Refusing to deserialize untrusted content of type application/json; charset=utf-8 (application/json; charset=utf-8)

Can someone help me find the mistake I am making?

Edit: regarding scytale's suggestion

I tried setting the content type to "application/data" or "application/text", and the error changed from "Refusing to deserialize" to:

[2016-01-12 20:59:34,903: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: u'{"id":"03339160-3c43-4327-99ce-a733e58b16b2","args":[],"task":"app.task.add","kwargs":[]}' (89b)
{content_type:u'application/data' content_encoding:u'utf-8'
 delivery_info:{'consumer_tag': u'None5', 'redelivered': False, 'routing_key': u'exportToExcelQueue', 'delivery_tag': 1, 'exchange': u'exchange'} headers={}}

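I looked at the code in celery/worker/consumer.py and found that it looks for "task" in the message headers and raises the "UNKNOWN_FORMAT" error when it is not found. So I set the message header in the Java client like this: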

message.getMessageProperties().setHeader("task","app.task.add");

Still the same error.
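
One plausible explanation, offered as a sketch rather than a confirmed diagnosis: with a content type such as application/data the payload is probably handed over as raw data and never parsed as JSON, so the worker sees a string instead of a dictionary and cannot read a "task" field from it. The log above does print the body as a plain string. The helper find_task_name below is hypothetical and only imitates that kind of lookup; it is not Celery's actual code.

```python
import json

# Body copied from the warning in the log above.
RAW_BODY = ('{"id":"03339160-3c43-4327-99ce-a733e58b16b2",'
            '"args":[],"task":"app.task.add","kwargs":[]}')


def find_task_name(body):
    """Hypothetical stand-in for the worker's lookup of the task name.

    Returns the task name, or None when the body is not a mapping
    containing a "task" key (the "unknown message" case).
    """
    try:
        return body['task']
    except (KeyError, TypeError):
        return None


# Body left as a raw string, as an undecoded payload would be.
print(find_task_name(RAW_BODY))              # None
# Body parsed as JSON first, as a JSON content type would allow.
print(find_task_name(json.loads(RAW_BODY)))  # app.task.add
```

If that is what happens here, setting a "task" header would not help, because the body itself is never turned into a dictionary.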

I solved the problem by changing

CELERY_ACCEPT_CONTENT=['json','application/text']

and adding

registry.enable('json')

registry.enable('application/text')

to celeryconfig.py. The final version looks like this:

from kombu import Exchange, Queue
from kombu.serialization import registry
CELERY_IMPORTS = ('tasks')
CELERY_IGNORE_RESULT = False
BROKER_HOST = <IP address of the server running RabbitMQ and Celery>
BROKER_URL='amqp://<broker credentials>@<ip>'
CELERY_RESULT_BACKEND = 'amqp://'
CELERY_IMPORTS=("tasks",)
CELERY_DEFAULT_EXCHANGE_TYPE = 'direct'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json','application/text']
CELERY_QUEUES = (
 Queue('exportToExcelQueue', Exchange('exchange'), routing_key='default'),
)
registry.enable('json')
registry.enable('application/text')

I also changed the message header back to "application/json" and changed the format of the Celery message to:

{
    "id": "a72d85f4-499a-4428-a5ab-29290788d826",
    "args": [],
    "task": "tasks.add"
}
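
For illustration, a body of the same shape can be generated in Python. This is a minimal sketch: build_task_message is a hypothetical helper, not part of Celery, and it assumes the older (version 1) task message layout, in which kwargs, when present, is a JSON object rather than a list.

```python
import json
import uuid


def build_task_message(task_name, args=None, kwargs=None):
    """Hypothetical helper: serialize a minimal task message body as JSON."""
    body = {
        'id': str(uuid.uuid4()),   # unique task id
        'task': task_name,         # should match the name the worker registered
        'args': list(args or []),  # positional arguments
    }
    if kwargs:
        # Keyword arguments go in as an object (dict), not a list.
        body['kwargs'] = dict(kwargs)
    return json.dumps(body)


print(build_task_message('tasks.add'))
```

Note that the task name here is tasks.add rather than app.task.add; presumably it has to match the module in which the task is defined (tasks.py in the example above).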

Check the code in kombu/serialization.py:

TRUSTED_CONTENT = frozenset(['application/data', 'application/text'])
...
class SerializerRegistry(object):
    """The registry keeps track of serialization methods."""
    def loads(self, data, content_type, content_encoding,
              accept=None, force=False, _trusted_content=TRUSTED_CONTENT):
        content_type = content_type or 'application/data'
        if accept is not None:
            if content_type not in _trusted_content \
                    and content_type not in accept:
                raise self._for_untrusted_content(content_type, 'untrusted')

That is, the Java client needs to use content-type: application/data or content-type: application/text.
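
The check above compares the content type as an exact string. The sketch below imitates it with a hypothetical function is_allowed (not kombu's API) to show why the value from the error log, application/json; charset=utf-8, would be rejected even when application/json is accepted. It assumes the accept set contains exactly the strings shown.

```python
TRUSTED_CONTENT = frozenset(['application/data', 'application/text'])


def is_allowed(content_type, accept):
    """Hypothetical restatement of the membership test shown above."""
    content_type = content_type or 'application/data'
    if accept is None:
        # No accept list given: nothing is filtered.
        return True
    return content_type in TRUSTED_CONTENT or content_type in accept


accept = set(['application/json'])
print(is_allowed('application/json', accept))                 # True
print(is_allowed('application/json; charset=utf-8', accept))  # False
print(is_allowed('application/text', accept))                 # True
```

If this matches what the worker does, another thing to try would be sending the plain content type application/json from the Java client, without the charset suffix.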
