从 Lostash 的 beats 中的 filebeat 自定义日志.config



我将ELK与filebeat一起使用。我将日志从filebeat发送到Logstash,然后从那里发送到Elastic,并在Kibana中可视化。我正在粘贴kibana的日志结果中显示的json结果,如下所示:

{
"_index": "filebeat-6.4.2-2018.10.30",
"_type": "doc",
"_source": {
"@timestamp": "2018-10-30T09:15:31.697Z",
"fields": {
"server": "server1"
},
"prospector": {
"type": "log"
},
"host": {
"name": "kushmathapa"
},
"message": "{ "datetime": "2018-10-23T18:04:00.811660Z", "level": "ERROR", "message": "No response from remote. Handshake timed out or transport failure detector triggered." }",
"source": "C:\logs\batch-portal\error.json",
"input": {
"type": "log"
},
"beat": {
"name": "kushmathapa",
"hostname": "kushmathapa",
"version": "6.4.2"
},
"offset": 0,
"tags": [
"lighthouse1",
"controller",
"trt"
]
},
"fields": {
"@timestamp": [
"2018-10-30T09:15:31.697Z"
]
}
}

我希望这个显示为

{
"_index": "filebeat-6.4.2-2018.10.30",
"_type": "doc",
"_source": {
"@timestamp": "2018-10-30T09:15:31.697Z",
"fields": {
"server": "server1"
},
"prospector": {
"type": "log"
},
"host": {
"name": "kushmathapa"
},
"datetime": 2018-10-23T18:04:00.811660Z,
"log_level": ERROR,
"message": "{ "No response from remote. Handshake timed out or transport failure detector triggered." }",
"source": "C:\logs\batch-portal\error.json",
"input": {
"type": "log"
},
"beat": {
"name": "kushmathapa",
"hostname": "kushmathapa",
"version": "6.4.2"
},
"offset": 0,
"tags": [
"lighthouse1",
"controller",
"trt"
]
},
"fields": {
"@timestamp": [
"2018-10-30T09:15:31.697Z"
]
}
}

我的beats.config现在看起来像这样

input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => "localhost:9200"
manage_template => false
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" 
} stdout {
codec => rubydebug { metadata => true }
}
}

我已经应用了过滤器,但我似乎缺少了一些东西。

您可以使用如下所示的配置文件。在grok过滤器中,将您想要获取的日志格式添加到弹性搜索中(例如,请参阅前面提到的配置(。

input {
beats {
port => 5044
id => "my_plugin_id"
tags => ["logs"]
type => "abc"
}
}
filter {
if [type] == "abc" {
mutate {
gsub => [ "message", "r", "" ]
}
grok {
break_on_match => true
match => {
"message" => [
"%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{LOGLEVEL:log_level}%{SPACE}%{GREEDYDATA:message}"
]
}
overwrite => [ "message" ]
}
grok {
break_on_match => true
match => {
"message" => [
"%{TIMESTAMP_ISO8601:timestamp}%{SPACE}%{LOGLEVEL:log_level}%{SPACE}%{GREEDYDATA:message}"
]
}
overwrite => [ "message" ]
}
date {
match => [ "timestamp" , "yyyy-MM-dd HH:mm:ss,SSS" ]
} 
}
}
output {
if [type] == "abc" {
elasticsearch { 
hosts => ["ip of elasticsearch:port_number of elasticsearch"]
index => "logfiles"
} 
}
else {
elasticsearch { 
hosts => ["ip of elasticsearch:port_number of elasticsearch"]
index => "task_log"
} 
}
stdout {
codec => rubydebug { metadata => true }
}
}

Logstash需要知道您正在接收的message字段是JSON格式的。你可以在这里使用json过滤器,几乎可以开箱即用地获得你想要的所有东西:

filter {
json {
target => "message"
}
}

如果有必要,可以使用突变或添加/删除字段将level重命名为log.level,将datetime重命名为@datetime

相关内容

  • 没有找到相关文章

最新更新