我正在使用Azure Service Bus主题。因为我有大消息,所以我将大信息分开,并以sessionID和拆分订单的小消息发送它们。我希望我的接收器具有事件驱动的架构。由于我必须接收所有具有相同sessionID的消息,并且必须按照正确的分配顺序进行汇总。我的代码是我的代码。但是只有第一次我从Bellow代码中获得消息。在第二个消息中,它超时。
public class CRMESBListener : RoleEntryPoint
{
private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false);
public override void Run()
{
Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is running");
try
{
DBMessageListener dbMessageListener = DBMessageListener.GetDBMessageListner();
dbMessageListener.Listen();
runCompleteEvent.WaitOne();
//this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
public override bool OnStart()
{
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// For information on handling configuration changes
// see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357.
bool result = base.OnStart();
Bootstrapper.Init();
Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has been started");
return result;
}
public override void OnStop()
{
Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole is stopping");
this.cancellationTokenSource.Cancel();
this.runCompleteEvent.WaitOne();
base.OnStop();
Trace.TraceInformation("NPRO.Domain.CRMESBListener.WorkerRole has stopped");
}
private async Task RunAsync(CancellationToken cancellationToken)
{
// TODO: Replace the following with your own logic.
while (!cancellationToken.IsCancellationRequested)
{
Trace.TraceInformation("Working");
await Task.Delay(1000);
}
}
}
public class DBMessageListener
{
#region Member Variables
private static DBMessageListener dbMessageListner;
private static object lockObject = new object();
private TopicSubscribeClientWrapper accountTopicClient;
private NamespaceManager namespaceManager;
private OnMessageOptions eventDrivenMessagingOptions;
private int crmIntegrationUserID = Common.CrmCurrentUser.UserID;
#endregion Member Variables
#region Constructors
private DBMessageListener()
{
string subscriptionName = "AllMessages";
namespaceManager = new NamespaceManager(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString);
if (!namespaceManager.SubscriptionExists(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName))
{
namespaceManager.CreateSubscription(ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath, subscriptionName);
}
accountTopicClient = new TopicSubscribeClientWrapper(ConfigurationSettings.ServiceBus_Pub_Sub_ConnectionString, ConfigurationSettings.ServiceBus_Pub_Sub_Internal_Account_ServicePath);
accountTopicClient.SubscriptionName = subscriptionName;
eventDrivenMessagingOptions = new OnMessageOptions
{
AutoComplete = true
};
eventDrivenMessagingOptions.ExceptionReceived += OnExceptionReceived;
eventDrivenMessagingOptions.MaxConcurrentCalls = 5;
}
#endregion Constructors
#region Methods
private async System.Threading.Tasks.Task OnMessageArrived(BrokeredMessage message)
{
if (message != null)
{
try
{
await ProcessDBMessage(message.GetBody<ServiceBusMessage>());
}
catch (Exception ex)
{
//log exception
}
}
}
private void OnExceptionReceived(object sender, ExceptionReceivedEventArgs e)
{
if (e != null && e.Exception != null)
{
}
}
private async System.Threading.Tasks.Task ProcessDBMessage(ServiceBusMessage message)
{
//process message
}
public static DBMessageListener GetDBMessageListner()
{
if (dbMessageListner == null)
{
lock (lockObject)
{
if (dbMessageListner == null)
{
dbMessageListner = new DBMessageListener();
}
}
}
return dbMessageListner;
}
public void Listen()
{
accountTopicClient.OnMessageAsync(async message => await OnMessageArrived(message), eventDrivenMessagingOptions);
}
#endregion Methods
}
public class TopicSubscribeClientWrapper : IServiceBusClientWrapper
{
#region Member Variables
private readonly string _connectionString;
private readonly string _topicName;
private readonly TopicClient _topicClient;
private SubscriptionClient _subscriptionClient;
#endregion Member Variables
#region Properties
public string SubscriptionName { get; set; }
#endregion Properties
#region Constructors
public TopicSubscribeClientWrapper(string connectionString, string topicName)
{
_connectionString = connectionString;
_topicName = topicName;
_topicClient = TopicClient.CreateFromConnectionString(connectionString, topicName);
}
#endregion Constructors
#region Event Handlers
public void OnMessageAsync(Func<BrokeredMessage, Task> onMessageCallback, OnMessageOptions onMessageOptions)
{
_subscriptionClient = SubscriptionClient.CreateFromConnectionString(_connectionString, _topicName, SubscriptionName);
// _subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions);
MemoryStream largeMessageStream = new MemoryStream();
MessageSession session = _subscriptionClient.AcceptMessageSession();
while (true)
{
BrokeredMessage subMessage = session.Receive(TimeSpan.FromSeconds(5));
if (subMessage != null)
{
Stream subMessageStream = subMessage.GetBody<Stream>();
subMessageStream.CopyTo(largeMessageStream);
subMessage.Complete();
//Console.Write(".");
}
else
{
//Console.WriteLine("Done!");
break;
}
}
BrokeredMessage largeMessage = new BrokeredMessage(largeMessageStream, true);
var message = onMessageCallback.Method.GetParameters();
message.SetValue(largeMessage, 1);
_subscriptionClient.OnMessageAsync(onMessageCallback, onMessageOptions);
}
#endregion Event Handlers
#region Methods
public Task SendAsync(BrokeredMessage message)
{
return _topicClient.SendAsync(message);
}
public void Close()
{
if (_subscriptionClient != null)
{
_subscriptionClient.Close();
}
_topicClient.Close();
}
#endregion Methods
}
我建议采取不同的路线。与其尝试创建一条消息会话来传递大消息,不如使用专门解决此问题的索赔检查模式 - 大型附件。将您的数据写入存储空间,并发送带有消息的URI。保存/还原斑点,而不是试图在块中发送有效载荷。此外,这样可以更容易地监视您的系统(一个失败的成功/失败的消息与一个或多个斑点相关联),您可以更容易。t必须使用会议或任何特殊的会话。