试图从MassTransit消费者那里获得信息



我正在使用Masstransit和RabbitMQ发布事件(没有使用者,只使用发布者),目前我正在尝试创建一个集成测试来验证消息是否已发布,如果是,我想检查它是否是正确的消息。为此,我创建了一个消费者来消费队列中的消息,并将其与我预期的进行比较。这里的问题是,我无法使用该消息。事件已成功发布,但我无法获取消息。

这是负责连接消费者的类

public class ServiceBusHelper
{
private IBusControl bus;
private readonly string serviceBusQueueName = ConfigurationManager.AppSettings["ServiceBusQueuename"];
private readonly string serviceBusEndpoint = ConfigurationManager.AppSettings["ServiceBusEndPoint"];
private readonly string serviceBusUsername = ConfigurationManager.AppSettings["ServiceBusUsername"];
private readonly string serviceBusPassword = ConfigurationManager.AppSettings["ServiceBusPassword"];
public ConnectHandle HandleObserver { get; set; }
public void ConnectRabbitMQ()
{
bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(
new Uri(serviceBusEndpoint),
h =>
{
h.Username(serviceBusUsername);
h.Password(serviceBusPassword);
});
cfg.ReceiveEndpoint(
serviceBusQueueName,
e =>
{
e.Consumer<ServiceBusEventsHelper>();
});
});
//Observer
var observer = new PublishedObserverHelper();
HandleObserver = bus.ConnectPublishObserver(observer);
}

}

这是将使用消息的类

public class ServiceBusEventsHelper : IConsumer<ITransportCreatedEvent>
{
public ITransportCreatedEvent Result { get; set; }

public async Task Consume(ConsumeContext<ITransportCreatedEvent> context)
{
Result = await Task.FromResult(context.Message);
}

}

在测试方法中,我有这个

ServiceBusEventsHelper eventHelper = new ServiceBusEventsHelper();
ServiceBusHelper busHelper = new ServiceBusHelper();
try
{
busHelper.ConnectRabbitMQ();
transportResponseDto = await this.transportClient.CreateTransportAsync(transportRequest);
handle = busHelper.HandleObserver;
var eventResponse = eventHelper.Result;// Allways NULL
}
catch (Exception ex)
{
Assert.Fail(ex.GetDetailMessage());
}

我正试图得到像这个一样的消息结果

var eventResponse = eventHelper.Result;// Allways NULL

但始终为空。

有人能帮帮我吗??

我有一个服务,其中一个方法是CreateTransportAsync(),在该方法中我调用Publish

public async Task<TransportResponseDto> CreateTransportAsync(TransportDto request){
.
.
.
await this.RaiseTransportCreatedEvent(transportResponseDto);
}
private async Task RaiseTransportCreatedEvent(TransportResponseDto transportResponseDto)
{
var evt = CreateTransportEvent(transportResponseDto);
await this.serviceBus.Publish(evt).ConfigureAwait(false);
}
public class ServiceBus<T> : IServiceBus<T> where T : class
{
private readonly IBus bus;
public ServiceBus(IBus bus)
{
this.bus = bus;
}
public Task Publish(T evt)
{
return bus.Publish(evt, evt.GetType());
}
}

这就是我发布活动的方式,它是有效的。现在,我正在尝试在另一个解决方案中的集成测试中测试所有这些是否有效,在该解决方案中,我试图创建一个消费者来消费队列中的消息。然后,我想验证该消息(将其与json文件中的假消息进行比较),看看是否一切正常。问题是我无法获得该消息,也无法理解发生了什么。我真的不明白你在第三点想说什么。感谢

  1. 您需要通过调用bus.Start()来启动总线。你不这样做,所以无论如何都不会收到任何东西
  2. 目前尚不清楚transportClient.CreateTransportAsync的作用。谁在发布消息
  3. 每个消费的消息都实例化消费者。您在"测试"中所做的是实例化使用者的一个实例并保持对该实例的引用。然后你在某个地方发信息。MassTransit创建消费者的新实例,更新字段,然后处理该实例。但是您正在检查您最初创建的实例的Result,它从未接收到任何消息。它将永远是null
  4. 从发布者向消费者传递消息需要时间。您正试图在初始化总线后直接检查结果。我敢肯定,即使你修复了(1)、(2)和(3),你也不会这么快得到它

我不确定你到底想测试什么。使用MassTransit可以在所有传输上发布和使用消息。你可以从Github获得任何样本,构建它,运行它,并查看它的工作情况。

还有许多针对RabbitMQ传输的测试,展示了如何创建这样的东西。例如,检查ConsumerBind_Specs.cs文件。

此外,如果要使用某个现有的使用者实例,可以将该实例连接到总线,如文档"连接现有使用者实例"中所述。使用e.Instance而不是e.Consumer将使您的测试工作做好,等待Consume方法完成。然而,这并不是真正流行的方法,因为您确实希望将消费者范围限制为仅处理一条消息。

相关内容

最新更新