批量使用消息- RabbitMQ



我能够使用上述代码使用多个生产者使用不同路由密钥发送到同一交换机的多条消息,并能够将每条消息插入到数据库中。

但是这会消耗太多的资源,因为消息会一个接一个地插入到DB中。所以我决定使用批量插入我发现我可以设置BasicQos

在BasicQos中设置消息限制为10后,我的期望是Console.WriteLine必须写10条消息,但它不是预期的。

我的期望是从队列中消费N个消息并进行批量插入,如果成功发送ACK,否则没有ACK

这是我使用的一段代码。

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_A");
        channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_B");
        channel.BasicQos(0, 10, false);
        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer);
        consumer.Received += (model, ea) =>
        {
            try
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                // Insert into Database
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                Console.WriteLine(" Recevier Ack  " + ea.DeliveryTag);
            }
            catch (Exception e)
            {
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                Console.WriteLine(" Recevier No Ack  " + ea.DeliveryTag);
            }
        };
        Console.ReadLine();
    }
}

BasicQos = 10意味着客户端一次只获取10条消息,但是当您使用它时,您将每次只看到一条消息。阅读这里:https://www.rabbitmq.com/consumer-prefetch.html

AMQP指定基本。Qos方法允许您限制数量使用时通道(或连接)上未确认消息的(即"预取计数")。

对于您的范围,您必须下载消息,将其放入临时列表中,然后插入到DB中。

,然后你可以使用:

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);

空白basicAck ()

参数:deliveryTag -收到的标签AMQP.Basic.GetOk或AMQP.Basic.Deliver

multiple - true表示确认所有消息直到并包括所提供的交付标签;假的,只确认提供的交付标记。

final List<String> myMessagges = new ArrayList<String>();
        channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                myMessagges.add(new String(body));
                System.out.println("Received...");
                if (myMessagges.size() >= 10) {
                    System.out.println("insert into DB...");
                    channel.basicAck(envelope.getDeliveryTag(), true);
                    myMessagges.clear();
                }

            }
        });

基于批大小的消费可以使用channel.basicQos()来完成。

Channel channel = connection.createChannel();
channel.basicQos(10);

指定在不发送ACK的情况下获取消息的最大数目。

使用DefaultConsumer类并重写它的方法。

Consumer batchConsumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
 
  }
  @Override
  public void handleCancelOk(String consumerTag) {
 
  }
};

使用channel.basicConsume()消费10条消息

channel.basicConsume(QUEUE_NAME, false, batchConsumer);

当channel.basicConsume()被调用时,它将获取一批10条消息。'false'设置为禁用自动ack,并且在消耗整个批处理后只发送一次ack。

channel.basicAck(getLastMessageEnvelope().getDeliveryTag(), true);

这里'true'表示我们正在为多个消息发送ACK。

详细说明见

RabbitMQ批量消耗

最新更新