根据前面的讨论(在Logstash中定义多个输出,同时处理Elasticsearch实例的潜在不可用性),我现在在Logstash中使用管道,以便将数据输入(从TCP 5044上的Beats)发送到多个Elasticsearch主机。pipelines.yml
的相关提取物如下图所示。
- pipeline.id: beats
queue.type: persisted
config.string: |
input {
beats {
port => 5044
ssl => true
ssl_certificate_authorities => '/etc/logstash/config/certs/ca.crt'
ssl_key => '/etc/logstash/config/certs/forwarder-001.pkcs8.key'
ssl_certificate => '/etc/logstash/config/certs/forwarder-001.crt'
ssl_verify_mode => "force_peer"
}
}
output { pipeline { send_to => [es100, es101] } }
- pipeline.id: es100
path.config: "/etc/logstash/pipelines/es100.conf"
- pipeline.id: es101
path.config: "/etc/logstash/pipelines/es101.conf"
在每个管道.conf
文件中,我有相关的虚拟地址,即文件/etc/logstash/pipelines/es101.conf
包括以下内容:
input {
pipeline {
address => es101
}
}
这个配置似乎工作得很好,即数据被每个Elasticsearch主机es100
和es101
接收。
我需要确保如果其中一个主机不可用,另一个主机仍然可以接收数据,并且由于之前的技巧,我现在使用的管道我理解允许这一点。然而,我显然在这个配置中缺少一些关键的东西,因为如果另一个主机不可用,则数据不会被主机接收。欢迎提出任何建议。
首先,您应该在下游管道(es100、es101)上配置持久队列,并调整它们的大小,以包含在中断期间到达的所有数据。但是,即使使用持久队列,logstash也具有至少一次的交付模型。如果持久队列满了,那么反压将导致节拍输入停止接受数据。正如关于输出隔离器模式的文档所述"如果任何下游管道的持久队列…变为full时,两个输出都将停止。如果您真的想要确保一个输出不会因为另一个输出不可用而阻塞,那么您将需要引入一些具有不同交付模型的软件。例如,配置filebeat写入kafka,然后有两个管道从kafka读取和写入elasticsearch。如果kafka配置了最多一次的交付模型(默认),那么如果它不能交付数据,它将丢失数据。