我有以下集群:
Kafka -> 一些日志收集器 -> Elasticsearch
我的问题是选择最有效的日志收集器(或其他一些软件,它允许管理Kafka和ElasticSearch之间的数据流(。
我正在尝试从Logstash,Fluentd和Confluent的Kafka Elasticsearch连接器中进行选择。 我面临的主要问题是在写入 Elasticsearch 端点时出现问题后无法在 Kafka 中回滚偏移量。
例如,logstash doc 说"如果启用,400 和 404错误将发送到死信队列 (DLQ(。如果未启用 DLQ,将发出日志消息,并将丢弃事件"(https://www.elastic.co/guide/en/logstash/6.x/plugins-outputs-elasticsearch.html#_retry_policy(。如果我有这样的错误,logstash 将继续从 Kafka 读取数据。错误会一次又一次地发生。虽然,我的所有数据都将存储在DLQ中,但当第一个错误发生时,Kafka的偏移量将远离该位置。我必须手动定义正确的偏移量。
所以,我的问题是: 是否有适用于 Kafka 和 ElasticSearch 的连接器,它允许在收到来自 ElasticSearch (400/404( 的第一个错误后停止移动偏移量?
提前谢谢。
我认为问题不在于效率,而在于可靠性
我面临的主要问题是在写入 Elasticsearch 端点时出现问题后无法在 Kafka 中回滚偏移量。
我对 Connect 或 Logstash 的 DLQ 功能没有太多经验,但重置消费组偏移量并非不可能。但是,如果使用者应用程序正确处理偏移提交,则不需要这样做。
如果 Connect 向 ES 引发连接错误,它将重试,而不是提交偏移量。
如果错误不可恢复,则 Connect 将停止使用,并且再次不提交偏移。
因此,从消息批处理中获取丢失数据的唯一方法是,如果该批处理最终出现在DLQ中,则使用任何框架。
如果禁用 DLQ,则丢失数据的唯一方法是从 Kafka 过期