在 Kafka 使用者中实现并发



我们正在努力并行化我们的 Kafka 使用者,以处理更多数量的记录来处理峰值负载。我们已经在做的一种方法是在同一消费者组中启动尽可能多的消费者和许多分区。

我们的消费者处理进行 API 调用,该调用目前是同步的。我们认为使这个 API 调用异步将使我们的使用者处理更多的负载。因此,我们试图使 API 调用异步,并在其响应中增加偏移量。但是,我们看到了一个问题:

通过使 API 调用异步,我们可以首先获得最后一条记录的响应,而到那时,前一条记录的 API 调用都没有启动或完成。如果我们在收到最后一条记录的响应后立即提交偏移量,则偏移量将更改为最后一条记录。同时,如果使用者重新启动或分区重新平衡,我们将不会在提交偏移量的最后一个记录之前收到任何记录。这样,我们将错过未处理的记录。

截至目前,我们已经有 25 个分区。我们期待了解是否有人在不增加分区的情况下实现了并行性,或者增加分区是实现并行性的唯一方法(以避免偏移问题)。

首先,您需要将消息的读取与这些消息的处理分离(如果只是一开始)。接下来看看你可以对 API 进行多少个并发调用,因为调用它的频率高于服务器可以处理的频率是没有任何意义的,无论是否异步。如果并发 API 调用的数量大致等于主题中的分区数,则异步调用 API 是没有意义的。

如果分区数明显小于可能的最大并发 API 调用数,则您有几种选择。 您可以尝试通过按照建议异步调用 API 来使用较少的线程(每个使用者一个)进行最大数量的并发 API 调用,也可以创建更多线程并同步进行调用。 当然,接下来你会遇到一个问题,即你的消费者如何将他们的工作交给更多的共享线程,但这正是像 Flink 或 Storm 这样的流执行平台为你所做的。 提供检查点处理的流媒体平台(如 Flink)也可以处理您在消息处理顺序不正确时如何处理偏移量提交的问题。您可以滚动自己的检查点处理并滚动自己的共享线程管理,但您必须真正避免使用流式执行平台。

最后,您的使用者可能多于最大可能的并发 API 调用数,但我建议您只使用较少的使用者和共享分区,而不是 API 调用线程。

而且,当然,您可以随时更改主题分区的数量,以使上述首选选项更加可行。

无论哪种方式,为了回答你的具体问题,你想看看 Flink 如何使用 Kafka 偏移量提交进行检查点处理。 为了过度简化(因为我认为你不想自己滚动),kafka 使用者不仅必须记住他们刚刚提交的偏移量,而且还必须保留之前提交的偏移量,这定义了流经应用程序的消息块。要么完全处理该消息块,要么需要将每个线程的处理状态回滚到处理前一个块中最后一条消息的位置。 同样,这是一个重大的过度简化,但这就是它的方式。

你必须看看kafkabatch处理。简而言之:您可以使用少量(甚至单个)partitions设置巨大的batch.size。就目前而言,由于consumer端(即在 ram 内存中)消耗了整个messagesbatch- 您可以以任何您想要的方式并行化此消息。

我真的很想分享链接,但他们的数量在网孔上滚动。

更新

在提交偏移量方面 - 您可以对整个batch执行此操作。 通常,kafka 不会通过滥用分区数来实现目标性能要求,而是依赖于batch处理。

我已经看到很多项目,遭受分区扩展的困扰(您可能会在以后看到问题,例如在重新平衡期间)。经验法则 - 首先查看每个可用的batch设置。

最新更新