在 lambda 中配置时,MassTransit 不会创建队列/订阅



以下代码在API上非常有效。

builder.Services.AddMassTransit(x =>
{
x.AddConsumer<RelationshipCreatedConsumer>();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
{
h.AccessKey("your-iam-access-key");
h.SecretKey("your-iam-secret-key");
h.Config(new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://host.docker.internal:4566" });
h.Config(new AmazonSQSConfig { ServiceURL = "http://host.docker.internal:4566" });
});
cfg.ReceiveEndpoint("rc_queue", e =>
{
e.UseMessageRetry(r => r.Interval(2, TimeSpan.FromMilliseconds(5000)));
e.ConfigureConsumeTopology = false;
e.Subscribe(nameof(RelationshipCreated));
e.ConfigureConsumer<RelationshipCreatedConsumer>(context);
});
cfg.Message<RelationshipCreated>(top =>
{
top.SetEntityName(nameof(RelationshipCreated));
});
});
});

在此设置中,使用者也位于API中。我想把它提取成Lambda函数。因此,我从上面的代码中删除了AddConsumerReceiveEndpoint调用。现在我的API能够发布消息并在需要时创建主题。太棒了

以下是我为Lambda函数创建的代码:

public class Function
{
private ServiceProvider _provider;
/// <summary>
/// Default constructor. This constructor is used by Lambda to construct the instance. When invoked in a Lambda environment
/// the AWS credentials will come from the IAM role associated with the function and the AWS region will be set to the
/// region the Lambda function is executed in.
/// </summary>
public Function()
{
Console.WriteLine("ok");
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.AddConsumer<RelationshipCreatedConsumer>();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
{
h.AccessKey("your-iam-access-key");
h.SecretKey("your-iam-secret-key");
h.Config(new AmazonSQSConfig { ServiceURL = "http://host.docker.internal:4566" });
h.Config(new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://host.docker.internal:4566" });
});
cfg.ReceiveEndpoint(queueName: "dummy", e =>
{
e.Subscribe(nameof(RelationshipCreated));
e.ConfigureConsumer<RelationshipCreatedConsumer>(context);
});
cfg.ConfigureEndpoints(context);
});
});
_provider = services.BuildServiceProvider(true);
}
public class RelationshipCreatedConsumer : IConsumer<RelationshipCreated>
{
public Task Consume(ConsumeContext<RelationshipCreated> context)
{
Console.WriteLine("YUS");
// TODO: write something to S3
return Task.CompletedTask;
}
}

/// <summary>
/// This method is called for every Lambda invocation. This method takes in an SNS event object and can be used 
/// to respond to SNS messages.
/// </summary>
/// <param name="evnt"></param>
/// <param name="context"></param>
/// <returns></returns>
public async Task FunctionHandler(RelationshipCreated evnt, ILambdaContext context)
{
Console.WriteLine("YUS2");
var consumer = _provider.GetRequiredService<IConsumer<RelationshipCreated>>();
return;
}
}

我期望上面的代码创建一个名为dummy的队列,并订阅API创建的主题。这些都没有发生,也没有任何错误。我通过http调用执行了lambda,从而执行了构造函数。

我做错了什么?

有一个示例显示了如何在AWS Lambda函数中使用MassTransit消费者(由SQS触发(。我建议使用它来构建您的消费者功能。

当然,MassTransit不会创建队列,因为它必须存在,然后才能注册Lambda触发器。在函数(Lambda或Azure函数(中使用时,MassTransit不是传输,只是管道/调度引擎。

最新更新