在Azure服务总线中并行处理消息



问题:我有大量的邮件要发送,目前,在任何时间点,平均有10封邮件在队列中。我的代码一次处理一个队列;也就是说,接收信息,处理它,最后发送电子邮件。这将导致在用户注册服务时向他们发送电子邮件时出现相当大的延迟。

我已经开始考虑异步修改代码到process the messages in parrallel,比如说5。我正在想象编写一个方法,并使用CTP并行调用该方法,例如5次。

我有点迷失在如何实现这个。犯错误的代价非常大,因为如果出现问题,用户会感到失望。

Request:我需要帮助编写代码,并行处理Azure服务总线中的消息。谢谢。

My code in a nutshell.
Public .. Run()
{
   _myQueueClient.BeginReceive(ProcessUrgentEmails, _myQueueClient);
}
void ProcessUrgentEmails(IAsyncResult result)
{
   //casted the `result` as a QueueClient
   //Used EndReceive on an object of BrokeredMessage
   //I processed the message, then called
   sendEmail.BeginComplete(ProcessEndComplete, sendEmail);
 }

 //This method is never called despite having it as callback function above.
 void ProcessEndComplete(IAsyncResult result)
 {
     Trace.WriteLine("ENTERED ProcessEndComplete method...");
     var bm = result.AsyncState as BrokeredMessage;
     bm.EndComplete(result); 
 }

此页面为您提供使用Windows Azure服务总线时的性能提示。

关于并行处理,您可以有一个线程池用于处理,每次获得消息时,只需获取该池中的一个并为其分配消息。你需要管理这个池。

或者,您可以一次检索多个消息并使用TPL处理它们…例如,BeginReceiveBatch/EndReceiveBatch方法允许你从Queue (Async)中检索多个"项目",然后使用"AsParallel"转换由前面方法返回的IEnumerable,并在多个线程中处理消息。

非常简单的例子:

var messages = await Task.Factory.FromAsync<IEnumerable<BrokeredMessage>>(Client.BeginReceiveBatch(3, null, null), Client.EndReceiveBatch);
messages.AsParallel().WithDegreeOfParallelism(3).ForAll(item =>
{
    ProcessMessage(item);
});

该代码从队列中检索3条消息,并以"3个线程"处理它们(注意:不能保证它将使用3个线程,.NET将分析系统资源,如果有必要,它将使用最多3个线程)

你也可以删除"WithDegreeOfParallelism"部分,这样。net就会使用它需要的任何线程。

在一天结束的时候,有很多方法可以做到这一点,你必须决定哪一个更适合你。

这是一个使用常规Begin/End异步模式的基本示例。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;
namespace WorkerRoleWithSBQueue1
{
    public class WorkerRole : RoleEntryPoint
    {
        // The name of your queue
        const string QueueName = "QUEUE_NAME";
        const int MaxThreads = 3;
        // QueueClient is thread-safe. Recommended that you cache 
        // rather than recreating it on every request
        QueueClient Client;
        bool IsStopped;
        int dequeueRequests = 0;
        public override void Run()
        {
            while (!IsStopped)
            {
                // Increment Request Counter
                Interlocked.Increment(ref dequeueRequests);
                Trace.WriteLine(dequeueRequests + " request(s) in progress");
                Client.BeginReceive(new TimeSpan(0, 0, 10), ProcessUrgentEmails, Client);
                // If we have made too many requests, wait for them to finish before requesting again.
                while (dequeueRequests >= MaxThreads && !IsStopped)
                {
                    System.Diagnostics.Trace.WriteLine(dequeueRequests + " requests in progress, waiting before requesting more work");
                    Thread.Sleep(2000);
                }
            }
        }

        void ProcessUrgentEmails(IAsyncResult result)
        {
            var qc = result.AsyncState as QueueClient;
            var sendEmail = qc.EndReceive(result);
            // We have received a message or has timeout... either way we decrease our counter
            Interlocked.Decrement(ref dequeueRequests);
            // If we have a message, process it
            if (sendEmail != null)
            {
                var r = new Random();
                // Process the message
                Trace.WriteLine("Processing message: " + sendEmail.MessageId);
                System.Threading.Thread.Sleep(r.Next(10000));
                // Mark it as completed
                sendEmail.BeginComplete(ProcessEndComplete, sendEmail);
            }
        }

        void ProcessEndComplete(IAsyncResult result)
        {
            var bm = result.AsyncState as BrokeredMessage;
            bm.EndComplete(result);
            Trace.WriteLine("Completed message: " + bm.MessageId);
        }

        public override bool OnStart()
        {
            // Set the maximum number of concurrent connections 
            ServicePointManager.DefaultConnectionLimit = 12;
            // Create the queue if it does not exist already
            string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
            if (!namespaceManager.QueueExists(QueueName))
            {
                namespaceManager.CreateQueue(QueueName);
            }
            // Initialize the connection to Service Bus Queue
            Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
            IsStopped = false;
            return base.OnStart();
        }
        public override void OnStop()
        {
            // Waiting for all requestes to finish (or timeout) before closing
            while (dequeueRequests > 0)
            {
                System.Diagnostics.Trace.WriteLine(dequeueRequests + " request(s), waiting before stopping");
                Thread.Sleep(2000);
            }
            // Close the connection to Service Bus Queue
            IsStopped = true;
            Client.Close();
            base.OnStop();
        }
    }
}

希望能有所帮助。

最新更新