Grok pattern to create separate sections in a Kibana dashboard

I have been trying to extract and tag data from my custom logs with logstash, without making any progress. I have a custom haproxy log that looks like this:

Feb 22 21:17:32 ap haproxy[1235]: 10.172.80.45:32071 10.31.33.34:44541 10.31.33.34:32772 13.127.229.72:443 [22/Feb/2020:21:17:32.006] this_machine~ backend_test-tui/test-tui_32772 40/0/5/1/836 200 701381 - - ---- 0/0/0/0/0 0/0 {testtui.net} {cache_hit} "GET /ob/720/output00007.ts HTTP/1.1"

I want to extract specific pieces of the log and tag them in the Kibana dashboard, namely:

  • From the "40/0/5/1/836" section, tag the last number (836) as "response_time" (see the sketch after this list)
  • "701381" as "response_bytes"
  • "/ob/720/output00007.ts" as "content_url"
  • And use the timestamp from the log file instead of the default @timestamp
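
For the timing block, something along these lines is what I have in mind (a rough sketch only; the first four field names are placeholders, and %{INT} also covers the -1 that haproxy logs when a timer never ran):

    %{INT:tq}/%{INT:tw}/%{INT:tc}/%{INT:tr}/%{INT:response_time:int}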

I used https://grokdebug.herokuapp.com/ to build the pattern, but whenever I apply it I get a "_grokparsefailure" tag and the Kibana dashboard stops populating.
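
To rule the dashboard out, the pattern can also be tested inside logstash itself with a throwaway pipeline that reads from stdin and prints every parsed event (a minimal sketch; the grok here is a trivial placeholder, not the full haproxy pattern):

    input { stdin { } }
    filter {
      grok {
        match => { "message" => "%{SYSLOGTIMESTAMP:syslog_ts} %{GREEDYDATA:rest}" }
      }
    }
    output { stdout { codec => rubydebug } }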

Below is the logstash debug output:

{
      "@version" => "1",
       "message" => "Mar  8 13:53:59 ap haproxy[22158]: 10.172.80.45:30835 10.31.33.34:57886 10.31.33.34:32771 43.252.91.147:443 [08/Mar/2020:13:53:59.827] this_machine~ backend_noida/noida_32771 55/0/1/0/145 200 2146931 - - ---- 0/0/0/0/0 0/0 {testalef1.adcontentamtsolutions.} {cache_hit} \"GET /felaapp/virtual_videos/og/1080/output00006.ts HTTP/1.1\"",
    "@timestamp" => 2020-03-08T10:24:07.348Z,
          "path" => "/home/alef/haproxy.log",
          "host" => "com1",
          "tags" => [
        [0] "_grokparsefailure"
    ]
}
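
The "_grokparsefailure" tag means the grok filter matched none of its patterns for that event. To keep failing lines visible instead of losing them from the dashboard, one option is a conditional output, sketched below (the file path is only an example):

    output {
      if "_grokparsefailure" in [tags] {
        # Unparsed events go to a file for later inspection (example path).
        file { path => "/tmp/grok_failures.log" }
      } else {
        elasticsearch { hosts => ["localhost:9200"] }
      }
    }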

Below is the filter I created:

%{MONTH:[Month]} %{MONTHDAY:[date]} %{TIME:[time]} %{WORD:[source]} %{WORD:[app]}[%{DATA:[class]}]: %{IPORHOST:[UE_IP]}:%{NUMBER:[UE_Port]} %{IPORHOST:[NATTED_IP]}:%{NUMBER:[NATTED_Source_Port]} %{IPORHOST:[NATTED_IP]}:%{NUMBER:[NATTED_Destination_Port]} %{IPORHOST:[WAN_IP]}:%{NUMBER:[WAN_Port]} [%{HAPROXYDATE:[accept_date]}] %{NOTSPACE:[frontend_name]}~ %{NOTSPACE:[backend_name]} %{NOTSPACE:[ty_name]}/%{NUMBER:[response_time]} %{NUMBER:[http_status_code]} %{INT:[response_bytes]} - - ---- %{NOTSPACE:[df]} %{NOTSPACE:[df]} %{DATA:[domain_name]} %{DATA:[cache_status]} %{DATA:[domain_name]} %{NOTSPACE:[content]} HTTP/%{NUMBER:[http_version]}
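
Two things worth checking here: in the stock HAPROXY grok patterns the literal square brackets are escaped, because [ and ] are regex metacharacters, and the conf file below has a stray trailing space inside the match string just before the closing quote, which the log line would then have to contain too. A fragment showing the escaped form of the prefix (field names are illustrative):

    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_ts} %{WORD:source} %{WORD:app}\[%{NUMBER:pid}\]: %{GREEDYDATA:haproxy_line}" }
    }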

Below is my logstash conf file:

    input {
      beats {
        port => 5044
      }
    }
    filter {
      grok {
        match => { "message" => "%{MONTH:[Month]} %{MONTHDAY:[date]} %{TIME:[time]} %{WORD:[source]} %{WORD:[app]}[%{DATA:[class]}]: %{IPORHOST:[UE_IP]}:%{NUMBER:[UE_Port]} %{IPORHOST:[NATTED_IP]}:%{NUMBER:[NATTED_Source_Port]} %{IPORHOST:[NATTED_IP]}:%{NUMBER:[NATTED_Destination_Port]} %{IPORHOST:[WAN_IP]}:%{NUMBER:[WAN_Port]} [%{HAPROXYDATE:[accept_date]}] %{NOTSPACE:[frontend_name]}~ %{NOTSPACE:[backend_name]} %{NOTSPACE:[ty_name]}/%{NUMBER:[response_time]} %{NUMBER:[http_status_code]} %{INT:[response_bytes]} - - ---- %{NOTSPACE:[df]} %{NOTSPACE:[df]} %{DATA:[domain_name]} %{DATA:[cache_status]} %{DATA:[domain_name]} %{NOTSPACE:[content]} HTTP/%{NUMBER:[http_version]} " }
      }
      date {
        match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
    }
    output {
      elasticsearch { hosts => ["localhost:9200"] }
    }
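
Note that for the last requirement (indexing on the log's own time) the date filter above can never fire: it references a "timestamp" field that the grok does not create, so every event keeps the default @timestamp. Since the pattern captures the accept date as accept_date, a sketch of the fix, assuming the haproxy accept-date format, would be:

    date {
      # haproxy accept dates look like 22/Feb/2020:21:17:32.006 (no time zone).
      match => [ "accept_date", "dd/MMM/yyyy:HH:mm:ss.SSS" ]
    }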

Using the filter below solved my problem; I had to debug in logstash itself to arrive at the correct filter:

    input {
      beats {
        port => 5044
      }
    }
    filter {
      grok {
        match => { "message" => "%{MONTH:[Month]} %{MONTHDAY:[date]} %{TIME:[time]} %{WORD:[source]} %{WORD:[app]}[%{DATA:[class]}]: %{IPORHOST:[UE_IP]}:%{NUMBER:[UE_Port]} %{IPORHOST:[NATTED_IP]}:%{NUMBER:[NATTED_Source_Port]} %{IPORHOST:[NATTED_IP]}:%{NUMBER:[NATTED_Destination_Port]} %{IPORHOST:[WAN_IP]}:%{NUMBER:[WAN_Port]} [%{HAPROXYDATE:[accept_date]}] %{NOTSPACE:[frontend_name]}~ %{NOTSPACE:[backend_name]} %{NOTSPACE:[ty_name]}/%{NUMBER:[response_time]:int} %{NUMBER:[http_status_code]} %{NUMBER:[response_bytes]:int} - - ---- %{NOTSPACE:[df]} %{NOTSPACE:[df]} %{DATA:[domain_name]} %{DATA:[cache_status]} %{DATA:[domain_name]} %{URIPATHPARAM:[content]} HTTP/%{NUMBER:[HTTP_version]}" }
        add_tag => [ "response_time", "response_time" ]
      }
      date {
        match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
      }
    }
    output {
      elasticsearch { hosts => ["localhost:9200"] }
      stdout {
        codec => rubydebug
      }
    }
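
Compared with the failing filter, the visible changes are %{URIPATHPARAM} for the request path and the :int suffixes on response_time and response_bytes, which make those captures integers so Kibana can run numeric aggregations on them. If a capture cannot take the :int suffix, an equivalent alternative (a sketch, not part of the original fix) is a mutate filter after the grok:

    mutate {
      # Convert string captures to integers once grok has extracted them.
      convert => { "response_time" => "integer" "response_bytes" => "integer" }
    }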
