以下代码在API上非常有效。
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<RelationshipCreatedConsumer>();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
{
h.AccessKey("your-iam-access-key");
h.SecretKey("your-iam-secret-key");
h.Config(new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://host.docker.internal:4566" });
h.Config(new AmazonSQSConfig { ServiceURL = "http://host.docker.internal:4566" });
});
cfg.ReceiveEndpoint("rc_queue", e =>
{
e.UseMessageRetry(r => r.Interval(2, TimeSpan.FromMilliseconds(5000)));
e.ConfigureConsumeTopology = false;
e.Subscribe(nameof(RelationshipCreated));
e.ConfigureConsumer<RelationshipCreatedConsumer>(context);
});
cfg.Message<RelationshipCreated>(top =>
{
top.SetEntityName(nameof(RelationshipCreated));
});
});
});
在此设置中,使用者也位于API中。我想把它提取成Lambda函数。因此,我从上面的代码中删除了AddConsumer
和ReceiveEndpoint
调用。现在我的API能够发布消息并在需要时创建主题。太棒了
以下是我为Lambda函数创建的代码:
public class Function
{
private ServiceProvider _provider;
/// <summary>
/// Default constructor. This constructor is used by Lambda to construct the instance. When invoked in a Lambda environment
/// the AWS credentials will come from the IAM role associated with the function and the AWS region will be set to the
/// region the Lambda function is executed in.
/// </summary>
public Function()
{
Console.WriteLine("ok");
var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.AddConsumer<RelationshipCreatedConsumer>();
x.UsingAmazonSqs((context, cfg) =>
{
cfg.Host(new Uri("amazonsqs://localhost:4566"), h =>
{
h.AccessKey("your-iam-access-key");
h.SecretKey("your-iam-secret-key");
h.Config(new AmazonSQSConfig { ServiceURL = "http://host.docker.internal:4566" });
h.Config(new AmazonSimpleNotificationServiceConfig { ServiceURL = "http://host.docker.internal:4566" });
});
cfg.ReceiveEndpoint(queueName: "dummy", e =>
{
e.Subscribe(nameof(RelationshipCreated));
e.ConfigureConsumer<RelationshipCreatedConsumer>(context);
});
cfg.ConfigureEndpoints(context);
});
});
_provider = services.BuildServiceProvider(true);
}
public class RelationshipCreatedConsumer : IConsumer<RelationshipCreated>
{
public Task Consume(ConsumeContext<RelationshipCreated> context)
{
Console.WriteLine("YUS");
// TODO: write something to S3
return Task.CompletedTask;
}
}
/// <summary>
/// This method is called for every Lambda invocation. This method takes in an SNS event object and can be used
/// to respond to SNS messages.
/// </summary>
/// <param name="evnt"></param>
/// <param name="context"></param>
/// <returns></returns>
public async Task FunctionHandler(RelationshipCreated evnt, ILambdaContext context)
{
Console.WriteLine("YUS2");
var consumer = _provider.GetRequiredService<IConsumer<RelationshipCreated>>();
return;
}
}
我期望上面的代码创建一个名为dummy的队列,并订阅API创建的主题。这些都没有发生,也没有任何错误。我通过http调用执行了lambda,从而执行了构造函数。
我做错了什么?
有一个示例显示了如何在AWS Lambda函数中使用MassTransit消费者(由SQS触发(。我建议使用它来构建您的消费者功能。
当然,MassTransit不会创建队列,因为它必须存在,然后才能注册Lambda触发器。在函数(Lambda或Azure函数(中使用时,MassTransit不是传输,只是管道/调度引擎。