从 Java 调度 Celery 任务:"Received and deleted unknown message. Wrong destination"



我正在尝试从Java调度Celery任务。

我像这样向 RabbitMQ 发送任务:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
boolean durable = true;
channel.queueDeclarePassive("celery");
channel.exchangeDeclarePassive("celery");
String MESSAGE_FORMAT = "{"id": "%s", " + ""task": "%s", " + ""args": ["%s"]}";
message = String.format(MESSAGE_FORMAT, UUID.randomUUID().toString(), "celery.tasks.add", "2501");
channel.basicPublish("celery", "celery", null, message.getBytes("UTF-8"));

消息发送:

| routing_key | exchange | message_count |                                                  payload                                                  | payload_bytes | payload_encoding | properties | redelivered |
+-------------+----------+---------------+-----------------------------------------------------------------------------------------------------------+---------------+------------------+------------+-------------+
| celery      |          | 0             | {"id": "7421864e-aff3-4f2f-b274-9d5eacfc8941", "task": "celery.tasks.add", "args": ["2501"]} | 105           | string           |            | False 

但是当我开始工作时:

celery -A tasks worker --loglevel=info

我收到一条消息:

[2016-03-02 16:23:08,457: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: '{"id": "76e0fc3b-3ae0-4603-9099-3cd6bb15ffa9", "task": "celery.tasks.add", "args": ["2501"]}' (105b)
{content_type:None content_encoding:None
  delivery_info:{'redelivered': False, 'delivery_tag': 2, 'routing_key': 'celery', 'exchange': '', 'consumer_tag': 'None4'} headers={}}

所以任务没有运行,因为格式有些错误。我跟着:http://docs.celeryproject.org/en/latest/internals/protocol.html

该任务以如下tasks.py声明:

from celery import Celery
app = Celery('tasks')
app.config_from_object('celeryconfig')

@app.task
def add(materialId):
    f = open('logg','w')
    f.write('processing task: ' + materialId)
    f.close()

所以我不确定应该纠正什么。你能帮我这个吗?

如果不可能,我将回退到调用python来自Java的过程,但这会使部署过程复杂化所以我想避免它。

编辑:

为了测试我的方法,我也尝试从 python 以这种方式安排任务,但错误是相同的:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='celery', durable=True)
channel.basic_publish(exchange='celery', routing_key='celery', body='{"id": "7421864e-aff3-4f2f-b274-9d5eacfc8941", "task": "celery.tasks.add", "args": ["2501"]}')

编辑2

这不是与 Java 中的 Django/Celery 互操作的副本由于答案不能解决我的问题:调整路由密钥和交换并没有解决问题。

您需要使用基本属性生成器将消息的内容类型指定为 JSON。

channel.basicPublish("celery", "celery", null, 
    new AMQP.BasicProperties().builder().contentType("application/json").build(),
    message.getBytes("UTF-8"));

有关如何使用它以及Rabbit文档中的更多示例:https://www.rabbitmq.com/api-guide.html#publishing

对于内容类型标头的芹菜支持的格式:http://docs.celeryproject.org/en/latest/internals/protocol.html#serialization

您需要获取从python客户端代码发送的原始任务请求,并将其与您在Java中生成的内容进行比较。

或者,

您可以尝试使用芹菜源中examples目录中的 HTTP 网关提交任务 - 我不确定代码的这一特定部分是如何维护的。

相关内容

最新更新