I am using the ELK stack together with Filebeat. Logs are shipped from Filebeat to Logstash, from there to Elasticsearch, and visualized in Kibana. The JSON document that Kibana shows for one log entry is pasted below:
{
"_index": "filebeat-6.4.2-2018.10.30",
"_type": "doc",
"_source": {
"@timestamp": "2018-10-30T09:15:31.697Z",
"fields": {
"server": "server1"
},
"prospector": {
"type": "log"
},
"host": {
"name": "kushmathapa"
},
"message": "{ "datetime": "2018-10-23T18:04:00.811660Z", "level": "ERROR", "message": "No response from remote. Handshake timed out or transport failure detector triggered." }",
"source": "C:\logs\batch-portal\error.json",
"input": {
"type": "log"
},
"beat": {
"name": "kushmathapa",
"hostname": "kushmathapa",
"version": "6.4.2"
},
"offset": 0,
"tags": [
"lighthouse1",
"controller",
"trt"
]
},
"fields": {
"@timestamp": [
"2018-10-30T09:15:31.697Z"
]
}
}
I want this to be displayed as:
{
"_index": "filebeat-6.4.2-2018.10.30",
"_type": "doc",
"_source": {
"@timestamp": "2018-10-30T09:15:31.697Z",
"fields": {
"server": "server1"
},
"prospector": {
"type": "log"
},
"host": {
"name": "kushmathapa"
},
"datetime": 2018-10-23T18:04:00.811660Z,
"log_level": ERROR,
"message": "{ "No response from remote. Handshake timed out or transport failure detector triggered." }",
"source": "C:\logs\batch-portal\error.json",
"input": {
"type": "log"
},
"beat": {
"name": "kushmathapa",
"hostname": "kushmathapa",
"version": "6.4.2"
},
"offset": 0,
"tags": [
"lighthouse1",
"controller",
"trt"
]
},
"fields": {
"@timestamp": [
"2018-10-30T09:15:31.697Z"
]
}
}
My beats.config currently looks like this:
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => "localhost:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug { metadata => true }
}
}
I have applied filters, but I seem to be missing something.
You can use a configuration file like the one below. In the grok filter, add the log format(s) you want to ingest into Elasticsearch (see the configuration below for an example).
input {
beats {
port => 5044
id => "my_plugin_id"
tags => ["logs"]
type => "abc"
}
}
filter {
if [type] == "abc" {
mutate {
gsub => [ "message", "r", "" ]
}
grok {
break_on_match => true
match => {
"message" => [
"%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{LOGLEVEL:log_level}%{SPACE}%{GREEDYDATA:message}"
]
}
overwrite => [ "message" ]
}
date {
match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss,SSS" ]
}
}
}
output {
if [type] == "abc" {
elasticsearch {
hosts => ["ip of elasticsearch:port_number of elasticsearch"]
index => "logfiles"
}
}
else {
elasticsearch {
hosts => ["ip of elasticsearch:port_number of elasticsearch"]
index => "task_log"
}
}
stdout {
codec => rubydebug { metadata => true }
}
}
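For reference (my reading of the grok pattern above, not something stated in the original answer): it expects plain-text lines of the form "timestamp level message", for example:
2018-10-23T18:04:00.811660Z ERROR No response from remote. Handshake timed out or transport failure detector triggered.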
Logstash needs to know that the message field you are receiving is JSON. You can use the json filter here and get almost everything you want out of the box:
filter {
json {
source => "message"
}
}
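Assuming the message field holds well-formed JSON like the document in the question (and with no target set), the parsed keys land at the top level of the event, roughly:
{
"datetime": "2018-10-23T18:04:00.811660Z",
"level": "ERROR",
"message": "No response from remote. Handshake timed out or transport failure detector triggered."
}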
If necessary, you can rename level to log.level and datetime to @datetime with a mutate filter (rename, or add/remove fields), as sketched below.
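A minimal sketch of that rename step (assuming the json filter above has already run; I use log_level here to match the field name the question asks for):
filter {
json {
source => "message"
}
mutate {
rename => {
"level" => "log_level"
"datetime" => "@datetime"
}
}
}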