从 Azure Service Bus 异步检索数据，并通过控制器操作将其返回



我有一个 .NET Core 项目，用于向 Azure Service Bus 发送消息并从中接收消息。

下面这段接收消息的代码运行良好：

// Console entry point: runs MainAsync and blocks the main thread until it has completed.
        public static void Main(string[] args) =>
            MainAsync().GetAwaiter().GetResult();
        /// <summary>
        /// Creates the queue client, registers the message handler and keeps the
        /// process alive until a key is pressed, then closes the client.
        /// </summary>
        /// <returns>A task that completes once the queue client has been closed.</returns>
        private static async Task MainAsync()
        {
            queueClient = new QueueClient(ServiceBusConnectionString, QueueName, ReceiveMode.PeekLock);
            try
            {
                ReceiveMessages();
                Console.WriteLine("Press any key to stop receiving messages.");
                Console.ReadKey();
            }
            finally
            {
                // Close the client even when waiting for the key press fails
                // (for example when console input is redirected), so the
                // connection is not left open.
                await queueClient.CloseAsync();
            }
        }
        // Registers a callback on the queue client that prints each received message
        // and then completes it. A failure while registering is written to the console.
        private static void ReceiveMessages()
        {
            try
            {
                // AutoComplete is off, so the callback completes each message itself.
                var handlerOptions = new RegisterHandlerOptions()
                {
                    MaxConcurrentCalls = 1,
                    AutoComplete = false
                };

                queueClient.RegisterMessageHandler(
                    async (message, token) =>
                    {
                        // Read the fields in the same order the log line prints them.
                        var sequenceNumber = message.SequenceNumber;
                        var body = message.GetBody<string>();
                        Console.WriteLine($"Received message: SequenceNumber:{sequenceNumber} Body:{body}");

                        // Mark the message as done so it is not delivered again;
                        // this requires the client to be in ReceiveMode.PeekLock.
                        await queueClient.CompleteAsync(message.LockToken);
                    },
                    handlerOptions);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"{DateTime.Now} > Exception: {ex.Message}");
            }
        }

我正在尝试将此控制台应用程序移植到 Web API 项目。我的想法是：每当调用该 API 时，它就返回队列中当前的消息。

我这样做的方式如下:

// GET action: registers the queue message handler, closes the client and returns the collected messages.
        //
        // NOTE(review): RegisterMessageHandler only registers a callback and returns
        // immediately, so the client can still be closed before any message has been
        // delivered. Returning the queue content reliably needs a pull-style receive
        // with a timeout instead of a push handler.
        // NOTE(review): nameList is declared outside this block; it looks like shared
        // state that the handler mutates - confirm it is safe across concurrent requests.
        [HttpGet]
        public async Task<IActionResult> Get()
        {
            // MainAsync creates the queue client itself; constructing another one here
            // first would only leak an instance that is never closed.
            // Await rather than block with GetAwaiter().GetResult(), which ties up a
            // request thread inside an async action.
            await MainAsync();

            // Close the client after the handler has been registered.
            await queueClient.CloseAsync();
            return Ok(nameList);
        }
        // Creates the queue client and registers the message handler.
        // Unlike the console version it neither waits for input nor closes
        // the client; closing is left to the caller.
        private static async Task MainAsync()
        {
            queueClient = new QueueClient(
                ServiceBusConnectionString,
                QueueName,
                ReceiveMode.PeekLock);

            ReceiveMessages();
        }
        // Registers a callback on the queue client that appends a description of each
        // received message to nameList and then completes the message. A failure while
        // registering is written to the console.
        private static void ReceiveMessages()
        {
            try
            {
                // AutoComplete is off, so the callback completes each message itself.
                var handlerOptions = new RegisterHandlerOptions()
                {
                    MaxConcurrentCalls = 1,
                    AutoComplete = false
                };

                queueClient.RegisterMessageHandler(
                    async (message, token) =>
                    {
                        // Build the entry first, reading the fields in the order they are printed.
                        var sequenceNumber = message.SequenceNumber;
                        var body = message.GetBody<string>();
                        var entry = $"Received message: SequenceNumber:{sequenceNumber} Body:{body}";
                        nameList.Add(entry);

                        // Mark the message as done so it is not delivered again;
                        // this requires the client to be in ReceiveMode.PeekLock.
                        await queueClient.CompleteAsync(message.LockToken);
                    },
                    handlerOptions);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"{DateTime.Now} > Exception: {ex.Message}");
            }
        }

结果是，queueClient.CloseAsync() 在还没有从队列中检索到任何消息之前就已经被调用了。

我尝试了各种方法来实现这一目标，但看起来我写的异步代码根本没有按预期工作。

对此的任何帮助都非常感谢。

您只是注册了处理程序，然后就立即返回了。如果想在消息到达队列时再响应客户端，可以采用"发布/订阅"的方式，但那样就需要把收到的消息主动推送给客户端（例如使用 SignalR）。如果要采用"请求/响应"的方式，您可以在一个循环中使用 Receive 方法从队列接收消息，然后再返回响应。

   // Drains the queue by pulling messages one at a time until a receive returns null,
   // then closes the client. This is a fragment: the enclosing method header and the
   // declarations of message and queueClient are not shown here.
   while (true) 
    { 
        try 
        { 
            // Pull the next message, passing a 5-second timeout to Receive.
            // NOTE(review): presumably Receive returns null when nothing arrives
            // within the timeout - confirm against the client library in use.
            message = queueClient.Receive(TimeSpan.FromSeconds(5)); 
            if (message != null) 
            { 
                Console.WriteLine(string.Format("Message received: Id = {0}, Body = {1}", message.MessageId, message.GetBody<string>())); 
                // Further custom message processing could go here...
                // Complete the message once it has been processed.
                message.Complete(); 
            } 
            else 
            { 
                // Nothing was received: treat the queue as empty and stop looping.
                break; 
            } 
        } 
        catch (MessagingException e) 
        { 
            // Non-transient errors are logged and rethrown.
            if (!e.IsTransient) 
            { 
                Console.WriteLine(e.Message); 
                throw; 
            } 
            else 
            { 
                // Transient errors go to HandleTransientErrors (defined elsewhere),
                // after which the loop continues with the next receive.
                HandleTransientErrors(e); 
            } 
        } 
    } 
    // NOTE(review): not reached when the rethrow above fires, so the client
    // stays open on that path.
    queueClient.Close(); 
}

最新更新