I have been trying to use Logstash to extract and tag data from my custom log, but I am not making any progress. I have a custom HAProxy log that looks like this:
Feb 22 21:17:32 ap haproxy[1235]: 10.172.80.45:32071 10.31.33.34:44541 10.31.33.34:32772 13.127.229.72:443 [22/Feb/2020:21:17:32.006] this_machine~ backend_test-tui/test-tui_32772 40/0/5/1/836 200 701381 - - ---- 0/0/0/0/0 0/0 {testtui.net} {cache_hit} "GET /ob/720/output00007.ts HTTP/1.1"
I want to extract and tag specific pieces of the log for my Kibana dashboard, for example:
- From the "40/0/5/1/836" section, tag the last number (836) as "response_time"
- "701381" as "response_bytes"
- "/ob/720/output00007.ts" as "content_url"
- And I want to use the timestamp from the log file rather than the default timestamp
I used https://grokdebug.herokuapp.com/ to build the pattern, but whenever I apply it I get a "_grokparsefailure" tag and the Kibana dashboard stops populating.
Below is the Logstash debug output:
{
"@version" => "1",
"message" => "Mar 8 13:53:59 ap haproxy[22158]: 10.172.80.45:30835 10.31.33.34:57886 10.31.33.34:32771 43.252.91.147:443 [08/Mar/2020:13:53:59.827] this_machine~ backend_noida/noida_32771 55/0/1/0/145 200 2146931 - - ---- 0/0/0/0/0 0/0 {testalef1.adcontentamtsolutions.} {cache_hit} "GET /felaapp/virtual_videos/og/1080/output00006.ts HTTP/1.1"",
"@timestamp" => 2020-03-08T10:24:07.348Z,
"path" => "/home/alef/haproxy.log",
"host" => "com1",
"tags" => [
[0] "_grokparsefailure"
]
}
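A pattern like this can be iterated on locally, without Beats or Kibana in the loop, by running Logstash against stdin with a rubydebug output. A minimal sketch (the file name and the shortened pattern here are placeholders, not the final filter):

```conf
# debug.conf - run with: bin/logstash -f debug.conf
# Paste a raw HAProxy log line on stdin and inspect the parsed event,
# including any _grokparsefailure tag, directly in the console.
input {
  stdin { }
}
filter {
  grok {
    # Replace with the pattern under test.
    match => { "message" => "%{MONTH:[Month]} %{MONTHDAY:[date]} %{TIME:[time]} %{GREEDYDATA:[rest]}" }
  }
}
output {
  stdout {
    codec => rubydebug
  }
}
```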
Below is the filter I created:
%{MONTH:[Month]} %{MONTHDAY:[date]} %{TIME:[time]} %{WORD:[source]} %{WORD:[app]}[%{DATA:[class]}]: %{IPORHOST:[UE_IP]}:%{NUMBER:[UE_Port]} %{IPORHOST:[NATTED_IP]}:%{NUMBER:[NATTED_Source_Port]} %{IPORHOST:[NATTED_IP]}:%{NUMBER:[NATTED_Destination_Port]} %{IPORHOST:[WAN_IP]}:%{NUMBER:[WAN_Port]} [%{HAPROXYDATE:[accept_date]}] %{NOTSPACE:[frontend_name]}~ %{NOTSPACE:[backend_name]} %{NOTSPACE:[ty_name]}/%{NUMBER:[response_time]} %{NUMBER:[http_status_code]} %{INT:[response_bytes]} - - ---- %{NOTSPACE:[df]} %{NOTSPACE:[df]} %{DATA:[domain_name]} %{DATA:[cache_status]} %{DATA:[domain_name]} %{NOTSPACE:[content]} HTTP/%{NUMBER:[http_version]}
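One common cause of "_grokparsefailure" with a pattern like this: grok patterns are regular expressions, so the literal square brackets around the PID (`haproxy[1235]`) and around the accept date generally need to be escaped as `\[` and `\]`, otherwise they are parsed as a character class. A sketch of just the affected fragments (the `...` stands for the rest of the pattern, unchanged):

```conf
filter {
  grok {
    # Escape the literal brackets in "haproxy[1235]:" and
    # "[22/Feb/2020:21:17:32.006]"; unescaped "[" starts a
    # regex character class and the match fails.
    match => { "message" => "%{WORD:[app]}\[%{DATA:[class]}\]: ... \[%{HAPROXYDATE:[accept_date]}\] ..." }
  }
}
```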
Below is my Logstash conf file:
input {
  beats {
    port => 5044
  }
}
filter {
  grok {
    match => { "message" => "%{MONTH:[Month]} %{MONTHDAY:[date]} %{TIME:[time]} %{WORD:[source]} %{WORD:[app]}[%{DATA:[class]}]: %{IPORHOST:[UE_IP]}:%{NUMBER:[UE_Port]} %{IPORHOST:[NATTED_IP]}:%{NUMBER:[NATTED_Source_Port]} %{IPORHOST:[NATTED_IP]}:%{NUMBER:[NATTED_Destination_Port]} %{IPORHOST:[WAN_IP]}:%{NUMBER:[WAN_Port]} [%{HAPROXYDATE:[accept_date]}] %{NOTSPACE:[frontend_name]}~ %{NOTSPACE:[backend_name]} %{NOTSPACE:[ty_name]}/%{NUMBER:[response_time]} %{NUMBER:[http_status_code]} %{INT:[response_bytes]} - - ---- %{NOTSPACE:[df]} %{NOTSPACE:[df]} %{DATA:[domain_name]} %{DATA:[cache_status]} %{DATA:[domain_name]} %{NOTSPACE:[content]} HTTP/%{NUMBER:[http_version]} " }
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}
output {
  elasticsearch { hosts => ["localhost:9200"] }
}
The filter below solved my problem; I had to debug it in Logstash itself to arrive at the correct filter:
input {
  beats {
    port => 5044
  }
}
filter {
  grok {
    match => { "message" => "%{MONTH:[Month]} %{MONTHDAY:[date]} %{TIME:[time]} %{WORD:[source]} %{WORD:[app]}[%{DATA:[class]}]: %{IPORHOST:[UE_IP]}:%{NUMBER:[UE_Port]} %{IPORHOST:[NATTED_IP]}:%{NUMBER:[NATTED_Source_Port]} %{IPORHOST:[NATTED_IP]}:%{NUMBER:[NATTED_Destination_Port]} %{IPORHOST:[WAN_IP]}:%{NUMBER:[WAN_Port]} [%{HAPROXYDATE:[accept_date]}] %{NOTSPACE:[frontend_name]}~ %{NOTSPACE:[backend_name]} %{NOTSPACE:[ty_name]}/%{NUMBER:[response_time]:int} %{NUMBER:[http_status_code]} %{NUMBER:[response_bytes]:int} - - ---- %{NOTSPACE:[df]} %{NOTSPACE:[df]} %{DATA:[domain_name]} %{DATA:[cache_status]} %{DATA:[domain_name]} %{URIPATHPARAM:[content]} HTTP/%{NUMBER:[http_version]}" }
    add_tag => [ "response_time", "response_time" ]
  }
  date {
    match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
}
output {
  elasticsearch { hosts => ["localhost:9200"] }
  stdout {
    codec => rubydebug
  }
}
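One remaining caveat: the date filter above matches a field named "timestamp", but the grok pattern never captures a field by that name, so the event keeps the default @timestamp. The accept date is captured as [accept_date], so a date filter along these lines should pick up the timestamp from the log itself (a sketch; the field name and format string are taken from the pattern above, and the log carries no timezone):

```conf
filter {
  date {
    # [accept_date] is captured by %{HAPROXYDATE:[accept_date]},
    # e.g. "08/Mar/2020:13:53:59.827".
    match => [ "[accept_date]", "dd/MMM/yyyy:HH:mm:ss.SSS" ]
    target => "@timestamp"
  }
}
```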