目前,我正试图从aws sdk转移到MassTransit。我有一个大问题与连接到SQS/SNS。我们的基础设施团队使用地形脚本创建sqs/sns。我们(作为开发人员和应用程序)不能(以任何方式)改变它,因为基础设施是由一个单独的团队和另一个aws帐户/角色管理的。
我们正在使用MassTransit 8.0(最新版本)
基本上我们有:队列:
名称:bci-name-dev-aim-name
加密:SSE-SQS
我们有SNS订阅的地方:
arn:aws:sqs:us-west-2::bci-name-dev-aim-name到topic: bci-name-dev-aim-name-topic(协议sqs)
作为主题
名称:bci-name-dev-aim-name-topic
加密:默认加密(alias/aws/sns))
我的生产者配置如下:
services.AddMassTransit(cfg =>
{
cfg.UsingAmazonSqs((context, bus) =>
{
bus.UseMessageScheduler(schedulerEndpoint);
bus.Host("us-west-2", h => {}); //using env Variables
bus.Message<PublicationMessage>(x =>
{
x.SetEntityName("bci-name-dev-aim-name-topic");
});
bus.ConfigureEndpoints(context);
});
});
我consument:
services.AddMassTransit(cfg =>
{
cfg.AddConsumer<PublicationMessageConsumer>();
cfg.UsingAmazonSqs((context, bus) =>
{
bus.Host("us-west-2", h => {});
bus.PublishTopology.TopicAttributes.Add(QueueAttributeName.KmsMasterKeyId, "alias/aws/sns");
bus.ReceiveEndpoint("bci-name-dev-aim-name", ec => //queue name
{
// disable the default topic binding
ec.ConfigureConsumeTopology = false;
ec.QueueAttributes.Add(QueueAttributeName.KmsMasterKeyId, "alias/aws/sqs");
ec.Subscribe("bci-name-dev-aim-name-topic"); //topic name
ec.ConfigureConsumer<PublicationMessageConsumer>(context);
});
bus.ConfigureEndpoints(context);
});
});
启动应用程序(控制台应用程序)后。生产者报告(在启动应用程序时):
info: MassTransit[0]
Bus started: amazonsqs://us-west-2/
但是当我试着开始消费时,我得到:
info: MassTransit[0]
Configured endpoint bci-name-dev-aim-name, Consumer: Consumer.PublicationMessageConsumer
(这是唯一的日志,所以没有总线启动日志等)
当我停止消费者时,我收到:
warn: MassTransit[0]
Failed to stop bus: amazonsqs://us-west-2/BKOWALCZYKW1_Consumer_bus_kotoyynbyybfbeanbdpf9sykno?durable=false&autodelete=true (Not Started)
你能建议一下吗?
+ + + + + +编辑:
我试着去适应@Chris Patterson的建议。目前我的配置如下:
制作人:
services.AddMassTransit(cfg =>
{
cfg.UsingAmazonSqs((context, bus) =>
{
bus.UseMessageScheduler(schedulerEndpoint);
bus.Host("us-west-2", h => {}); //using env Variables
bus.Message<PublicationMessage>(x =>
{
x.SetEntityName("bci-name-dev-aim-name-topic");
});
bus.ConfigureEndpoints(context);
});
});
消费者:services.AddMassTransit(cfg =>
{
cfg.AddConsumer<PublicationMessageConsumer>();
cfg.UsingAmazonSqs((context, bus) =>
{
bus.Host("us-west-2", h => {});
bus.ReceiveEndpoint("bci-name-dev-aim-name", ec => //queue name
{
// disable the default topic binding
ec.ConfigureConsumeTopology = false;
ec.ConfigureConsumer<PublicationMessageConsumer>(context);
});
bus.ConfigureEndpoints(context);
});
});
看起来消费者报告:&;Bus started: amazonsqs://us-west-2/&;所以作品
如果已经创建了SQS队列,已经创建了SNS主题,并且已经创建了SNS- SQS的订阅,则应该删除.Subscribe
调用,因为它将尝试在主题中创建订阅。
同样,在MassTransit中设置队列/主题属性不会做任何事情,因为主题和队列已经创建。
应该将MassTransit配置为使用最少数量的权限。您还可能希望在发生错误时预先创建_error
队列,或者配置MassTransit以使用标准SQS死信队列和重驱动策略。