RabbitMQ 中的事务性消费



我是RabbitMQ的新手。我需要 MOM 系统来实现以下目的:

  1. 已发布的消息将被消耗,直到我的逻辑执行 成功。
  2. 代理不必从 排队,直到我的逻辑成功执行。

为了实现这些目标,我在第一次尝试时编写了以下代码:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag",
new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)throws IOException
{
try
{
channel.txSelect();
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("Recieve Message is :" + new String(body));
int reslt = //execute my logic   
if(result == 0)
channel.txCommit();
else
channel.txRollback();
}
catch(Throwable t)
{
t.printStackTrace();
}
}
});

通过这种方法,我实现了第二个目的,换句话说,代理不会删除我的消息,但一次队列中的所有消息都被消耗并且所有消息都回滚了,代理不会再次向我的消费者发送消息。

在第二次尝试时,我编写了以下代码:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, "ProcessorOneTag",
new DefaultConsumer(channel)
{
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)throws IOException
{
try
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
System.out.println("Recieve Message is :" + new String(body));
int reslt = //execute my logic   
if(result == 0)
channel.basicAck(deliveryTag, false);
else
channel.basicNack(deliveryTag,false,true);
}
catch(Throwable t)
{
t.printStackTrace();
}
}
});

通过这个解决方案,我实现了这两个目标,但我不知道我的代码是否正确?该方法是否会导致高 TPS 生产环境中出现问题?我不知道basicNack方法的重新排队标志是重还是轻?

几个月前我也有同样的要求。我经历了许多解决方案,这就是对我有用的解决方案。

我将传递标签存储在内存中的某个地方,如果我的逻辑运行良好,我会手动确认消息或拒绝该消息。为此,我使用了以下方法。

if (success)
connectionModel.BasicAck(Convert.ToUInt64(uTag), false);
else
connectionModel.BasicReject(Convert.ToUInt64(uTag), true);

上面的代码在我的生产中以 5000msg/秒的速度运行良好。好吧,基本的Nack方法给我带来了问题。

相关内容

  • 没有找到相关文章

最新更新