在 RabbitMQ 中的给定日期之后取消订阅特定队列



我有一个Java类,它根据GUI的某个操作启动与RabbitMQ服务器的连接(使用发布/订阅模式(并侦听新事件。

我想添加一个新功能,我将允许用户设置一个"结束时间",这将阻止我的应用程序侦听新事件(停止从队列中消费而不关闭它(。

我尝试使用 basicCancel 方法,但我找不到一种方法使其适用于预定义的日期。 在我的 Subscribe 类中启动一个新线程,该线程将在到达给定日期时调用 basicCancel,还是有更好的方法可以做到这一点?

收听新事件

private void listenToEvents(String queueName) {
try {
logger.info(" [*] Waiting for events. Subscribed to : " + queueName);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
TypeOfEvent event = null;
String message = new String(body);

// process the payload
InteractionEventManager eventManager = new InteractionEventManager();
event = eventManager.toCoreMonitorFormatObject(message);

if(event!=null){    
String latestEventOpnName = event.getType().getOperationMessage().getOperationName();
if(latestEventOpnName.equals("END_OF_PERIOD"))
event.getMessageArgs().getContext().setTimestamp(++latestEventTimeStamp);            
latestEventTimeStamp = event.getMessageArgs().getContext().getTimestamp();                                    
ndaec.receiveTypeOfEventObject(event);                  
}
}
};
channel.basicConsume(queueName, true, consumer);   
//Should I add the basicCancel here?
}
catch (Exception e) {
logger.info("The Monitor could not reach the EventBus. " +e.toString());
}     
}

启动连接

public String initiateConnection(Timestamp endTime) {
Properties props = new Properties();
try {
props.load(new FileInputStream(everestHome+ "/monitoring-system/rabbit.properties"));
}catch(IOException e){
e.printStackTrace();
}                       
RabbitConfigure config = new RabbitConfigure(props,props.getProperty("queuName").trim());
ConnectionFactory factory = new ConnectionFactory();
exchangeTopic = new HashMap<String,String>();
String exchangeMerged = config.getExchange();
logger.info("Exchange=" + exchangeMerged);
String[] couples = exchangeMerged.split(";");
for(String couple : couples)
{
String[] infos = couple.split(":");
if (infos.length == 2)
{
exchangeTopic.put(infos[0], infos[1]);
}
else
{
logger.error("Invalid Exchange Detail: " + couple);
}
}
for(Entry<String, String> entry : exchangeTopic.entrySet()) {
String exchange = entry.getKey();
String topic = entry.getValue();
factory.setHost(config.getHost());
factory.setPort(Integer.parseInt(config.getPort()));
factory.setUsername(config.getUsername());
factory.setPassword(config.getPassword());
try {
connection1= factory.newConnection();
channel = connection1.createChannel();
channel.exchangeDeclare(exchange, EXCHANGE_TYPE);
/*Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", endTime.getTime());*/
channel.queueDeclare(config.getQueue(),false,false,false,null);
channel.queueBind(config.getQueue(),exchange,topic);            
logger.info("Connected to RabbitMQ.n Exchange: " + exchange + " Topic: " + topic +"n Queue Name is: "+ config.getQueue());
return config.getQueue();
} catch (IOException e) {
logger.error(e.getMessage());
e.printStackTrace();
} catch (TimeoutException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
return null;
}

您可以创建一个延迟队列,设置离开时间,以便您推送到那里的消息在您想要停止使用者时立即成为死信。

然后,您必须将死信交换绑定到一个队列,该队列的使用者将在收到消息后立即停止另一个队列。

当你有 RabbitMq 时,永远不要使用线程,你可以用延迟的消息做很多有趣的事情!

最新更新