On premise NServicebus applicaton 从 Azure ServiceBus 队列接收消息



我目前正在努力在nServiceBus托管的应用程序上启动并运行某些东西。我有一个 Azure ServiceBus 队列,第三方正在向其发布消息,我希望我的应用程序(目前在本地托管)接收这些消息。

我已经在谷歌上搜索了有关如何配置端点的答案,但我在有效的配置中没有运气。有没有人这样做过,因为我可以找到如何连接到 Azure 存储队列而不是服务总线队列的示例。(出于其他原因,我需要 Azure 服务总线队列)

我的配置如下

public void Init()
    {
        Configure.With()
           .DefaultBuilder()
           .XmlSerializer()
           .UnicastBus()
           .AzureServiceBusMessageQueue()
           .IsTransactional(true)
           .MessageForwardingInCaseOfFault()
           .UseInMemoryTimeoutPersister()
           .InMemorySubscriptionStorage();
    }

. 消息=启动终结点时出现异常,已记录错误。原因:输入队列 [mytimeoutmanager@sb://[*].servicebus.windows.net/] 必须与此 Source=NServiceBus.Host 位于同一台计算机上

.

<configuration>
  <configSections>
    <section name="MessageForwardingInCaseOfFaultConfig" type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core" />
    <section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" />
    <section name="AzureServiceBusQueueConfig" type="NServiceBus.Config.AzureServiceBusQueueConfig, NServiceBus.Azure" />
    <section name="AzureTimeoutPersisterConfig" type="NServiceBus.Timeout.Hosting.Azure.AzureTimeoutPersisterConfig, NServiceBus.Timeout.Hosting.Azure" />
  </configSections>
  <AzureServiceBusQueueConfig IssuerName="owner" QueueName="testqueue" IssuerKey="[KEY]" ServiceNamespace="[NS]" />
  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error" />
  <!-- Use the following line to explicitly set the Timeout manager address -->
  <UnicastBusConfig TimeoutManagerAddress="MyTimeoutManager" />
  <!-- Use the following line to explicity set the Timeout persisters connectionstring -->
  <AzureTimeoutPersisterConfig ConnectionString="UseDevelopmentStorage=true" />
  <startup useLegacyV2RuntimeActivationPolicy="true">
    <supportedruntime version="v4.0" />
    <requiredruntime version="v4.0.20506" />
    <supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0" />
  </startup>
</configuration>

尝试将UnicastBus()移到通话结束,如下所示:

    Configure.With()
       .DefaultBuilder()
       .XmlSerializer()
       .AzureServiceBusMessageQueue()
       .IsTransactional(true)
       .MessageForwardingInCaseOfFault()
       .UseInMemoryTimeoutPersister()
       .InMemorySubscriptionStorage()
       .UnicastBus(); // <- Here

以及那些向队列发布消息的第三方。请记住,他们需要尊重 NServiceBus 处理序列化/反序列化的方式。以下是在 NServiceBus 中如何完成此操作(最重要的部分是 BrokeredMessage 是使用原始消息初始化的,这是使用 BinaryFormatter 进行序列化的结果):

    private void Send(Byte[] rawMessage, QueueClient sender)
    {
        var numRetries = 0;
        var sent = false;
        while(!sent)
        {
            try
            {
                var brokeredMessage = new BrokeredMessage(rawMessage);
                sender.Send(brokeredMessage);
                sent = true;
            }
                // back off when we're being throttled
            catch (ServerBusyException)
            {
                numRetries++;
                if (numRetries >= MaxDeliveryCount) throw;
                Thread.Sleep(TimeSpan.FromSeconds(numRetries * DefaultBackoffTimeInSeconds));
            }
        }
    }
    private static byte[] SerializeMessage(TransportMessage message)
    {
        if (message.Headers == null)
            message.Headers = new Dictionary<string, string>();
        if (!message.Headers.ContainsKey(Idforcorrelation))
            message.Headers.Add(Idforcorrelation, null);
        if (String.IsNullOrEmpty(message.Headers[Idforcorrelation]))
            message.Headers[Idforcorrelation] = message.IdForCorrelation;
        using (var stream = new MemoryStream())
        {
            var formatter = new BinaryFormatter();
            formatter.Serialize(stream, message);
            return stream.ToArray();
        }
    }

如果希望 NServiceBus 正确反序列化消息,请确保第三方正确序列化该消息。

我现在

遇到了完全相同的问题,并花了几个小时来弄清楚如何解决它。基本上,Azure 超时持久化器仅对使用 NServiceBus.Hosting.Azure 的 Azure 托管终结点受支持。如果使用 NServiceBus.Host 进程来托管端点,它将使用 NServiceBus.Timeout.Hosting.Windows 命名空间类。它使用 MSMQ 初始化了事务传输,然后您会收到此消息。

我使用了两种方法来避免它:

  1. 如果必须使用As_Server端点配置,则可以使用 .DisableTimeoutManager() 在初始化中,它将完全跳过 TimeoutDispatcher 初始化
  2. 使用As_Client终结点配置,它不对传输使用事务模式,并且超时调度程序未初始化

可能有一种方法可以以某种方式注入 Azure 超时管理器,但我还没有找到它,我实际上需要As_Client东西,所以它对我来说很好用。

最新更新