我正在尝试从Java调度Celery任务。
我像这样向 RabbitMQ 发送任务:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
boolean durable = true;
channel.queueDeclarePassive("celery");
channel.exchangeDeclarePassive("celery");
String MESSAGE_FORMAT = "{"id": "%s", " + ""task": "%s", " + ""args": ["%s"]}";
message = String.format(MESSAGE_FORMAT, UUID.randomUUID().toString(), "celery.tasks.add", "2501");
channel.basicPublish("celery", "celery", null, message.getBytes("UTF-8"));
消息发送:
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | properties | redelivered |
+-------------+----------+---------------+-----------------------------------------------------------------------------------------------------------+---------------+------------------+------------+-------------+
| celery | | 0 | {"id": "7421864e-aff3-4f2f-b274-9d5eacfc8941", "task": "celery.tasks.add", "args": ["2501"]} | 105 | string | | False
但是当我开始工作时:
celery -A tasks worker --loglevel=info
我收到一条消息:
[2016-03-02 16:23:08,457: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: '{"id": "76e0fc3b-3ae0-4603-9099-3cd6bb15ffa9", "task": "celery.tasks.add", "args": ["2501"]}' (105b)
{content_type:None content_encoding:None
delivery_info:{'redelivered': False, 'delivery_tag': 2, 'routing_key': 'celery', 'exchange': '', 'consumer_tag': 'None4'} headers={}}
所以任务没有运行,因为格式有些错误。我跟着:http://docs.celeryproject.org/en/latest/internals/protocol.html
该任务以如下tasks.py
声明:
from celery import Celery
app = Celery('tasks')
app.config_from_object('celeryconfig')
@app.task
def add(materialId):
f = open('logg','w')
f.write('processing task: ' + materialId)
f.close()
所以我不确定应该纠正什么。你能帮我这个吗?
如果不可能,我将回退到调用python来自Java的过程,但这会使部署过程复杂化所以我想避免它。
编辑:
为了测试我的方法,我也尝试从 python 以这种方式安排任务,但错误是相同的:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='celery', durable=True)
channel.basic_publish(exchange='celery', routing_key='celery', body='{"id": "7421864e-aff3-4f2f-b274-9d5eacfc8941", "task": "celery.tasks.add", "args": ["2501"]}')
编辑2:
这不是与 Java 中的 Django/Celery 互操作的副本由于答案不能解决我的问题:调整路由密钥和交换并没有解决问题。
您需要使用基本属性生成器将消息的内容类型指定为 JSON。
channel.basicPublish("celery", "celery", null,
new AMQP.BasicProperties().builder().contentType("application/json").build(),
message.getBytes("UTF-8"));
有关如何使用它以及Rabbit文档中的更多示例:https://www.rabbitmq.com/api-guide.html#publishing
对于内容类型标头的芹菜支持的格式:http://docs.celeryproject.org/en/latest/internals/protocol.html#serialization
您需要获取从python客户端代码发送的原始任务请求,并将其与您在Java中生成的内容进行比较。
您可以尝试使用芹菜源中examples
目录中的 HTTP 网关提交任务 - 我不确定代码的这一特定部分是如何维护的。