bramqp & node.js:取消消费者时出错(basic.cancel)



首先,我是rabbitmq和bramqp的新手。我知道这可能是一个愚蠢的问题,但由于从队列中取消消费者的这个问题,我一直在拔头发。我搜索了整个互联网,没有找到如何执行basic.cancel代码的bramqp示例。

这是我的代码:

        var bramqp = require('bramqp');
        var net = require('net');
        var async = require('async');
        var queueName = 'testQueue';
        var consumerTag = 'testConsumer';
        var exchangeName = 'testExchange';
        var socket = net.connect({
            port : 5672
        });
        bramqp.initialize(socket, 'rabbitmq/full/amqp0-9-1.stripped.extended', function(error, handle){
            async.series([ function(seriesCallback) {
                handle.openAMQPCommunication('guest', 'guest', true, seriesCallback);
            }, function(seriesCallback) {
                handle.exchange.declare(
                    1 /*short reserved-1*/,
                    exchangeName /*exchange-name exchange*/,
                    'fanout' /*shortstr type*/,
                    false /*bit passive*/,
                    true /*bit durable*/,
                    false /*bit auto-delete*/,
                    false /*bit internal*/,
                    false /*no-wait no-wait*/,
                    {} /*table arguments*/
                );
                handle.once('exchange.declare-ok', function(channel, method, data) {
                    console.log('exchange declared');
                    seriesCallback();
                });
            }, function(seriesCallback) {
                handle.basic.qos(
                    1 /*long prefetch-size*/,
                    0 /*short prefetch-count*/,
                    1 /*...*/,
                    false /*bit global*/
                );
                handle.once('basic.qos-ok', function(channel, method, data) {
                    console.log('qos accepted');
                    seriesCallback();
                });
            }, function(seriesCallback) {
                handle.queue.declare(
                    1 /*short reserved-1*/,
                    queueName /*queue-name queue*/,
                    false /*bit passive*/,
                    true /*bit durable*/,
                    false /*bit exclusive*/,
                    false /*bit auto-delete*/,
                    false /*no-wait no-wait*/,
                    {} /*table arguments*/
                );
                handle.once('queue.declare-ok', function(channel, method, data) {
                    console.log('queue declared');
                    seriesCallback();
                });
            }, function(seriesCallback) {
                handle.queue.bind(
                    1 /*short reserved-1*/,
                    queueName /*exchange-name destination*/,
                    exchangeName /*exchange-name source*/,
                    null /*shortstr routing-key*/,
                    false /*no-wait no-wait*/,
                    {} /*table arguments*/
                );
                handle.once('queue.bind-ok', function(channel, method, data) {
                    console.log('queue bound sucessfully');
                    seriesCallback();
                });
            }, function(seriesCallback) {
                handle.basic.consume(1 /*short reserved-1*/,
                    queueName /*queue-name queue*/,
                    consumerTag,
                    false /*no-local no-local*/,
                    false /*no-ack no-ack*/,
                    false /*bit exclusive*/,
                    false /*no-wait no-wait*/,
                    {} /*table arguments*/
                );
                handle.once('basic.consume-ok', function (channel, method, data) {
                    console.log('consuming from queue');
                    console.log(data);
                    handle.on('basic.deliver', function (channel, method, data) {
                        console.log('incoming message');
                        console.log(data);
                        handle.once('content', function (channel, className, properties, content) {
                            console.log('got a message:');
                            console.log(content.toString());
                            if (content.toString().indexOf("END_MESSAGE") > -1){
                                handle.basic.cancel(consumerTag, false);
                                handle.once('basic.cancel-ok', function(channel, method, data) {
                                    console.log("consumer cancelled successfully");
                                    seriesCallback();
                                });
                                res.json("END_MESSAGE");
                            }
                            else{
                                console.log('acking');
                                handle.basic.ack(1, data['delivery-tag']);
                            }
                            console.log('with properties:');
                            console.log(properties);
                            seriesCallback();
                        });
                    });
                });
            } ], function() {
                console.log('all done');
            });
        });

因此,我试图做的只是在消费者检测到"END_MESSAGE"类型的消息时简单地停止它。我从上面的代码中得到的是以下错误:

    events.js:72
    throw er; // Unhandled 'error' event
          ^
    TypeError: value is out of bounds
        at TypeError (<anonymous>)
        at checkInt (buffer.js:705:11)
        at Buffer.writeUInt16BE (buffer.js:730:5)
    req.on('error', function(e){
        console.log('problem with request: ' + e.message);

任何意见和建议将不胜感激。谢谢!

看起来 basic.cancel() 的工作方式有点不同,至少在今天是这样。我在文档中发现这个方法实际上吃了一个回调,就像这样:

handle.basic.cancel(consumerTag, function(err) {
  if (err) throw err;
});

https://www.npmjs.com/package/bramqp-wrapper - 搜索基本取消

重要!回调不保证您不会再从队列中收到任何消息。它似乎只是为了接受取消而触发的,而不是在队列服务器实际消化它时触发的。

最新更新