从总线 1 的消息处理程序中的总线 2 重新总线发送



我必须重新总线使用Azure ServiceBus,但是当我尝试从Bus1的消息处理程序之一中的Bus2发送到时,这不起作用。消息未发送。

有什么想法吗?

编辑

巴士 1

        string padesQueueAddress = "padesworker";
        int numberOfWorkes = Settings.NumberOfWorkers>0   Settings.NumberOfWorkers:10;
        string errorQueueAddress = string.Format("{0}-error", queueAddress);
        var adapter = new AutofacContainerAdapter(Container);
        Bus = Configure.With(adapter)
             .Logging(l =>  l.Use(UseRaygunRebusLoggingFactory(rayclient,Settings.Debug ? RaygunLoggerLevel.DEBUG :  RaygunLoggerLevel.WARN)))
             .Transport(t => t.UseAzureServiceBus( Settings.AzureQueueConnectionString, queueAddress,AzureServiceBusMode.Standard))                                  
             .Sagas(s => s.StoreInSqlServer(string.IsNullOrWhiteSpace(Settings.RebusSagaSqlConnectionString) ? Settings.AzureSqlConnectionString : Settings.RebusSagaSqlConnectionString, "Saga", "SagaIndex") )
             .Routing(r => r.TypeBased().MapAssemblyOf<SendSmsCommand>(queueAddress).MapAssemblyOf<Unipluss.Sign.Pades.Commands.CreatePadesCommand>(padesQueueAddress))                 
            .Options(o =>
            {
                o.SimpleRetryStrategy(secondLevelRetriesEnabled: true, maxDeliveryAttempts:5,errorQueueAddress: errorQueueAddress);
                o.SetNumberOfWorkers(numberOfWorkes);
                o.SetMaxParallelism(numberOfWorkes);                    
            })                
            .Start();
        await Bus.SendLocal(new HeartBeatCommand());

巴士 2

    private  IBus CreateExternalEventBus()
    {
        var eventBus = Configure.With(new BuiltinHandlerActivator())
                .Transport(t => t.UseAzureServiceBus(Settings.EventServiceBusConnectionString, queueAddress+"_event", AzureServiceBusMode.Basic))
                .Logging(x=>x.ColoredConsole(LogLevel.Debug))
                .Options(o =>
                {
                o.LogPipeline(true);
                o.EnableCompression();
                o.EnableEncryption(Settings.RebusEncryptionExternalEvents);
                })
                .Start();
        eventBus.Advanced.Routing.Send("1dd0f6f9422146048516a30f00aef4e5",new Unipluss.Sign.Events.Entities.DocumentCancledEvent() {CancledMessage = "test",DocumentId = Guid.NewGuid()});
        return eventBus;
    }

带有硬编码发送的 eventBus 发送正在工作,但是当我从 Bus1 的消息处理程序之一发送时,消息不会发送(记录器上的日志记录表明消息已发送,但它确实出现在队列中)。

总线 2 包装在包装类中,然后注入到 Autofac 中,以避免在 Autofac 中插入 IBus 接口。

        builder.Register(c => new ExternalEventsBus(CreateExternalEventBus()))
 .As<IExternalEventsBus>().SingleInstance();
public class ExternalEventsBus:IExternalEventsBus
{
    private IBus Bus;
    public ExternalEventsBus(IBus bus)
    {
        Bus = bus;
    }
    public async Task Send(object message, Guid documentProviderId)
    {
        await Bus.Advanced.Routing.Send(documentProviderId.ToString("n"), message);
    }
    public Task Send(object message, DocumentProvider documentProvider)
    {
        if (!string.IsNullOrWhiteSpace(documentProvider.RebusQueueConnectionString))
            return Send(message, documentProvider.Id);
        return Task.FromResult(true);
    }
    public void Dispose()
    {
        if(Bus!=null)
            Bus.Dispose();
    }
}

然后,IExternalEventsBus 在 Bus1 中的多个消息处理程序中使用。

好的...让我看看我是否理解这一点(如果我错了,请纠正我):

您的进程中有两个总线实例:

    一个使用输入队列的输入队列,其
  1. 值由 queueAddress 指定,您在此处未包含该队列。
  2. 另一个("外部事件总线")具有输入队列,其值由 queueAddress+"_event" 指定,您在此处也没有包括该队列。

似乎总线实例的目的是第一个用于协调应用程序内的内容,而第二个用于将事件路由到外部的侦听器 - 这就是您所说的"基于内容的路由器",因为它将根据消息内容的某些值路由消息(在本例中为 documentProviderId)。

现在您遇到错误:当总线 (1) 从其自己的消息处理程序之一中使用总线 (2) 时,似乎不会发送路由的消息。

从您从ExternalEventsBus发布的代码中不清楚您正在调用哪个Send方法 - 但我可以告诉您,带有签名public Task Send(object message, DocumentProvider documentProvider)的方法只会在documentProvider.RebusQueueConnectionString不为 null 时发送消息。

  1. 是否验证了连接字符串是否确实带有值?
  2. 为什么ExternalEventsBus中有_connectionString字段?创建总线时应该使用它吗?
  3. 你记得await bus.Send(...)(即 await异步操作的结果)每次调用总线时?

最新更新