Masstransit通过指定队列绑定在容器中添加使用者



我有以下消费者类型:

internal class ObjectAddedHandler : IConsumer<ObjectAddedIntegrationEvent>
{
public async Task Consume(ConsumeContext<ObjectAddedIntegrationEvent> context)
{
var @event = context.Message;
await HandleAsync(@event).ConfigureAwait(false);
}
}

通过在我的集装箱中注册

container.AddMassTransit(x =>
{
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(configurationProvider.RabbitMQHostName, hostConfigurator =>
{
hostConfigurator.Username(configurationProvider.RabbitMQUsername);
hostConfigurator.Password(configurationProvider.RabbitMQPassword);
hostConfigurator.UseCluster(c =>
{
string[] hostnames = configurationProvider.RabbitMQNodes.Split(';');
c.ClusterMembers = hostnames;
});
});
host.Settings.GetConnectionFactory().Endpoint.AddressFamily = AddressFamily.InterNetwork;
/*HERE*/ x.AddConsumer<ObjectAddedHandler>().Endpoint(e => e.Name = "ObjectAddedHandler "+configurationProvider.TenantName);
cfg.ConfigureEndpoints(container);
}));
});

但是,通过遵循文档,我希望设置直接交换以便使用路由密钥。在文档中找不到如何以我的方式添加使用者,同时设置文档中报告的端点的绑定属性。

当我尝试在添加使用者时访问端点时,我只能修改名称、预取计数和其他几个属性,但不能修改更多。但是,我希望将我的端点设置为仅接受具有路由关键字tenantName的消息。有办法吗?

编辑

发行方:

container.AddMassTransit(x =>
{
x.AddBus(() => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Send<ObjectAddedIntegrationEvent>(routingCfg => {
routingCfg.UseRoutingKeyFormatter(config => ConfigurationValuesProvider.Current.Get("TenantCode"));
});
cfg.Message<ObjectAddedIntegrationEvent>(routingCfg => routingCfg.SetEntityName("ObjectAddedIntegrationEvent"));
cfg.Publish<ObjectAddedIntegrationEvent>(routingCfg => routingCfg.ExchangeType = ExchangeType.Direct);

var host = cfg.Host(ConfigurationValuesProvider.Current.Get("RabbitMQHostName"), hostConfigurator =>
{
#if !DEBUG
hostConfigurator.Username(ConfigurationValuesProvider.Current.Get("RabbitMQUsername"));
hostConfigurator.Password(ConfigurationValuesProvider.Current.Get("RabbitMQPassword"));
hostConfigurator.UseCluster(c =>
{
string[] hostnames = ConfigurationValuesProvider.Current.Get("RabbitMQNodes").Split(';');
c.ClusterMembers = hostnames;
});
#endif
});
}));
});

接收方:

container.AddMassTransit(x =>
{
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(configurationProvider.RabbitMQHostName, hostConfigurator =>
{
hostConfigurator.Username(configurationProvider.RabbitMQUsername);
hostConfigurator.Password(configurationProvider.RabbitMQPassword);
hostConfigurator.UseCluster(c =>
{
string[] hostnames = configurationProvider.RabbitMQNodes.Split(';');
c.ClusterMembers = hostnames;
});
});
host.Settings.GetConnectionFactory().Endpoint.AddressFamily = AddressFamily.InterNetwork;
x.AddConsumer<ObjectAddedHandler>().Endpoint(e => e.Name = "ObjectAddedHandler "+configurationProvider.TenantName);
cfg.ConfigureEndpoints(container);
}));
});

internal class ObjectAddedHandler : IConsumer<ObjectAddedIntegrationEvent>
{
public async Task Consume(ConsumeContext<ObjectAddedIntegrationEvent> context)
{
var @event = context.Message;
await HandleAsync(@event).ConfigureAwait(false);
}
}

internal class ObjectAddedHandlerConsumerDefinition :
ConsumerDefinition<ObjectAddedHandler>
{
private readonly IConfigurationProvider _provider;
public ObjectAddedHandlerConsumerDefinition(IConfigurationProvider provider)
{
_provider = provider;
EndpointName = "ObjectAddedHandler" + provider.TenantName;
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<ObjectAddedHandler> consumerConfigurator)
{
if (endpointConfigurator is IRabbitMqReceiveEndpointConfigurator rabbit)
{
rabbit.BindMessageExchanges = false;
rabbit.Bind("ObjectAddedIntegrationEvent", s =>
{
s.RoutingKey = _provider.TenantName;
s.ExchangeType = ExchangeType.Direct;
});
}
}
}

启动使用者时,将正确创建ObjectAddedIntegrationEvent直接交换。此外,ObjectAddedHandlerTenant扇出交换(应该是匹配的交换(也已正确创建。不幸的是,当我尝试从发布方发送消息,并监视ObjectAddedIntegrationEvent直接交换时,我没有看到任何结果。

您可以使用消费者定义来实现这一点,该定义应该使用AddConsumer<T>(typeof(definitionclass))添加到您的消费者中。它可以类似于:

public class ObjectAddedHandlerDefinition :
ConsumerDefinition<ObjectAddedHandler>
{
public ObjectAddedHandlerDefinition(IConfigurationProvider provider)
{
_provider = provider;
EndpointName = "ObjectAddedHandler" + provider.TenantName;
ConcurrentMessageLimit = 4;
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<ObjectAddedHandler> consumerConfigurator)
{
if(endpointConfigurator is IRabbitMqReceiveEndpointConfigurator rabbit)
{
rabbit.ConfigureConsumeTopology = false;
// or use Bind<T> for message type name
rabbit.Bind("some-exchange", s => 
{
s.RoutingKey = _provider.TenantName;
s.ExchangeType = ExchangeType.Direct;
});
}
}
}

最新更新