我正在使用php将消息发送给rabbitmq和一个python消费者来处理它。以下是我所做的。
这部分向rabbitmq.发送一个json。
$data = array(
'id' => 123,
'url' => 'baidu.com',
);
$msg = new AMQPMessage(json_encode($data));
$channel->basic_publish($msg, $exchange);
这部分接收消息并进行处理(使用芹菜)。
@app.task
def mytask(json_obj):
print(json_obj)
data = json.loads(json_obj)
thread_id = data['id']
url = data['url']
return py_read(thread_id, url)
以下是我从控制台得到的:
[2014-09-29 15:51:34,564: WARNING/MainProcess] celery@mickey-Thurley ready.
[2014-09-29 15:51:37,395: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: '{"id":123,"url":"baidu.com"}' (28b)
{content_type:None content_encoding:None
delivery_info:{'redelivered': False, 'routing_key': '', 'exchange': 'celery', 'delivery_tag': 1, 'consumer_tag': '4'} headers={}}
我确信消费者收到了消息,但为什么没有处理消息?我该怎么办?
Celery任务不仅仅是数据。你还需要有一些东西来告诉员工你实际调用的任务,而这是你的消息中缺失的。
与其尝试自己实现,您可能应该使用现有的Celery PHP实现之一,比如这一个。