使用Azure功能和服务总线的MassTransit异常处理和错误队列



我正在Azure Functions v3中运行MassTransit 7.1.8,服务总线作为传输。当使用者第一次抛出异常时,会在我的服务总线名称空间中创建两个主题:masstransit~fault(没有任何订阅(和masstransit~fault--mynamespace~myevent--(有一个订阅Fault-MassTransit(。

Azure功能Startup.cs

public override void Configure(IFunctionsHostBuilder builder)
{
builder.Services.AddMassTransitForAzureFunctions(cfg =>
{
cfg.AddConsumersFromNamespaceContaining<MyConsumer>();
});
}

Azure功能

[FunctionName("MyFunction")]
public async Task Run(
[ServiceBusTrigger("my-topic", "my-subscription", Connection = "AzureWebJobsServiceBus")] Message message, ILogger log)
{
try
{
await _receiver.HandleConsumer<MyConsumer>("my-topic", "my-subscription", message, default(CancellationToken));
}
catch (System.Exception ex)
{
}
}

消费者

public class MyConsumer : IConsumer<MyEvent>
{
public async Task Consume(ConsumeContext<MyEvent> context)
{
throw new System.Exception("Very bad things happened inside the consumer.");
}
}

我还必须手动在masstransit~fault上创建一个订阅,才能开始在那里接收消息。

我的问题:

  1. MyFunction正在接受异常,以防止将消息发送到DLQ。这样可以吗?吞下一个异常感觉很奇怪,但另一方面,MassTransit会将故障发送到错误队列,这样它就不会丢失
  2. 文档讨论了创建一个_error队列(前缀适当(,但我没有看到任何这样的队列或主题,只有masstransit~faultmasstransit~fault--mynamespace~myevent--。我猜masstransit~fault--mynamespace~myevent--相当于RabbitMQ_error的服务总线
  3. masstransit~fault--mynamespace~myevent--有消息发送到它,但当我查看订阅时,它是空的。订阅有一个默认的过滤器,可以接受所有内容。我必须对这个主题做一些额外的配置吗
  4. 有没有办法重命名上面的错误主题(也许在此期间发生了变化(

Azure函数并没有真正使用MassTransit作为传输。但是,可以将某些方面配置为模仿相同的行为。在队列上设置DeliveryCount = 1将阻止Azure重新传递消息(导致五次重试,默认值为5(。

针对各种问题:

  1. 这是交付计数,MassTransit发布了故障
  2. 您可以配置重试策略(如下所示(
  3. 使用Azure函数时没有_错误队列,因为MassTransit不是传输
  4. 除非为故障配置使用者,否则不会使用它
  5. 您可以使用实体名称格式化程序(如下所示(更改故障主题

为Azure函数配置总线重试:

.AddMassTransitForAzureFunctions(cfg =>
{
cfg.SetKebabCaseEndpointNameFormatter();
cfg.AddConsumersFromNamespaceContaining<ConsumerNamespace>();
}, (context, cfg) =>
{
cfg.MessageTopology.SetEntityNameFormatter(new CleanEntityNameFormatter(cfg.MessageTopology.EntityNameFormatter));
cfg.UseMessageRetry(r => r.Intervals(200, 500, 1000, 2000));
})

使用Azure功能,当重试次数用完时,Azure将把消息移动到死信队列(一个逻辑队列,是队列本身的一部分,在Azure门户中可见(。

指定的实体名称格式化程序将更改故障的主题名称。它必须用于将产生/消耗故障的每个总线实例,以确保名称匹配。

using MassTransit;
using MassTransit.Internals.Extensions;
using MassTransit.Topology;
public class CleanEntityNameFormatter :
IEntityNameFormatter
{
readonly IEntityNameFormatter _entityNameFormatter;
public CleanEntityNameFormatter(IEntityNameFormatter entityNameFormatter)
{
_entityNameFormatter = entityNameFormatter;
}
public string FormatEntityName<T>()
{
if (typeof(T).ClosesType(typeof(Fault<>), out Type[] types))
{
var name = (string) typeof(IEntityNameFormatter)
.GetMethod("FormatEntityName")
.MakeGenericMethod(types)
.Invoke(_entityNameFormatter, Array.Empty<object>());
var suffix = typeof(T).Name.Split('`').First();
return $"{name}-{suffix}";
}
return _entityNameFormatter.FormatEntityName<T>();
}
}

最新更新