如何实现在几次可配置的重新排队尝试后拒绝消息的机制?
换句话说,如果我订阅一个队列,我想保证相同的消息不会重新传递超过 X 次。
我的代码示例:
q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) {
try{
doSomething(data);
} catch(e) {
message.reject(true);
}
}
在我看来,最好的解决方案是在应用程序中处理这些错误,并在应用程序决定无法处理消息时拒绝它们。
如果不想丢失信息,则应用应仅在将同一消息发送到错误队列后拒绝该消息。
代码未经过测试:
q.subscribe({ack: true}, function () {
var numOfRetries = 0;
var args = arguments;
var self = this;
var promise = doWork.apply(self, args);
for (var numOfRetries = 0; numOfRetries < MAX_RETRIES; numOfRetries++) {
promise = promise.fail(function () { return doWork.apply(self, args); });
}
promise.fail(function () {
sendMessageToErrorQueue.apply(self, args);
rejectMessage.apply(self, args);
})
})
一种可能的解决方案是使用您定义的某种哈希函数对消息进行哈希处理,然后检查缓存对象中的该哈希。如果存在,请将一个添加到缓存中,直到可配置的最大值,如果不存在,则将其设置为 1。下面是一个快速而肮脏的原型(请注意,mcache
对象应该在所有订阅者的范围内):
var mcache = {}, maxRetries = 3;
q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) {
var messagehash = hash(message);
if(mcache[messagehash] === undefined){
mcache[messagehash] = 0;
}
if(mcache[messagehash] > maxRetries) {
q.shift(true,false); //reject true, requeue false (discard message)
delete mcache[messagehash]; //don't leak memory
} else {
try{
doSomething(data);
q.shift(false); //reject false
delete mcache[messagehash]; //don't leak memory
} catch(e) {
mcache[messagehash]++;
q.shift(true,true); //reject true, requeue true
}
}
}
如果消息具有 GUID,则只需在哈希函数中返回该 GUID。