RabbitMQ消费者中的并发消息处理



我是RabbitMQ的新手,如果我的问题听起来微不足道,请原谅我。我想在RabbitMQ上发布消息,这将由RabbitMQ消费者处理。

我的消费者机器是一个多核机器(最好是azure上的工作者角色)。但是QueueBasicConsumer每次推送一条消息。我如何编程来利用所有的核,我可以同时处理多个消息。

  1. 一个解决方案是在多个线程中打开多个通道,然后在那里处理消息。但是在这种情况下,我将如何决定线程的数量。
  2. 另一种方法是在主线程上读取消息,然后创建任务并将消息传递给该任务。在这种情况下,我将不得不停止消费消息,以防有许多消息(超过阈值)已经在处理中。不知道如何实现。

Thanks In Advance

第二个选项听起来更合理——在单个通道上消费并生成多个任务来处理消息。要实现并发控制,可以使用信号量来控制运行中的任务数量。在启动任务之前,您将等待信号量变为可用,并且在任务完成后,它将发出信号量以允许其他任务运行。

你没有指定你选择的语言/技术堆栈,但无论你做什么-尝试利用线程池而不是自己创建和管理线程。在。net中,这意味着使用Task.Run来异步处理消息。

示例c#代码:

using (var semaphore = new SemaphoreSlim(MaxMessages))
{
    while (true)
    {
        var args = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
        semaphore.Wait();
        Task.Run(() => ProcessMessage(args))
            .ContinueWith(() => semaphore.Release());
    }
}
比起自己控制并发级别,你可能会发现在通道上启用显式ACK控制更容易,并使用RabbitMQ消费者预取来设置未确认消息的最大数量。这样,您将永远不会收到超过您想要的消息。

最新更新