masstransse -发布到一个由不同的worker使用的队列



我正在使用RabbitMQ的MassTransit,我有一个配置队列和消费者的worker -我希望做的是能够给它"角色";告诉它应该能够使用哪些队列。

我的代码是这样的:

x.UsingRabbitMq((context, config) =>
{
config.Host(server, virtualHost, h =>
{
h.Username(username);
h.Password(password);
});
services.AddMassTransit(x =>
{
x.AddConsumer<OrderConsumer>();
x.AddConsumer<OrderMessagingConsumer>();
x.AddConsumer<InventoryConsumer>();
});
config.ReceiveEndpoint("Orders", cfg =>
{
cfg.Durable = true;
cfg.ConcurrentMessageLimit = 10;
cfg.PrefetchCount = 40;
cfg.ConfigureConsumer<OrderConsumer>(context, cfg => {});
}
config.ReceiveEndpoint("Orders.Messages", cfg =>
{
cfg.Durable = true;
cfg.ConcurrentMessageLimit = 10;
cfg.PrefetchCount = 40;
cfg.ConfigureConsumer<OrderMessagingConsumer>(context, cfg => {});
}
config.ReceiveEndpoint("Inventory", cfg =>
{
cfg.Durable = true;
cfg.ConcurrentMessageLimit = 10;
cfg.PrefetchCount = 40;
cfg.ConfigureConsumer<InventoryConsumer>(context, cfg => {});
}
}

上面的代码按预期工作,创建队列并使用消息。重要的是要注意,OrderConsumer作为其逻辑的一部分创建了一个发布到Inventory的消息,该消息由InventoryConsumer消耗。

为使其受worker角色的约束,我将其更改为如下内容:

x.UsingRabbitMq((context, config) =>
{
config.Host(server, virtualHost, h =>
{
h.Username(username);
h.Password(password);
});
services.AddMassTransit(x =>
{
if (workerRoles.OrderProcessor)
{
x.AddConsumer<OrderConsumer>();
x.AddConsumer<OrderMessagingConsumer>();
}
if (workerRoles.InventoryProcessor)
{
x.AddConsumer<InventoryConsumer>();
}
});
if (workerRoles.OrderProcessor)
{
config.ReceiveEndpoint("Orders", cfg =>
{
cfg.Durable = true;
cfg.ConcurrentMessageLimit = 10;
cfg.PrefetchCount = 40;
cfg.ConfigureConsumer<OrderConsumer>(context, cfg => {});
}
config.ReceiveEndpoint("Orders.Messages", cfg =>
{
cfg.Durable = true;
cfg.ConcurrentMessageLimit = 10;
cfg.PrefetchCount = 40;
cfg.ConfigureConsumer<OrderMessagingConsumer>(context, cfg => {});
}
}
if (workerRoles.InventoryProcessor)
{
config.ReceiveEndpoint("Inventory", cfg =>
{
cfg.Durable = true;
cfg.ConcurrentMessageLimit = 10;
cfg.PrefetchCount = 40;
cfg.ConfigureConsumer<InventoryConsumer>(context, cfg => {});
}
}
}

当我添加条件时,Inventory队列不会创建,因此OrderConsumer发布的消息不会排队,因此不会处理(由另一个具有InventoryProcessor标志的工作器打开)。

我试着改变逻辑,代码完全是在第一个例子中,但队列有一个ConcurrentMessageLimit和一个PrefetchCount等于0 -但这只是排队的消息,并立即将它们移动到Inventory_skipped队列。

我希望完成/理解的是如何处理一个worker应该发布到一个不被该worker使用的队列的情况。在这种情况下,我的所有工人都应该定义所有其他工人正在使用的所有队列吗?(想象一下这样一种情况:一个worker没有定义队列并开始工作,但是定义队列的worker还没有启动并运行以创建队列)。

您的配置中有一些方面非常值得关注,特别是在总线配置中调用AddMassTransit?非常奇怪。我已经按照要求更新了您的配置,以使用我认为可管理的解决方案:

services.AddMassTransit(x =>
{
if (workerRoles.OrderProcessor)
{
x.AddConsumer<OrderConsumer>()
.Endpoint(e => e.Name = "Orders");
x.AddConsumer<OrderMessagingConsumer>()
.Endpoint(e => e.Name = "Orders.Messages");   
}
if (workerRoles.InventoryProcessor)
{
x.AddConsumer<InventoryConsumer>()
.Endpoint(e => e.Name = "Inventory");    
}
x.UsingRabbitMq((context, config) =>
{
config.Host(server, virtualHost, h =>
{
h.Username(username);
h.Password(password);
});
config.ConcurrentMessageLimit = 10;
config.PrefetchCount = 40;
config.ConfigureEndpoints(context);
}
});

最新更新