我正试图通过logstash将日志文件解析为弹性搜索。我的日志文件有不同的事件,我想合并在一起(每个事件都有不同的模式,但有一些唯一的ID(例如
- 时间戳,日志级别,唯一id,一些开始消息,用户名
- 时间戳、日志级别、唯一id、某些文件
- 时间戳、日志级别、唯一id、某些计数
- 时间戳、日志级别、唯一id、作业占用的时间
我想把这些事件合并成一个这样我就可以理解一个作业所花费的时间、计数和其他事情就像我的结果应该包含像这样的字段
时间已开始对数电平开始消息用户名某些文件计数作业占用的时间
这就是我想要在一些唯一ID的帮助下将不同的事件合并并存储在日志中的方式,这些ID对于事件来说是相同或唯一的
问题是我无法理解我们如何实现这一点,我如何将这种不同的模式日志合并到一个文档中
任何帮助都是值得的谢谢
您可以使用聚合过滤器来组合它们。您可以使用dissect或grok来提取字段。我会从开始
dissect { mapping => { "message" => "%{[@metadata][ts]}", %{loglevel}, %{id}, %{[@metadata][restOfLine]} " } }
然后使用日期过滤器将[@metadata][ts]解析为[@timestamp]。接下来,使用grok来拆分[@metadata][restOfLine]字段。在聚合过滤器中,您将使用与文档中的示例3类似的内容。使用push_map_as_event_on_timeout选项时,只有添加到映射中的字段才是事件的一部分。因此,当你看到每一行时,你会在地图上添加字段。类似的东西
aggregate {
task_id => "%{id}"
code => '
map["id"] ||= event.get("id")
map["@timestamp"] ||= event.get("@timestamp")
map["loglevel"] ||= event.get("loglevel")
map["username"] ||= event.get("username")
map["timetaken"] ||= event.get("timetaken")
# etc. etc.
event.cancel # This delete the event that items were grok'd from.
'
push_map_as_event_on_timeout => true
timeout => 10
}
注意聚合文档中的限制,如pipeline.workers 1和pipeline.ordered.的正确设置