"Failed to allocate memory" 并在 Kafka 代理关闭 5+ 分钟后"Expiring record"错误,然后重新加入集群



我们正在对AWS EC2中的Kafka 1.1经纪人进行一些测试。具体来说,我们会干净地关闭经纪人5分钟或更长时间,然后重新启动它们,同时始终从集群中产生和消费。从理论上讲,这对生产者和消费者都相对无缝。

但是,在测试经纪人停止5分钟时,我们会在生产者应用程序上遇到这些错误:

 ERROR - 2019-02-27 04:34:24.946 - message size [2494] 
       -Expiring 7 record(s) for topic2-0: 30033 ms has passed since last append

,然后是与此类似的许多错误中的第一个。

 ERROR - 2019-02-27 04:35:13.098; Topic [topic2], message size [2494] 
       -Failed to allocate memory within the configured max blocking time 60000 ms. 

那时,长期运行的生产者变得无反应,每个生产者的请求都以相同的Failed to allocate memory失败。我试图在线搜索类似的问题,但我所能找到的只是一个旧的kafka jira票,该票在0.10.1.1中解决,因此它不应适用我们使用的更新的1.1版本。

https://issues.apache.org/jira/browse/kafka-3651

我们尝试了许多不同的方案,包括将生产者的配置更改回KAFKA默认值或更多。

最终弄清楚了如何修复此问题。在这里发布它,以防其他人有用。

我们的producer.send呼叫的回调定义,如果发生错误,将尝试使用同一生产者将消息重新归档到另一个主题。该机制已经建立,因此,如果发送呼叫导致错误,我们至少将存储在错误主题中的消息,我们可以在其中检查数据并弄清楚发生了什么。

这种机制通常运行良好,但是每当我们暂时失去经纪人(必须将其放下以进行维护/升级/等)时,我们所有的应用程序都会被ELB卡住并最终被ELB杀死。

来自KafkaProducer.send()的官方Javadoc,我找到了:

请注意,回调通常会在该线程的I/O线程中执行 生产者,因此应该很快迅速,否则他们会延迟 从其他线程发送消息。如果要执行 阻止或计算昂贵的回调,建议 在回调主体中使用自己的遗嘱执行人并行化处理。

https://kafka.apache.org/11/javadoc/index.html?org/apache/kafka/kafka/clients/producer/kafkaproducer.html

回调内的producer.send有资格作为一个昂贵的电话,恕我直言。在进行了一些进一步的调试和分析之后,事实证明,所有XNIO线程都被卡住了,而我想从经纪人重新加入集群时,回调producer.send()呼叫正在等待Stale Metadata。这最终导致申请停止响应。

我们案件中的修复是简单地在ConcurrentLinkedDeque中加入错误消息,并通过单独的线程处理这些消息,其唯一的工作是从Deque中获得take消息并尝试重新播放它们。p>将此更改部署到我们所有的环境后,应用程序在多个经纪重新启动和中断过程中表现得很好。

相关内容

最新更新