Azure Service Bus主题接收带有事件驱动架构模型的会话的消息



我正在使用Azure Service Bus主题。因为我有大消息,所以我将大信息分开,并以sessionID和拆分订单的小消息发送它们。我希望我的接收器具有事件驱动的架构。由于我必须接收所有具有相同sessionID的消息,并且必须按照正确的分配顺序进行汇总。我的代码是我的代码。但是只有第一次我从Bellow代码中获得消息。在第二个消息中,它超时。

         public class CRMESBListener : RoleEntryPoint
            {
                private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
                private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
            public override void Run()
            {
                Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is running");
                try
                {
                    DBMessageListener dbMessageListener = DBMessageListener.GetDBMessageListner();
                    dbMessageListener.Listen();
                    runCompleteEvent.WaitOne();
                    //this.RunAsync(this.cancellationTokenSource.Token).Wait();
                }
                finally
                {
                    this.runCompleteEvent.Set();
                }
            }
            public override bool OnStart()
            {
                // Set the maximum number of concurrent connections
                ServicePointManager.DefaultConnectionLimit = 12;
                // For information on handling configuration changes
                // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.

                bool result = base.OnStart();
                Bootstrapper.Init();
                Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has been started");
                return result;
            }
            public override void OnStop()
            {
                Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is stopping");
                this.cancellationTokenSource.Cancel();
                this.runCompleteEvent.WaitOne();
                base.OnStop();
                Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has stopped");
            }
            private async Task RunAsync(CancellationToken cancellationToken)
            {
                // TODO: Replace the following with your own logic.
                while (!cancellationToken.IsCancellationRequested)
                {
                    Trace.TraceInformation("Working");
                    await Task.Delay(1000);
                }
            }
        }






      public class DBMessageListener
        {
            #region Member Variables
            private static DBMessageListener dbMessageListner;
            private static object lockObject = new object();
            private TopicSubscribeClientWrapper accountTopicClient;
            private NamespaceManager namespaceManager;
            private OnMessageOptions eventDrivenMessagingOptions;
            private int crmIntegrationUserID = Common.CrmCurrentUser.UserID;
            #endregion Member Variables
            #region Constructors
            private DBMessageListener()
            {
                string subscriptionName = "AllMessages";
                namespaceManager = new NamespaceManager(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString);
                if (!namespaceManager.SubscriptionExists(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName))
                {
                    namespaceManager.CreateSubscription(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName);
                }
                accountTopicClient = new TopicSubscribeClientWrapper(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString, ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath);
                accountTopicClient.SubscriptionName = subscriptionName;

                eventDrivenMessagingOptions = new OnMessageOptions
                {
                    AutoComplete = true
                };
                eventDrivenMessagingOptions.ExceptionReceived += OnExceptionReceived;
                eventDrivenMessagingOptions.MaxConcurrentCalls = 5;
            }
            #endregion Constructors
            #region Methods
            private async System.Threading.Tasks.Task OnMessageArrived(BrokeredMessage message)
            {
                if (message != null)
                {
                    try
                    {
                        await ProcessDBMessage(message.GetBody<ServiceBusMessage>());
                    }
                    catch (Exception ex)
                    {
                        //log exception
                    }
                }
            }
            private void OnExceptionReceived(object sender, ExceptionReceivedEventArgs e)
            {
                if (e != null && e.Exception != null)
                {
                }
            }
            private async System.Threading.Tasks.Task ProcessDBMessage(ServiceBusMessage message)
            {
    //process message           
            }
            public static DBMessageListener GetDBMessageListner()
            {
                if (dbMessageListner == null)
                {
                    lock (lockObject)
                    {
                        if (dbMessageListner == null)
                        {
                            dbMessageListner = new DBMessageListener();
                        }
                    }
                }
                return dbMessageListner;
            }
            public void Listen()
            {
                accountTopicClient.OnMessageAsync(async message => await OnMessageArrived(message), eventDrivenMessagingOptions);
            }
            #endregion Methods
        }  

 public class TopicSubscribeClientWrapper : IServiceBusClientWrapper
    {
        #region Member Variables
        private readonly string _connectionString;
        private readonly string _topicName;
        private readonly TopicClient _topicClient;
        private SubscriptionClient _subscriptionClient;
        #endregion Member Variables
        #region Properties
        public string SubscriptionName { get; set; }
        #endregion Properties
        #region Constructors
        public TopicSubscribeClientWrapper(string connectionString, string topicName)
        {
            _connectionString = connectionString;
            _topicName = topicName;
            _topicClient = TopicClient.CreateFromConnectionString(connectionString, topicName);
        }
        #endregion Constructors
        #region Event Handlers
        public void OnMessageAsync(Func<BrokeredMessage, Task> onMessageCallback, OnMessageOptions onMessageOptions)
        {
            _subscriptionClient = SubscriptionClient.CreateFromConnectionString(_connectionString, _topicName, SubscriptionName);
           // _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions);
            MemoryStream largeMessageStream = new MemoryStream();
            MessageSession session = _subscriptionClient.AcceptMessageSession();
            while (true)
            {
                BrokeredMessage subMessage = session.Receive(TimeSpan.FromSeconds(5));
                if (subMessage != null)
                {
                    Stream subMessageStream = subMessage.GetBody<Stream>();
                    subMessageStream.CopyTo(largeMessageStream);
                    subMessage.Complete();
                    //Console.Write(".");
                }
                else
                {
                    //Console.WriteLine("Done!");
                    break;
                }
            }
            BrokeredMessage largeMessage = new BrokeredMessage(largeMessageStream, true);
            var message = onMessageCallback.Method.GetParameters();
            message.SetValue(largeMessage, 1);
            _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions);
        }
        #endregion Event Handlers
        #region Methods
        public Task SendAsync(BrokeredMessage message)
        {
            return _topicClient.SendAsync(message);
        }
        public void Close()
        {
            if (_subscriptionClient != null)
            {
                _subscriptionClient.Close();
            }
            _topicClient.Close();
        }
        #endregion Methods
    }

我建议采取不同的路线。与其尝试创建一条消息会话来传递大消息,不如使用专门解决此问题的索赔检查模式 - 大型附件。将您的数据写入存储空间,并发送带有消息的URI。保存/还原斑点,而不是试图在块中发送有效载荷。此外,这样可以更容易地监视您的系统(一个失败的成功/失败的消息与一个或多个斑点相关联),您可以更容易。t必须使用会议或任何特殊的会话。

最新更新