masstranst连接现有AWS SNS/SQS失败



目前,我正试图从aws sdk转移到MassTransit。我有一个大问题与连接到SQS/SNS。我们的基础设施团队使用地形脚本创建sqs/sns。我们(作为开发人员和应用程序)不能(以任何方式)改变它,因为基础设施是由一个单独的团队和另一个aws帐户/角色管理的。

我们正在使用MassTransit 8.0(最新版本)

基本上我们有:队列:

名称:bci-name-dev-aim-name

加密:SSE-SQS

我们有SNS订阅的地方:

arn:aws:sqs:us-west-2::bci-name-dev-aim-name到topic: bci-name-dev-aim-name-topic(协议sqs)

作为主题

名称:bci-name-dev-aim-name-topic

加密:默认加密(alias/aws/sns))

我的生产者配置如下:

services.AddMassTransit(cfg =>
{
cfg.UsingAmazonSqs((context, bus) =>
{
bus.UseMessageScheduler(schedulerEndpoint);

bus.Host("us-west-2", h => {}); //using env Variables
bus.Message<PublicationMessage>(x =>
{
x.SetEntityName("bci-name-dev-aim-name-topic");
});

bus.ConfigureEndpoints(context);
});
});

我consument:

services.AddMassTransit(cfg =>
{
cfg.AddConsumer<PublicationMessageConsumer>();

cfg.UsingAmazonSqs((context, bus) =>
{                            
bus.Host("us-west-2", h => {});

bus.PublishTopology.TopicAttributes.Add(QueueAttributeName.KmsMasterKeyId, "alias/aws/sns");
bus.ReceiveEndpoint("bci-name-dev-aim-name", ec =>  //queue name
{
// disable the default topic binding
ec.ConfigureConsumeTopology = false;
ec.QueueAttributes.Add(QueueAttributeName.KmsMasterKeyId, "alias/aws/sqs");

ec.Subscribe("bci-name-dev-aim-name-topic");    //topic name

ec.ConfigureConsumer<PublicationMessageConsumer>(context);
});

bus.ConfigureEndpoints(context);
});
});

启动应用程序(控制台应用程序)后。生产者报告(在启动应用程序时):

info: MassTransit[0]
Bus started: amazonsqs://us-west-2/

但是当我试着开始消费时,我得到:

info: MassTransit[0]
Configured endpoint bci-name-dev-aim-name, Consumer: Consumer.PublicationMessageConsumer

(这是唯一的日志,所以没有总线启动日志等)

当我停止消费者时,我收到:

warn: MassTransit[0]
Failed to stop bus: amazonsqs://us-west-2/BKOWALCZYKW1_Consumer_bus_kotoyynbyybfbeanbdpf9sykno?durable=false&autodelete=true (Not Started)

你能建议一下吗?

+ + + + + +编辑:

我试着去适应@Chris Patterson的建议。目前我的配置如下:

制作人:

services.AddMassTransit(cfg =>
{
cfg.UsingAmazonSqs((context, bus) =>
{
bus.UseMessageScheduler(schedulerEndpoint);

bus.Host("us-west-2", h => {}); //using env Variables
bus.Message<PublicationMessage>(x =>
{
x.SetEntityName("bci-name-dev-aim-name-topic");
});

bus.ConfigureEndpoints(context);
});
});
消费者:

services.AddMassTransit(cfg =>
{
cfg.AddConsumer<PublicationMessageConsumer>();

cfg.UsingAmazonSqs((context, bus) =>
{                            
bus.Host("us-west-2", h => {});
bus.ReceiveEndpoint("bci-name-dev-aim-name", ec =>  //queue name
{
// disable the default topic binding
ec.ConfigureConsumeTopology = false;

ec.ConfigureConsumer<PublicationMessageConsumer>(context);
});

bus.ConfigureEndpoints(context);
});
});

看起来消费者报告:&;Bus started: amazonsqs://us-west-2/&;所以作品

如果已经创建了SQS队列,已经创建了SNS主题,并且已经创建了SNS- SQS的订阅,则应该删除.Subscribe调用,因为它将尝试在主题中创建订阅。

同样,在MassTransit中设置队列/主题属性不会做任何事情,因为主题和队列已经创建。

应该将MassTransit配置为使用最少数量的权限。您还可能希望在发生错误时预先创建_error队列,或者配置MassTransit以使用标准SQS死信队列和重驱动策略。

最新更新