从kafka (MSK) AWS向elasticsearch发送数据



你好,我目前正在尝试通过confluent连接器从kafka(MSK)(AWS)发送数据到elasticsearch。我有一个来自sql数据库的数据流。当我运行confluent连接器时,它运行了一段时间,向elasticsearch发送数据,但在一分钟或两分钟后停止,错误如下。

sending LeaveGroup request to coordinator b-2.*****.amazonaws.com:9092 due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

有很多数据流,想知道是否有更好的方法或特定的设置一次发送多个批次或修复此错误的方法可能会有所帮助。我的elasticsearch服务器太小了吗?

如有其他信息,请告诉我,谢谢。

正如错误消息所暗示的那样,似乎向Elasticsearch(我假设这是一个接收器连接器)写入消息花费了太多时间,因此—消费者客户端(在这种情况下是连接器)由于超时而被迫离开消费者组。要隔离这个问题,您应该增加属性max.poll.interval.ms的值,看看问题是否仍然存在。如果是这样,这可能意味着在Elasticsearch中索引花费的时间太长,这可能表明你的Elasticsearch集群太忙或太小。

  • 或者,您可以在Elasticsearch上验证indices.indexing.index_time_in_millis度量,以检查索引时间是否确实花费太长。

检查Kafka Connect集群的健康状况也很重要。如果由于某种原因它不稳定,来自Elasticsearch连接器的任务将被重新平衡,这也会迫使消费者客户端离开消费者组。

这是一个消除选项的游戏,直到你找到问题的根本原因_()_/¯

最新更新