Azure ServiceBus AbandonMessageAsync在不一致的时间释放消息 &g



我需要检查死信队列,如果存在某些条件(如超过30天),我想将其存档到某些数据存储(而不仅仅是删除它)。我要抓取消息,如果它满足这个条件,保存到某个存储并完成/删除消息,如果不满足,放弃它。我有一个控制台应用程序,我从dlq抓取消息,它似乎可以工作,但如果我一遍又一遍地运行它,我看到返回的消息数量不一致的结果。在几次迭代中(在我的示例中是7次),它将拥有所有这些数据,但随后它将开始只获得6,0或1,并最终返回到dql中的全部数量(例如30秒后,我认为这是peek锁的默认锁定周期)。我假设每次我运行这个,我应该得到所有的消息,因为我放弃了之前运行的消息。

我正在使用Azure.Messaging.ServiceBus 7.8.1,似乎你只是将消息对象传递给放弃方法。如果有人有任何建议,那就太好了!

github代码:https://github.com/ndn2323/bustest

using Azure.Messaging.ServiceBus;
using System.Text;
namespace BusReceiver
{
public class TaskRunner
{
public TaskRunner() { }
public async Task Run() {            
const string DLQPATH = "/$deadletterqueue";
var maxMsgCount = 50;
var connectionString = "[ConnectionString]";
var topicName = "testtopic1";
var subscriberName = "testsub1";
var subscriberDlqName = subscriberName + DLQPATH;
var client = new ServiceBusClient(connectionString);
var options = new ServiceBusReceiverOptions();
options.ReceiveMode = ServiceBusReceiveMode.PeekLock;
var receiver = client.CreateReceiver(topicName, subscriberName, options);
var receiverDlq = client.CreateReceiver(topicName, subscriberDlqName, options);
Log("Starting receive from regular queue");
var msgList = await receiver.ReceiveMessagesAsync(maxMsgCount, TimeSpan.FromMilliseconds(500));
Log(msgList.Count.ToString() + " messages found");
foreach (var msg in msgList)
{
await receiver.DeadLetterMessageAsync(msg);
}
Log("Starting receive from dead letter queue");
var msgListDlq = await receiverDlq.ReceiveMessagesAsync(maxMsgCount, TimeSpan.FromMilliseconds(500));            
Log(msgListDlq.Count.ToString() + " messages found in dlq");
foreach (var msg in msgListDlq) {
Log("MessageId: " + msg.MessageId + " Body: " + Encoding.ASCII.GetString(msg.Body));
// if some condition, archieve message to some data store, else abandon it to be picked up again
// for this test I'm abandoning all messages                
await receiverDlq.AbandonMessageAsync(msg);
}
await receiver.CloseAsync();
await receiverDlq.CloseAsync();
}
private void Log(string msg) {
Console.WriteLine(DateTime.Now.ToString() + ": " + msg);
}
}
}

输出示例:

C:GitHubndn2323bustestBusReceiverbinDebugnet6.0>BusReceiver.exe
5/29/2022 11:45:36 PM: Starting receive from regular queue
5/29/2022 11:45:37 PM: 0 messages found
5/29/2022 11:45:37 PM: Starting receive from dead letter queue
5/29/2022 11:45:37 PM: 7 messages found in dlq
5/29/2022 11:45:37 PM: MessageId: 9e9f390655af44a8b93866920a6de77c Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 3aacffe40ab5473fb34412684bcd1907 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: a47f83d4a12845088ade427e084d8e39 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 47ff6dd4f4134661a3616a9210670be5 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: d10b3602f57047f1bf613675e35793e0 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: 08a45405375e46ffb99db9812c3e3d78 Body: TestMessage
5/29/2022 11:45:37 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
C:GitHubndn2323bustestBusReceiverbinDebugnet6.0>BusReceiver.exe
5/29/2022 11:45:42 PM: Starting receive from regular queue
5/29/2022 11:45:43 PM: 0 messages found
5/29/2022 11:45:43 PM: Starting receive from dead letter queue
5/29/2022 11:45:43 PM: 7 messages found in dlq
5/29/2022 11:45:43 PM: MessageId: 9e9f390655af44a8b93866920a6de77c Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 3aacffe40ab5473fb34412684bcd1907 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: a47f83d4a12845088ade427e084d8e39 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 47ff6dd4f4134661a3616a9210670be5 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: d10b3602f57047f1bf613675e35793e0 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: 08a45405375e46ffb99db9812c3e3d78 Body: TestMessage
5/29/2022 11:45:43 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
C:GitHubndn2323bustestBusReceiverbinDebugnet6.0>BusReceiver.exe
5/29/2022 11:45:48 PM: Starting receive from regular queue
5/29/2022 11:45:49 PM: 0 messages found
5/29/2022 11:45:49 PM: Starting receive from dead letter queue
5/29/2022 11:45:49 PM: 1 messages found in dlq
5/29/2022 11:45:49 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage
C:GitHubndn2323bustestBusReceiverbinDebugnet6.0>BusReceiver.exe
5/29/2022 11:46:03 PM: Starting receive from regular queue
5/29/2022 11:46:04 PM: 0 messages found
5/29/2022 11:46:04 PM: Starting receive from dead letter queue
5/29/2022 11:46:04 PM: 1 messages found in dlq
5/29/2022 11:46:04 PM: MessageId: d21cff4ae5b6453f9077b3805ace4e09 Body: TestMessage

由于网络、服务和应用程序的变化,在调用ReceiveMessagesAsync时看到批量大小不一致的返回是正常的。

接收时,没有最小批量。接收方将向链接添加足够的信用,以允许maxMessageCount从服务流出,但不会等待尝试构建该规模的批处理。一旦从服务传输了任何消息,它们将作为批处理返回。由于您指定了maxWaitTime,如果在此时间内服务上没有可用的消息,则将返回空批。

ServiceBusReceiverOptions中设置PrefetchCount可以帮助平滑批大小。也就是说,一定要注意,锁是为预取队列中的消息保留的,不会自动更新,因此如果预取计数过高,将导致看到过期的锁。

在您的示例中,最好的方法可能是重复执行接收循环,直到连续看到1个(或更多?)空批。这将是一个强有力的信号,表明队列是空的。

最新更新