从同一JVM运行Kafka消费者和生产商时,生产商慢



我使用的是kafka 0.8和spring-integration-kafka 1.2.0.Release

我有2个名为初级和次要的主题。我需要从主要主题中消费,经过一些处理后需要生成次要主题,以便以后进行下一组处理。

虽然主要主题的消费效果很好,但在几分钟后生产到次要主题开始失败。问题首先是我设置的500毫秒后发送请求到Kafka超时。

结束线池。

如果我试图将事件提交到另一个Kafka群集的次要主题,则它可以毫无问题地起作用。

我有4位消费者跑步,每个主题都有200个分区。

我对Kafka并不陌生,请为任何知识缺乏借口。请评论我应该提供的任何丢失信息。

提供的信息很难知道,但我怀疑您可以从第一个主题中消费,然后计算出第一个主题的结果,而不是您对次要产生的结果话题。可能会发生这种情况的原因很多。例如,也许对次要主题的写作不是分区之间分布得很好。同样,生产到另一个集群可能会成功,原因有多种原因,包括更快的机器,更多的机器,更好的网络等。

基本问题并不是真正特定于Kafka的:如果您是从一个来源消耗的,并将该数据发送到第二个接收器,您通常无法假设第二个接收器总是比第一个源更快。每当第二个水槽变慢时,即使是一点点,您都会最终遇到这样的问题。例如,假设您可以从初级阅读100个事件/秒,但次级水槽只能消耗99个事件/秒。这意味着您最终会在内存中又有1个事件,等待发送到您的水槽。如果您不做任何事情来减慢您从主要来源阅读的速率,您将用完RAM,线程或其他资源。

一般解决方案是某种节流。例如,您可能会使用以500个许可开始的Semaphore:这意味着您永远不会从尚未成功发送到水槽的主要来源中读取超过500个项目。在从主要来源读取项目之前,您会降低Semaphore,以便如果您已经在"次级"之前"在"次级之前,则读者将阻止您的500个项目。每次您成功将项目发送到次要主题时,您都会发布许可证,允许其他阅读。

我会警告使用第二个kafka群集或其他有效但并不能真正解决核心问题之类的修复程序。例如,如果现在生产到另一个群集现在起作用,那么由于节点丢失,重新平衡等,该群集会减慢时,它不会暂时隐藏问题。

在尝试所有可能的配置后,最终发现了问题。

错误地忘记删除以下依赖性,该依赖性较早添加了用于消费者集成。

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>

这在产生时造成了一些冲突,这在等待状态中添加了线程。如果有人可以指导可以添加什么冲突,那将是一个很好的学习。

谢谢。

最新更新