我有一个.NETCORE项目,该项目发送和收回往返Azure Service Bus的消息。
接收一条消息以下效果很好:
public static void Main(string[] args)
{
MainAsync().GetAwaiter().GetResult();
}
private static async Task MainAsync()
{
queueClient = new QueueClient(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock);
ReceiveMessages();
Console.WriteLine("Press any key to stop receiving messages.");
Console.ReadKey();
// Close the client after the ReceiveMessages method has exited.
await queueClient.CloseAsync();
}
// Receives messages from the queue in a loop
private static void ReceiveMessages()
{
try
{
// Register a OnMessage callback
queueClient.RegisterMessageHandler(
async (message, token) =>
{
// Process the message
Console.WriteLine($"Received message: SequenceNumber:{message.SequenceNumber} Body:{message.GetBody<string>()}");
// Complete the message so that it is not received again.
// This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode.
await queueClient.CompleteAsync(message.LockToken);
},
new RegisterHandlerOptions() {MaxConcurrentCalls = 1, AutoComplete = false});
}
catch (Exception exception)
{
Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
}
}
我正在尝试将此控制台应用程序移植到WebAPI项目。想法是何时调用API,它将返回队列中的当前消息。
我这样做的方式如下:
[HttpGet]
public async Task<IActionResult> Get()
{
queueClient = new QueueClient(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock);
MainAsync().GetAwaiter().GetResult();
// Close the client after the ReceiveMessages method has exited.
await queueClient.CloseAsync();
return Ok(nameList);
}
private static async Task MainAsync()
{
queueClient = new QueueClient(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock);
ReceiveMessages();
//Console.WriteLine("Press any key to stop receiving messages.");
//Console.ReadKey();
//// Close the client after the ReceiveMessages method has exited.
//await queueClient.CloseAsync();
}
// Receives messages from the queue in a loop
private static void ReceiveMessages()
{
try
{
// Register a OnMessage callback
queueClient.RegisterMessageHandler(
async (message, token) =>
{
// Process the message
nameList.Add(
$"Received message: SequenceNumber:{message.SequenceNumber} Body:{message.GetBody<string>()}");
//reList.Add(message.GetBody<string>());
// Complete the message so that it is not received again.
// This can be done only if the queueClient is opened in ReceiveMode.PeekLock mode.
await queueClient.CompleteAsync(message.LockToken);
},
new RegisterHandlerOptions() {MaxConcurrentCalls = 1, AutoComplete = false});
}
catch (Exception exception)
{
Console.WriteLine($"{DateTime.Now} > Exception: {exception.Message}");
}
}
这最终以queueclient.closeasync();甚至从队列检索消息之前就被打来电话。
我尝试了各种方法来实现这一目标,但是看起来我的异步编程根本不起作用。
对此的任何帮助都非常感谢。
您正在注册处理程序,然后返回。可以在队列进来时使用" pub/sub"返回对客户端的响应,但是您需要将传入的消息推向客户端。(例如,使用Signalr)。请求。您可以从队列接收消息(带有接收方法的循环中),然后您将返回响应。
while (true)
{
try
{
//receive messages from Queue
message = queueClient.Receive(TimeSpan.FromSeconds(5));
if (message != null)
{
Console.WriteLine(string.Format("Message received: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>()));
// Further custom message processing could go here…
message.Complete();
}
else
{
//no more messages in the queue
break;
}
}
catch (MessagingException e)
{
if (!e.IsTransient)
{
Console.WriteLine(e.Message);
throw;
}
else
{
HandleTransientErrors(e);
}
}
}
queueClient.Close();
}