使用Elasticsearch输出的LogStash:如何写入不同的索引



我希望在这里找到一个自昨天以来一直在挣扎的问题的答案:

我正在使用RabbitMQ输入和Elasticsearch输出配置Logstash 1.5.6。

消息以批量格式发表,我的logstash消耗它们,并将它们全部写入ElasticSearch默认索引index logstash-yyy.mm.dd,并带有此配置:

input {
  rabbitmq {
  host => 'xxx'
  user => 'xxx'
  password => 'xxx'
  queue => 'xxx'
  exchange => "xxx"
  key => 'xxx'
  durable => true
}
output {
  elasticsearch {
  host => "xxx"
  cluster => "elasticsearch"
  flush_size =>10
  bind_port => 9300
  codec => "json"
  protocol => "http"
  }
stdout { codec => rubydebug }
}

现在我要做的就是将消息发送到不同的Elasticsearch索引。

来自AMQP输入的消息已经具有索引和键入参数(批量格式)。

因此,阅读文档后:https://www.elastic.co/guide/en/logstash/1.5/event-depperent-configuration.html#logstash-config-field-field-field-references

我尝试这样做

input {
  rabbitmq {
  host => 'xxx'
  user => 'xxx'
  password => 'xxx'
  queue => 'xxx'
  exchange => "xxx"
  key => 'xxx'
  durable => true
}
output {
  elasticsearch {
  host => "xxx"
  cluster => "elasticsearch"
  flush_size =>10
  bind_port => 9300
  codec => "json"
  protocol => "http"
  index => "%{[index][_index]}"
  }
stdout { codec => rubydebug }
}

但是,Logstash正在做的是创建索引%{[index] [_ index]},然后放在所有文档中而不是读取_index参数并将文档发送到其中!

我也尝试了以下内容:

index => %{index}
index => '%{index}'
index => "%{index}"

,但似乎没有任何作用。

有帮助吗?

恢复,这里的主要问题是:如果兔子消息具有此格式:

{"index":{"_index":"indexA","_type":"typeX","_ttl":2592000000}}
{"@timestamp":"2017-03-09T15:55:54.520Z","@version":"1","@fields":{DATA}}

如何告诉logstash,在名为" Indexa"的索引中以" typex" ??

发送输出。

如果您在RabbitMQ中的消息已经以批量格式,那么您不需要使用elasticsearch输出,但是简单的http输出击中_bulk端点将有能力:

output {
    http {
        http_method => "post"
        url => "http://localhost:9200/_bulk"
        format => "message"
        message => "%{message}"
    }
}

所以,在val的帮助下,解决方案是:

  • 正如他所说的那样
  • 所以我用以下方式替换了输出:

    output {
       http {
       http_method => "post"
       url => "http://172.16.1.81:9200/_bulk"
       format => "message"
       message => "%{message}" 
    }
    stdout { codec => json_lines }
    }
    
  • ,但它仍然无法正常工作。我正在使用logstash 1.5.6,然后升级到logstash 2.0.0(https://www.elastic.co/guide/guide/en/logstash/2.4/_upgrading_using_using_package_managers.html),它可以使用相同的配置。

那里是:)

如果将json消息存储在兔子中,则可以解决此问题。在JSON消息中使用索引和键入为字段,并将这些值分配给Elasticsearch输出插件。

index => "%{指数}" //从json主体收到的索引,从kafka生产者document_type =>"%{type}"}//键入JSON主体

使用这种方法,每条消息都可以具有自己的索引和类型。

最新更新