1( 嗨。我正在用RabbitMQ学习MassTransit ,但坚持使用Request/Respond。我阅读了很多文章,并尝试使用 MassTransit 文档编写控制台应用程序。但仍然找不到有关使用 IRequestClient 接口初始化使用者的任何信息。这是我的代码:
static void Main(string[] args){
var serviceAddress = new Uri("loopback://localhost/notification.service");
var requestTimeout = TimeSpan.FromSeconds(120);
var bus = BusConfigurator.ConfigureBus((cfg, host) =>
{
cfg.ReceiveEndpoint(host, RabbitMqConstants.NotificationServiceQueue, e =>
{
e.Consumer(() => new OrderRegisteredConsumer(???));
});
});
IRequestClient<ISimpleRequest, ISimpleResponse> client = new MessageRequestClient<ISimpleRequest, ISimpleResponse>(bus, serviceAddress, requestTimeout);
bus.Start();
Console.WriteLine("Listening for Order registered events.. Press enter to exit");
Console.ReadLine();
bus.Stop();
}
和我的消费者
public class OrderRegisteredConsumer: IConsumer<IOrderRegisteredEvent>
{
private static IBusControl _bus;
IRequestClient<ISimpleRequest, ISimpleResponse> _client;
public OrderRegisteredConsumer(IRequestClient<ISimpleRequest, ISimpleResponse> client)
{
_client = client;
}
public async Task Consume(ConsumeContext<IOrderRegisteredEvent> context)
{
await Console.Out.WriteLineAsync($"Customer notification sent: Order id {context.Message.OrderId}");
ISimpleResponse response = await _client.Request(new SimpleRequest(context.Message.OrderId.ToString()));
Console.WriteLine("Customer Name: {0}", response.CustomerName);
}
}
我怎样才能把我的客户放进去
e.Consumer(() => new OrderRegisteredConsumer(???));
2(我也尝试在sagas中找到有关请求/响应的一些信息,但不幸的是,我发现的只是 https://github.com/MassTransit/MassTransit/issues/664
如果有人有在 sagas 中使用它的例子,或者如果有人可以提供一些链接,我将不胜感激,我可以在其中阅读更多内容。
需要客户端变量可用,但客户端不需要在配置终结点时准备就绪。 endpoint.Consumer
不会立即实例化使用者,它只需要一个工厂委托,当该使用者收到消息时,该委托将实例化使用者。
由于委托是引用类型,因此可以稍后在代码中分配它。
所以这将起作用:
IRequestClient<ISimpleRequest, ISimpleResponse> client;
var bus = BusConfigurator.ConfigureBus((cfg, host) =>
{
cfg.ReceiveEndpoint(host, RabbitMqConstants.NotificationServiceQueue, e =>
{
e.Consumer(() => new OrderRegisteredConsumer(client));
});
});
client = new MessageRequestClient<ISimpleRequest, ISimpleResponse>(
bus, serviceAddress, requestTimeout);