关于在node.js中设计Kafka Consumer并使其幂等的问题



我们正在尝试编写一个消费者,该消费者将订阅特定主题,并在满足数据中的几个条件时进行一些处理。然而,处理中的一个操作只能完成一次,为了简单起见,让我们将其视为非幂等的POST http请求。

以下是一些其他注意事项:-

  • 消费者需要在node.js中进行编码,并使用kafkajs库
  • 消费者需要在Kubernetes上运行
  • 副本的数量将等于生产商副本的数量
  • 我们将为我们的消费者使用一个消费者群体

我们在想,为了让这个消费者Idempont,我们也许可以做一些事情,比如:-

For Every Message
Check if message was processed
try:
If !processed
Do Processing (http POST included here)
catch for errors:
Do error processing
finally:
Mark message as processed

"将消息标记为已处理";基本上是将一些细节捕获到关系数据库(如Postgres),比如偏移量、时间戳和其他一些细节,以确保我们捕获了密钥,从而能够唯一地识别记录

  1. 以上内容是否有助于使消费者产生幻觉
  2. 你还可以提出其他哪些表现更好的替代方案

除此之外,还有一些关于上述场景中DB处理的最佳实践的问题:-

  1. 假设我有3个k8s节点,每个节点都有3个消费者pod在运行,基本上提供了9个单线程kafka消费者。这是正确的理解吗?

  2. 现在,由于这些线程中的每一个都将执行DB插入/读取,在池和客户端之间使用什么更好(假设节点postgres库)?

  3. 看来,如果我们在一天开始时打开客户端连接,并让它一直存在到一天结束,它应该对我们有效。这是一个好方法还是一个糟糕的设计?

  4. 如果我们对来自这9个消费者的每条消息进行处理,那么使用Pools会给我们带来什么好处吗。

其他假设:-

  • 交通时间:东部时间早上7:00开始,全天加速,美国晚间逐渐减少。东部时间凌晨2:00至东部时间早上6:00之间禁止交通
  • 平均值:美国白天每秒1条消息
  • 最大值:在美国白天,每秒5条消息,持续时间短
  • 延迟容忍度:在正常情况下,使用者中的POST可以从消息发布时间延迟不超过5分钟

感谢您耐心阅读。对这篇文章的篇幅感到抱歉。

  1. 以上内容是否适合制作消费者Idempont

是的,从幂等性的POV来看,您的代码看起来不错。由于您使用的是Kafka消费者,因此不需要用于消息处理的独占for循环。每次消息到达时都会调用消费者。你的psuedo代码应该是这样的:

Check if message was processed
try:
If !processed
Do Processing (http POST included here)
catch for errors:
Do error processing
finally:
Mark message as processed
  1. 您还可以提出其他哪些性能更好的替代方案

您的代码遗漏了一个重要方面,即并发重复消息。例如,以某种方式从生产者同时生成两条消息(这实际上是生产者端的一个错误),并且该消息应该只处理一次。使用者开始处理这两条消息。在这一点If !processed,两个消费者都看到相同的状态,即not processed,并且他们都进行到Do Processing。您可以通过获取某个id的锁来避免这种情况。通过该锁可以判断消息是否重复。由于您已经在使用Postgres,您可以查看pg_advisory_locks。所以现在,你的伪代码看起来像:

Check if message was processed
try:
acquire_lock(message.uniqId)    //pg_advisory_lock
If !processed
Do Processing (http POST included here)
catch for errors:
if error is lock_already_acquired_by_other_process
return as duplicate processor
else
Do error processing
finally:
Mark message as processed
release lock

我们仍然可以做一些改进。上面的代码不处理我们希望重试的失败场景。实现这一点有多种方法。哦,等等,你在用卡夫卡。为什么不在延迟一段时间后将处理失败的消息(显然不是重复的消息)发布到同一个Kafka主题中,并在消息对象中设置一些计数器来检查该消息被处理了多少次。我们当然希望只重试有限的次数,因此每次处理消息时,我们都可以检查之前在消息对象中设置的计数器,以控制重试次数。到目前为止还不错,但即使在固定次数的重试后仍失败的消息呢。对于这种情况,您希望有一个DLQ(死信队列),它保存这些消息和一些错误消息,直到您手动查看并修复错误为止。

这听起来有很多代码要写。我们有另一个好消息。有可用的库,您可以利用它们来实现所有这些。一个这样的图书馆是牛市。

  1. 假设我有3个k8s节点,每个节点都有3个消费者pod在运行,基本上提供了9个单线程kafka消费者。这是正确的理解吗

是的。据我所知。

  1. 既然这些线程中的每一个都将执行DB插入/读取,那么在池和客户端之间使用什么更好(假设节点postgres库)

使用池是可取的,因为您还希望实现更快的处理。使用连接池,您可以同时启动多个查询,而无需排队,使用任何使用并行执行的底层库等。当然,我们不应该用连接来填充内存,因此建议根据pod的内存调整池中的连接数量。

  1. 看来,如果我们在一天开始时打开客户端连接,并让它一直存在到一天结束,它应该对我们有效。这是一个好方法还是一个糟糕的设计

我不能正确理解你在这里试图做什么,但我支持连接池。

  1. 如果我们对这9个运行中的消费者的每条消息进行处理,那么使用池会给我们带来什么好处吗

是。除了第4点中已经提到的好处外,您还可以更好地利用k8s吊舱的资源(同样,这取决于根据消息传入率是否有9个消费者是最优的)。

相关内容

  • 没有找到相关文章

最新更新