How to write grok patterns in Logstash



I am trying to get started with Logstash, and my application has the following type of logs. Here the 5 indicates that 5 lines will follow, which are statistics collected for different related things.

These are basically application stats, with each line indicating one of the resources.

Is there a way to parse this properly using Logstash so that it can be used with Elasticsearch?

[20170502 01:57:26.209 EDT (thread-name) package-name.classname#MethodName INFO] Some info line (5 stats):
[fieldA: strvalue1| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue2| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue3| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue4| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]
[fieldA: strvalue5| field2: 0 | field3: 0 | field4: 0 | field5: 0 | field6: 0 | field7: 0]

EDIT

This is the config I am using. With it, the first set of stats is parsed correctly, but after that the pipeline gets stuck. Note that there are about 150 such stats blocks; if I keep only 2-3 of them it works fine. Could you help me identify the problem here?

# [20170513 06:08:29.734 EDT (StatsCollector-1) deshaw.tools.jms.ActiveMQLoggingPlugin$ActiveMQDestinationStatsCollector#logPerDestinationStats INFO] ActiveMQ Destination Stats (97 destinations):
# [destName: topic://darts.metaDataChangeTopic | enqueueCount: 1 | dequeueCount: 1 | dispatchCount: 1 | expiredCount: 0 | inflightCount: 0 | msgsHeld: 0 | msgsCached: 0 | memoryPercentUsage: 0 | memoryUsage: 0 | memoryLimit: 536870912 | avgEnqueueTimeMs: 0.0 | maxEnqueueTimeMs: 0 | minEnqueueTimeMs: 0 | currentConsumers: 1 | currentProducers: 0 | blockedSendsCount: 0 | blockedSendsTimeMs: 0 | minMsgSize: 2392 | maxMsgSize: 2392 | avgMsgSize: 2392.0 | totalMsgSize: 2392]
input {
  file {
    path => "/u/bansalp/activemq_primary_plugin.stats.log.1"
### For testing and continually reprocessing the same file; remove these before production
    start_position => "beginning"
    sincedb_path => "/dev/null"
### Let's read the logfile and recombine multiline details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^[destName:"
      negate => false
      what => "previous"
    }
  }
}
filter {
    if [message] =~ /^\s*$/ {
        drop{}
    }
    if [message] =~ /^[^\[]/ {
            drop{}
    }
    if ([message] =~ /logMemoryInfo|logProcessInfo|logSystemInfo|logThreadBreakdown|logBrokerStats/) {
            drop{}
    }
    if [message] =~ "logPerDestinationStats" {
        grok {
                match => { "message" => "^[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}s*%{TIME:time}s*%{TZ:timezone}s*(%{DATA:thread_name})s*%{JAVACLASS:javaclass}#%{WORD:method}s*%{LOGLEVEL}]s*"
                }
        }
        split { 
            field => "message"
        }
        grok {
                match => { "message" => "^[%{DATA}:s*%{DATA:destName}s*|s*%{DATA}:s*%{NUMBER:enqueueCount}s*|s*%{DATA}:s*%{NUMBER:dequeueCount}s*|s*%{DATA}:s*%{NUMBER:dispatchCount}s*|s*%{DATA}:s*%{NUMBER:expiredCount}s*|s*%{DATA}:s*%{NUMBER:inflightCount}s*|s*%{DATA}:s*%{NUMBER:msgsHeld}s*|s*%{DATA}:s*%{NUMBER:msgsCached}s*|s*%{DATA}:s*%{NUMBER:memoryPercentUsage}s*|s*%{DATA}:s*%{NUMBER:memoryUsage}s*|s*%{DATA}:s*%{NUMBER:memoryLimit}s*|s*%{DATA}:s*%{NUMBER:avgEnqueueTimeMs}s*|s*%{DATA}:s*%{NUMBER:maxEnqueueTimeMs}s*|s*%{DATA}:s*%{NUMBER:minEnqueueTimeMs}s*|s*%{DATA}:s*%{NUMBER:currentConsumers}s*|s*%{DATA}:s*%{NUMBER:currentProducers}s*|s*%{DATA}:s*%{NUMBER:blockedSendsCount}s*|s*%{DATA}:s*%{NUMBER:blockedSendsTimeMs}s*|s*%{DATA}:s*%{NUMBER:minMsgSize}s*|s*%{DATA}:s*%{NUMBER:maxMsgSize}s*|s*%{DATA}:s*%{NUMBER:avgMsgSize}s*|s*%{DATA}:s*%{NUMBER:totalMsgSize}]$" }
        }
        mutate {
            convert => { "message" => "string" }
            add_field => {
                "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
                "load_timestamp" => "%{@timestamp}"
            }
            remove_field => ["yr","mnt", "daynum", "time", "timezone"]
        }
    }
}
output {
  stdout {codec => rubydebug}
}

Sure is.

What you need to do is use the multiline codec on your input filter.

As per the example:

input {
  file {
    path => "/var/log/someapp.log"
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^[%{YEAR}%{MONTHNUM}%{MONTHDAY}s*%{TIME}"
      negate => true
      what => previous
    }
  }
}

This basically states that any line that does not start with yyyyMMdd HH:mm:ss.SSS gets merged back into the previous line; with your sample, the header line plus its five stats lines therefore end up as a single event.

From there you can apply a grok pattern to the first line (to get the high-level data).

Once you are happy that you are getting all the data you need from the first line, you can then split on \r or \n and use a single grok pattern to pick up the individual stats (as per the example you have given above).
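As a hedged aside: since each stats line is just key/value pairs separated by pipes, the kv filter can stand in for one long grok. A minimal sketch, untested; the bracket-stripping gsub is my own assumption:

filter {
  # Strip the surrounding [ and ] so only "key: value | key: value" remains
  mutate {
    gsub => [ "message", "^\[|\]$", "" ]
  }
  # Split pairs on | and key/value on :. Keys keep their padding spaces here,
  # so they may need trimming, and values that themselves contain ":" would
  # need care - grok stays the safer bet for those.
  kv {
    field_split => "|"
    value_split => ":"
  }
}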

Hope this helps,

D

Update 2017-05-08 11:54:

The complete Logstash conf could look like this; you will need to consider changing the grok patterns to better suit your requirements (only you know your data).

Note that this has not been tested; I will leave that to you.

input {
  file {
    path => "/var/log/someapp.log"
### For testing and continually reprocessing the same file; remove these before production
    start_position => "beginning"
    sincedb_path => "/dev/null"
### Let's read the logfile and recombine multiline details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^[%{YEAR}%{MONTHNUM}%{MONTHDAY}s*%{TIME}"
      negate => true
      what => previous
    }
  }
}
filter {
### Let's get some high level data before we split the line (note: anything you grab before the split gets copied)
    grok {
        match => { "message" => "^[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}s*%{TIME:time}s*%{TZ:timezone}s*(%{DATA:thread_name})s*%{JAVACLASS:javaclass}#%{WORD:method}s*%{LOGLEVEL}]"
        }
    }
### Split the lines back out to being single lines now (the terminator may be a \r or \n, test which one)
    split {
        field => "message"
        terminator => "\r"
    }
### Ok, the lines should now be independent, let's add another grok here to get the patterns as dictated by your example [fieldA: str | field2: 0...] etc.
### Note: you should look to change the grok pattern to better suit your requirements, I used DATA here to quickly capture your content
    grok {
        break_on_match => false
        match => { "message" => "^[%{DATA}:s*%{DATA:fieldA}|%{DATA}:s*%{DATA:field2}|%{DATA}:s*%{DATA:field3}|%{DATA}:s*%{DATA:field4}|%{DATA}:s*%{DATA:field5}|%{DATA}:s*%{DATA:field6}|%{DATA}:s*%{DATA:field7}]$" }
    }
    mutate {
        convert => { "message" => "string" }
        add_field => {
            "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
            "load_timestamp" => "%{@timestamp}"
        }
        remove_field => ["yr","mnt", "daynum", "time", "timezone"]
    }
}
output {
  stdout { codec => rubydebug }
}

Edit 2017-05-15

Logstash is a sophisticated parser; it expects to stay up as a process and keep monitoring the log files (hence why you have to kill the process to stop it).
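If you only want a one-shot parse while testing, a hedged workaround is to read from stdin instead of a file, since Logstash shuts down once stdin is exhausted. A sketch reusing the same codec:

input {
  stdin {
    codec => multiline {
      pattern => "^\[destName:"
      negate => false
      what => "previous"
      # the last buffered event only leaves the codec on a flush, so give it
      # an auto flush (in seconds)
      auto_flush_interval => 2
    }
  }
}

Then run something like cat activemq_primary_plugin.stats.log.1 | bin/logstash -f yourconfig.conf.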

break_on_match means you can have multiple match requirements for the same line; if grok does not find a match it will try the next one in the list (always go from most complex to most simple).
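A minimal sketch of that behaviour, using trimmed-down versions of the patterns above (most specific first):

grok {
  break_on_match => true   # the default: stop at the first pattern that matches
  match => { "message" => [
    "^\[%{DATA}:\s*%{DATA:destName}\s*\|",
    "^\[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}"
  ] }
}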

In your input filter, change the path so it ends in .log*; also, as per your original example, the pattern does not have to match the date format in order to bring all the related lines together into one event.

Your split filter should specify what the terminator character is (otherwise the default terminator, a newline \n, is used).

input {
  file {
    path => "/u/bansalp/activemq_primary_plugin.stats.log*"
### For testing and continually reprocessing the same file; remove these before production
    start_position => "beginning"
    sincedb_path => "/dev/null"
### Let's read the logfile and recombine multiline details
    codec => multiline {
      # Grok pattern names are valid! :)
      pattern => "^[destName:"
      negate => false
      what => "previous"
    }
  }
}
filter {
    if "logPerDestinationStats" in [message] {
        grok {
                match => { "message" => "^[%{YEAR:yr}%{MONTHNUM:mnt}%{MONTHDAY:daynum}s*%{TIME:time}s*%{TZ:timezone}s*(%{DATA:thread_name})s*%{JAVACLASS:javaclass}#%{WORD:method}s*%{LOGLEVEL}]s*"
                }
        }
        split {
            field => "message"
            terminator => "\r"
        }
        grok {
                match => { "message" => "^[%{DATA}:s*%{DATA:destName}s*|s*%{DATA}:s*%{NUMBER:enqueueCount}s*|s*%{DATA}:s*%{NUMBER:dequeueCount}s*|s*%{DATA}:s*%{NUMBER:dispatchCount}s*|s*%{DATA}:s*%{NUMBER:expiredCount}s*|s*%{DATA}:s*%{NUMBER:inflightCount}s*|s*%{DATA}:s*%{NUMBER:msgsHeld}s*|s*%{DATA}:s*%{NUMBER:msgsCached}s*|s*%{DATA}:s*%{NUMBER:memoryPercentUsage}s*|s*%{DATA}:s*%{NUMBER:memoryUsage}s*|s*%{DATA}:s*%{NUMBER:memoryLimit}s*|s*%{DATA}:s*%{NUMBER:avgEnqueueTimeMs}s*|s*%{DATA}:s*%{NUMBER:maxEnqueueTimeMs}s*|s*%{DATA}:s*%{NUMBER:minEnqueueTimeMs}s*|s*%{DATA}:s*%{NUMBER:currentConsumers}s*|s*%{DATA}:s*%{NUMBER:currentProducers}s*|s*%{DATA}:s*%{NUMBER:blockedSendsCount}s*|s*%{DATA}:s*%{NUMBER:blockedSendsTimeMs}s*|s*%{DATA}:s*%{NUMBER:minMsgSize}s*|s*%{DATA}:s*%{NUMBER:maxMsgSize}s*|s*%{DATA}:s*%{NUMBER:avgMsgSize}s*|s*%{DATA}:s*%{NUMBER:totalMsgSize}]$" }
        }
        mutate {
            convert => { "message" => "string" }
            add_field => {
                "session_timestamp" => "%{yr}-%{mnt}-%{daynum} %{time} %{timezone}"
                "load_timestamp" => "%{@timestamp}"
            }
            remove_field => ["yr","mnt", "daynum", "time", "timezone"]
        }
    }
   else {
      drop{}
    }
}
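One further hedged suggestion: the configs above only build session_timestamp as a plain string. If Elasticsearch should treat it as the event time, a date filter could be added after the mutate. A sketch, untested; mapping EDT to America/New_York is my assumption:

filter {
  # "2017-05-13 06:08:29.734 EDT" - Joda-based parsing of zone abbreviations
  # is unreliable, so strip the trailing zone and declare it explicitly
  mutate {
    gsub => [ "session_timestamp", "\s+[A-Z]{3,4}$", "" ]
  }
  date {
    match => [ "session_timestamp", "yyyy-MM-dd HH:mm:ss.SSS" ]
    timezone => "America/New_York"   # assumption: EDT/EST logs
  }
}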

Please excuse the formatting, I am currently updating this from my phone; I am more than happy for someone to fix the formatting.
