Logstash-按关键字筛选系统日志



我是ELK的新手,我有logstash存储从多个网络设备生成的系统日志。所有类型的事件都标记在一个设施上(不幸的是(,我只对使用DHCP租约存储日志感兴趣。下面是logstash 存储的日志示例

host:10.10.2.21 tags:_grokparsefailure type:dhcp @timestamp:May 6, 2020 @ 22:50:35.176 message:<190>May 6 22:50:33 10.7.24.13 Kiwi_Syslog_Server 2020-05-07 02:50:35 MOPS-E200E-CE-1 %%01SYSTATE/6/HEALTH(l): cpu=23 totalmemory=390127084 curmemory=176017096 @version:1 _id:gjYK7XEBLIinXt5dutFF _type:_doc _index:dhcp-2020.05.07-001 _score: -
host:10.10.2.21 tags:_grokparsefailure type:dhcp @timestamp:May 6, 2020 @ 22:50:33.727 message:<186>May 6 22:50:32 10.6.5.2 Kiwi_Syslog_Server May 7 2020 02:50:33 Data_Center-S9306-AS-1 OSPF/2/IFAUTFAIL:OID 1.3.6.1.2.1.14.16.2.6: A packet is received on a non-virtual interface from a router whose authentication key or authentication type conflicts with the local authentication key or authentication type. (IfIpAddress=10.4.0.254, AddressLessIf=0, ProcessId=3, RouterId=10.6.64.11, PacketSrc=10.4.0.44, ConfigErrorType=5, PacketType=1, InstanceName=) @version:1 _id:WzYK7XEBLIinXt5dtMzM _type:_doc _index:dhcp-2020.05.07-001 _score: -
host:10.10.2.21 tags:_grokparsefailure type:dhcp @timestamp:May 6, 2020 @ 22:50:32.478 message:<188>May 6 22:50:31 10.7.32.7 Kiwi_Syslog_Server date=2020-05-06 time=22:50:32 devname=MOTP_GT-FS248D-CE-7 device_id=S248DN3X17000508 log_id=0103034134 type=event subtype=system pri=warning vd=root user="alertd" msg="[First Event] TEMPERATURE_SENSOR_1 (49.0C) cleared warning threshold of (50.0C)." @version:1 _id:4jYK7XEBLIinXt5dsMg6 _type:_doc _index:dhcp-2020.05.07-001 _score: -
message:<134>May 6 22:52:29 10.7.24.23 Kiwi_Syslog_Server May 6 22:52:30 National-Trust-CE-1 PRESSDHCPSERVER assigned 172.18.20.24 to E8:93:09:CC:2A:C6 type:dhcp host:10.10.2.21 @version:1 @timestamp:May 6, 2020 @ 22:52:30.528 _id:jjgM7XEBLIinXt5dfFvH _type:_doc _index:dhcp-2020.05.07-001 _score: -

我只想存储具有DHCP信息的日志的最后一行。我观察到带有DHCP信息的日志中有单词"assigned",所以我正在寻找一种方法,让logstash查找单词"assign",并只存储那行日志。

下面是我对logstash 的配置

input {
udp  {
type => "dhcp"
port => "518"
}
}

filter {
if [type] == "dhcp" {
grok {
match => {
"message" => "assigned"
}
}
}
}
output {
if [type] == "dhcp" {
elasticsearch { hosts => ["localhost:9200"] 
index => "dhcp-%{+yyyy.MM.dd}-001"
}
}
}

你能帮我吗,非常感谢

你好,Narendra Rajcoomar。

我不能百分之百肯定我明白你的问题,所以如果我错了,请纠正我,我会编辑我的答案。

您正在使用提供的logstash.conf文件,并在弹性搜索中获得上面的行,其中还包括您不感兴趣的事件,并且您希望ES只保存看起来像最后一行的事件。

如果这是正确的,您可能会注意到除了最后一行之外的所有其他事件都有一个名为_grokparsefailuretag。这是因为它们实际上不包括"assigned"关键字,因此它们不是"match"。

只需相应地更改logstash.conf输出部分,它就会删除不必要的日志:

input {
udp  {
type => "dhcp"
port => "518"
}
}
filter {
if [type] == "dhcp" {
grok {
match => {"message" => "assigned"}
}
}
}
output {
if [type] == "dhcp" and "_grokparsefailure" not in [tags] {
elasticsearch { 
hosts => ["localhost:9200"] 
index => "dhcp-%{+yyyy.MM.dd}-001"
}
}
}

最新更新