Azure服务总线使用Azure.Messaging.ServiceBus一次读取所有消息



我正在使用Azure.Messaging.ServiceBusnuget包与Azure服务总线一起工作。我们已经创建了一个主题和一个订阅。订阅有100多条消息。我们想阅读所有的信息,并在信息到达时继续阅读。

Microsoft.Azure.ServiceBus包(现在已弃用(提供了用于处理每个传入消息的RegisterMessageHandler。我在Azure.Messaging.ServiceBusnuget包下找不到类似的选项。

我可以一次阅读一条消息,但每次都必须手动呼叫await receiver.ReceiveMessageAsync();

要接收多条消息(一批(,您应该使用ServiceBusReceiver.ReceiveMessagesAsync()(不是复数,也不是单数的'message'(。此方法将返回它可以发送回的任何数量的消息。为了确保检索到所有100多条消息,您需要循环直到没有可用的消息。

如果你想使用处理器,新的SDK中也提供了处理器。请参阅我对类似问题的回答。

根据@gaurav Mantri的建议,我使用ServiceBusProcessor类来实现基于事件的模型来处理消息

public async Task ReceiveAll()
{
string connectionString = "Endpoint=sb://sb-test-today.servicebus.windows.net/;SharedAccessKeyName=manage;SharedAccessKey=8e+6SWp3skB3Aedsadsadasdwz5DU=;";
string topicName = "topicone";
string subscriptionName = "subone";
await using var client = new ServiceBusClient(connectionString, new ServiceBusClientOptions
{
TransportType = ServiceBusTransportType.AmqpWebSockets
});
var options = new ServiceBusProcessorOptions
{
// By default or when AutoCompleteMessages is set to true, the processor will complete the message after executing the message handler
// Set AutoCompleteMessages to false to [settle messages](https://learn.microsoft.com/en-us/azure/service-bus-messaging/message-transfers-locks-settlement#peeklock) on your own.
// In both cases, if the message handler throws an exception without settling the message, the processor will abandon the message.
AutoCompleteMessages = false,
// I can also allow for multi-threading
MaxConcurrentCalls = 1
};
await using ServiceBusProcessor processor = client.CreateProcessor(topicName, subscriptionName, options);
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;
await processor.StartProcessingAsync();
Console.ReadKey();
}

public async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = args.Message.Body.ToString();
Console.WriteLine(body);
// we can evaluate application logic and use that to determine how to settle the message.
await args.CompleteMessageAsync(args.Message);
}
public Task ErrorHandler(ProcessErrorEventArgs args)
{
// the error source tells me at what point in the processing an error occurred
Console.WriteLine(args.ErrorSource);
// the fully qualified namespace is available
Console.WriteLine(args.FullyQualifiedNamespace);
// as well as the entity path
Console.WriteLine(args.EntityPath);
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
}

最新更新