我有一个日志文件,其中包含以时间戳开头的行。每个这样的时间戳行后面可能会有不确定数量的额外行:
SOMETIMESTAMP some data
extra line 1 2
extra line 3 4
额外的行将为带时间戳的行提供补充信息。我想提取 1、2、3 和 4 并将它们保存为变量。如果我知道有多少行,我可以将多余的行解析为变量。例如,如果我知道还有两行额外的行,下面的 grok 过滤器将起作用。但是,如果事先不知道会有多少条额外的线路,该怎么办?在应用多行过滤器之前,有没有办法逐一解析这些行?这可能会有所帮助。
另外,即使我知道我只会有 2 行额外的行,下面的过滤器是访问它们的最佳方式吗?
filter {
multiline {
pattern => "^%{SOMETIMESTAMP}"
negate => "true"
what => "previous"
}
if "multiline" in [tags] {
grok {
match => { "message" => "(?m)^%{SOMETIMESTAMP} %{DATA:firstline}(?<newline>[rn]+)%{DATA:secondline}(?<newline>[rn]+)%{DATA:thirdline}$" }
}
}
# After this would be grok filters to process the contents of
# 'firstline', 'secondline', and 'thirdline'. I would then remove
# these three temporary fields from the final output.
}
(我将行分成单独的变量,因为这允许我单独对行的内容进行额外的模式匹配,而不必再次引用整个模式。例如,根据第一行的内容,我可能希望呈现其他行的分支行为。
你为什么需要这个?
您是要插入一个包含所有值的事件,还是它们真的需要共享相同时间戳的独立事件?
如果它们都需要出现在同一个事件中,则需要求助于ruby
过滤器,将多余的行分离到事件上的字段中,然后您可以进一步处理。
例如:
if "multiline" in [tags] {
grok {
match => { "message" => "(?m)^%{SOMETIMESTAMP} %{DATA:firstline}(?<newline>[rn]+)" }
}
ruby {
code => '
event["lines"] = event["message"].scan(/[^rn]+[rn]*/);
'
}
}
如果它们确实是单独的事件,您可以使用 logstash 1.5 及更高版本的 memorize 插件。
这在 ELK 版本中发生了变化直接事件字段引用(即event['field'])已被禁用,以支持使用事件获取和设置方法(例如event.get('field'))。
filter {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:logtime} %{LOGLEVEL:level}%{DATA:firstline}" }
}
ruby { code => "event.set('message', event.get('message').scan(/[^rn]+[rn]*/))" }
}