Azure 事件中心引发异常:终结点至少创建一个纪元为"0"的接收器,因此不允许使用非纪元接收器



简介

大家好,我们目前正在开发一个微服务平台,该平台使用Azure EventHubs和事件在服务之间发送数据。让我们只命名这些服务:CustomerService、OrderService和MobileBFF。

CustomerService主要发送更新(带有事件),这些更新将由OrderService和MobileBFF存储,以便能够响应查询,而无需为此数据调用CustomerService。

所有这3个服务+我们在DEV环境中的开发人员都使用同一个ConsumerGroup来连接到这些事件中心。

我们目前只使用一个分区,但计划稍后扩展到多个分区。(你可以看到我们的代码已经可以从多个分区中读取)

例外

不过,我们偶尔会遇到一个异常(如果它启动了,通常会持续一个小时左右抛出这个错误)。不过,目前我们只在DEV/TEST环境中看到过这个错误。

例外:

Azure.Messaging.EventHubs.EventHubsException(ConsumerDisconnected): At least one receiver for the endpoint is created with epoch of '0', and so non-epoch receiver is not allowed. Either reconnect with a higher epoch, or make sure all epoch receivers are closed or disconnected.

EventHub的所有使用者都将其SequenceNumber存储在自己的数据库中。这允许我们让每个使用者分别使用事件,并将最后处理的SequenceNumber存储在其自己的SQL数据库中。当服务(重新)启动时,它从数据库加载SequenceNumber,然后从这里开始请求事件,直到找不到更多的事件为止。然后它会休眠100毫秒,然后重试。这是(稍微简化的)代码:

var consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
string[] allPartitions = null;
await using (var consumer = new EventHubConsumerClient(consumerGroup, _inboxOptions.EventHubConnectionString, _inboxOptions.EventHubName))
{
allPartitions = await consumer.GetPartitionIdsAsync(stoppingToken);
}
var allTasks = new List<Task>();
foreach (var partitionId in allPartitions)
{
//This is required if you reuse variables inside a Task.Run();
var partitionIdInternal = partitionId;
allTasks.Add(Task.Run(async () =>
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
await using (var consumer = new EventHubConsumerClient(consumerGroup, _inboxOptions.EventHubConnectionString, _inboxOptions.EventHubName))
{
EventPosition startingPosition;
using (var testScope = _serviceProvider.CreateScope())
{
var messageProcessor = testScope.ServiceProvider.GetService<EventHubInboxManager<T, EH>>();
//Obtains starting position from the database or sets to "Earliest" or "Latest" based on configuration
startingPosition = await messageProcessor.GetStartingPosition(_inboxOptions.InboxIdentifier, partitionIdInternal);
}
while (!stoppingToken.IsCancellationRequested)
{
bool processedSomething = false;
await foreach (PartitionEvent partitionEvent in consumer.ReadEventsFromPartitionAsync(partitionIdInternal, startingPosition, stoppingToken))
{
processedSomething = true;
startingPosition = await messageProcessor.Handle(partitionEvent);
}
if (processedSomething == false)
{
await Task.Delay(100, stoppingToken);
}
}
}
}
catch (Exception ex)
{
//Log error / delay / retry
}
}
}
}

异常在以下行引发:

await using (var consumer = new EventHubConsumerClient(consumerGroup, _inboxOptions.EventHubConnectionString, _inboxOptions.EventHubName))

更多调查

上面描述的代码正在MicroServices(在Azure中作为AppServices托管)中运行

除此之外,我们还运行1 Azure Function,它也从EventHub读取事件。(可能使用相同的消费者群体)。

根据此处的文档:https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-每个消费者组应该可以有5个消费者。似乎有人建议只有一个,但我们不清楚如果我们不遵循这一指导会发生什么。

我们确实做了一些测试,手动生成了读取事件的服务的多个实例,当超过5个时,这会导致一个不同的错误,该错误非常清楚地表明,每个消费者组(或类似的东西)每个分区只能有5个消费者。

此外,当我们重写代码(如上所述)以使每个分区能够生成一个线程时,似乎(我们不能100%确定)这个问题开始发生了。(尽管我们在EventHub中只有一个分区)编辑:我们进行了更多的日志挖掘,在合并代码以每个分区生成一个线程之前,还发现了一些异常

该异常表示有另一个使用者配置为使用同一使用者组,并断言对分区的独占访问。除非在客户端选项中显式设置OwnerLevel属性,否则可能的候选者是至少有一个EventProcessorClient正在运行。

要进行补救,您可以:

  • 停止针对同一事件中心和使用者组组合运行的任何事件处理器,并确保没有其他使用者显式设置OwnerLevel

  • 在一个专门的消费者小组中运行这些消费者;这将允许它们与独占消费者和/或事件处理器共存。

  • 为这些消费者显式地将OwnerLevel设置为1或更大;这将断言所有权,并迫使同一消费者群体中的任何其他消费者断开连接
    (注意:根据其他使用者的不同,您可能需要在此处测试不同的值。事件处理器类型使用0,因此任何高于0的值都将优先。)

要添加到Jesse的答案中,我认为异常消息是旧的SDK
如果您查看文档,其中定义了3种类型的接收模式:

  1. Epoch

Epoch是服务用于强制分区/租约所有权的唯一标识符(Epoch值)。epoch功能为用户提供了确保在任何时间点消费者组上只有一个接收器的能力。。。

  1. 非epoch:

。。。流处理中存在一些场景,用户希望在单个消费者组上创建多个接收器。为了支持这些场景,我们确实有能力创建一个没有epoch的接收器,在这种情况下,我们允许消费者组上最多5个并发接收器

  1. 混合:

。。。如果有一个接收器已经用epoche1创建并且正在主动接收事件,并且创建了一个没有epoch的新接收器,则新接收器的创建将失败。Epoch接收器在系统中总是优先。

相关内容

最新更新