我必须重新总线使用Azure ServiceBus,但是当我尝试从Bus1的消息处理程序之一中的Bus2发送到时,这不起作用。消息未发送。
有什么想法吗?
编辑
巴士 1
string padesQueueAddress = "padesworker";
int numberOfWorkes = Settings.NumberOfWorkers>0 Settings.NumberOfWorkers:10;
string errorQueueAddress = string.Format("{0}-error", queueAddress);
var adapter = new AutofacContainerAdapter(Container);
Bus = Configure.With(adapter)
.Logging(l => l.Use(UseRaygunRebusLoggingFactory(rayclient,Settings.Debug ? RaygunLoggerLevel.DEBUG : RaygunLoggerLevel.WARN)))
.Transport(t => t.UseAzureServiceBus( Settings.AzureQueueConnectionString, queueAddress,AzureServiceBusMode.Standard))
.Sagas(s => s.StoreInSqlServer(string.IsNullOrWhiteSpace(Settings.RebusSagaSqlConnectionString) ? Settings.AzureSqlConnectionString : Settings.RebusSagaSqlConnectionString, "Saga", "SagaIndex") )
.Routing(r => r.TypeBased().MapAssemblyOf<SendSmsCommand>(queueAddress).MapAssemblyOf<Unipluss.Sign.Pades.Commands.CreatePadesCommand>(padesQueueAddress))
.Options(o =>
{
o.SimpleRetryStrategy(secondLevelRetriesEnabled: true, maxDeliveryAttempts:5,errorQueueAddress: errorQueueAddress);
o.SetNumberOfWorkers(numberOfWorkes);
o.SetMaxParallelism(numberOfWorkes);
})
.Start();
await Bus.SendLocal(new HeartBeatCommand());
巴士 2
private IBus CreateExternalEventBus()
{
var eventBus = Configure.With(new BuiltinHandlerActivator())
.Transport(t => t.UseAzureServiceBus(Settings.EventServiceBusConnectionString, queueAddress+"_event", AzureServiceBusMode.Basic))
.Logging(x=>x.ColoredConsole(LogLevel.Debug))
.Options(o =>
{
o.LogPipeline(true);
o.EnableCompression();
o.EnableEncryption(Settings.RebusEncryptionExternalEvents);
})
.Start();
eventBus.Advanced.Routing.Send("1dd0f6f9422146048516a30f00aef4e5",new Unipluss.Sign.Events.Entities.DocumentCancledEvent() {CancledMessage = "test",DocumentId = Guid.NewGuid()});
return eventBus;
}
带有硬编码发送的 eventBus 发送正在工作,但是当我从 Bus1 的消息处理程序之一发送时,消息不会发送(记录器上的日志记录表明消息已发送,但它确实出现在队列中)。
总线 2 包装在包装类中,然后注入到 Autofac 中,以避免在 Autofac 中插入 IBus 接口。
builder.Register(c => new ExternalEventsBus(CreateExternalEventBus()))
.As<IExternalEventsBus>().SingleInstance();
public class ExternalEventsBus:IExternalEventsBus
{
private IBus Bus;
public ExternalEventsBus(IBus bus)
{
Bus = bus;
}
public async Task Send(object message, Guid documentProviderId)
{
await Bus.Advanced.Routing.Send(documentProviderId.ToString("n"), message);
}
public Task Send(object message, DocumentProvider documentProvider)
{
if (!string.IsNullOrWhiteSpace(documentProvider.RebusQueueConnectionString))
return Send(message, documentProvider.Id);
return Task.FromResult(true);
}
public void Dispose()
{
if(Bus!=null)
Bus.Dispose();
}
}
然后,IExternalEventsBus 在 Bus1 中的多个消息处理程序中使用。
好的...让我看看我是否理解这一点(如果我错了,请纠正我):
您的进程中有两个总线实例:
- 一个使用输入队列的输入队列,其
- 值由
queueAddress
指定,您在此处未包含该队列。 - 另一个("外部事件总线")具有输入队列,其值由
queueAddress+"_event"
指定,您在此处也没有包括该队列。
似乎总线实例的目的是第一个用于协调应用程序内的内容,而第二个用于将事件路由到外部的侦听器 - 这就是您所说的"基于内容的路由器",因为它将根据消息内容的某些值路由消息(在本例中为 documentProviderId
)。
现在您遇到错误:当总线 (1) 从其自己的消息处理程序之一中使用总线 (2) 时,似乎不会发送路由的消息。
从您从ExternalEventsBus
发布的代码中不清楚您正在调用哪个Send
方法 - 但我可以告诉您,带有签名public Task Send(object message, DocumentProvider documentProvider)
的方法只会在documentProvider.RebusQueueConnectionString
不为 null 时发送消息。
- 是否验证了连接字符串是否确实带有值?
- 为什么
ExternalEventsBus
中有_connectionString
字段?创建总线时应该使用它吗? - 你记得
await bus.Send(...)
(即await
异步操作的结果)每次调用总线时?