我正在写一段代码,它将允许我们:
- 查看Azure服务总线主题(Peek)中存在的所有死信消息的列表
- 修复并将其发送回主题
- 重新发送时将它们从死信队列中删除
我对前两点没有意见;使用Peek接收模式,我可以显示消息列表,我们可以毫无问题地编辑和重新发送。
当我想从死信队列中删除消息时,问题就来了。
我们如何在逐个消息的级别上做到这一点?我们可能只想删除死信队列中的2条消息,并保留其他消息以供稍后审查。对死信队列中的消息调用.Complete()
是否会像在主订阅中那样删除它?
供参考;这是我们为死信队列获取SubscriptionClient
的代码:
private SubscriptionClient GetOrCreateSubscriptionClient(string connectionString)
{
if (!NamespaceManager.TopicExists(_topicName))
{
NamespaceManager.CreateTopic(new TopicDescription(_topicName)
{
MaxSizeInMegabytes = 5120,
DefaultMessageTimeToLive = TimeSpan.FromSeconds(DEFAULT_LOCK_DURATION_IN_SECONDS)
});
}
if (!NamespaceManager.SubscriptionExists(_topicName, _subscriptionName))
{
NamespaceManager.CreateSubscription(_topicName, _subscriptionName);
}
var deadLetterPath = SubscriptionClient.FormatDeadLetterPath(_topicName, _subscriptionName);
var client = SubscriptionClient.CreateFromConnectionString(
connectionString, deadLetterPath, _subscriptionName, ReceiveMode.PeekLock);
return client;
}
是的,对从死信队列中收到的代理消息的引用调用complete会将其从死信排队中删除。
以下是如何获得死信队列中所有消息的列表:
public async Task<IEnumerable<BrokeredMessage>> GetDeadLetterMessagesAsync(string connectionString,
string queueName)
{
var queue = QueueClient.CreateFromConnectionString(connectionString, QueueClient.FormatDeadLetterPath(queueName));
var messageList = new List<BrokeredMessage>();
BrokeredMessage message;
do
{
message = await queue.PeekAsync();
if (message != null)
{
messageList.Add(message);
}
} while (message != null);
return messageList;
}
使用SequenceNumber,您可以删除特定的消息:
public async Task DeleteDeadLetterMessageAsync(string connectionString, string queueName, long sequenceNumber)
{
var deadLetterQueue = QueueClient.CreateFromConnectionString(connectionString, QueueClient.FormatDeadLetterPath(queueName));
var msg = await deadLetterQueue.PeekAsync(sequenceNumber);
if (msg.SequenceNumber == sequenceNumber)
{
msg = await deadLetterQueue.ReceiveAsync();
await msg.CompleteAsync();
}
}
请注意,如果没有任何具有指定SequenceNumber的消息,Peek可能会返回另一条消息。因此,您需要检查SequenceNumber,这样就不会删除错误的消息。
使用azure.messaging.servicebus
:中的新ServiceBusClient
从死信队列中删除消息的更新代码
public async Task DeleteDeadLetterMessageAsync(string connectionString, string queueName, long sequenceNumber)
{
var serviceBusClient = new ServiceBusClient(connectionString);
var receiverOptions = new ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter };
var receiver = serviceBusClient.CreateReceiver(queueName, receiverOptions);
var msg = await receiver.PeekMessageAsync(sequenceNumber);
if (msg.SequenceNumber == sequenceNumber)
{
msg = await receiver.ReceiveMessageAsync();
await receiver.CompleteMessageAsync(msg);
}
}