我有1个-单个CSV文件和2个-实时KAFKA流。KAFKA流引入了实时流日志,CSV文件包含元数据记录,在将它们发送到Elastic Search之前,我需要将它们与流日志连接起来。
Kafka流日志和CSV记录示例:
KAFKA log: MachineID: 2424, MachineType: 1, MessageType: 9
CSV record: MachineID: 2424, MachineOwner: JohnDuo
在发送到ES:之前,我需要在logstash中构建记录
MachineID: 2424
MachineOwner: JohnDuo
MachineType: 1
MessageType: 9
我想要一个Ruby或Logstash插件或其他任何东西的解决方案来读取这个CSV文件一次并将它们引入并加入Logstash conf文件中。我需要保留内容内存中的CSV文件,否则在每个实时Kafka日志上查找CSV会扼杀我的Logstash性能。
尝试translate
过滤器。
你需要这样的东西。
filter {
translate {
dictionary_path => "/path/to/your/csv/file.csv"
field => "[MachineId]"
destination => "[MachineOwner]"
fallback => "not found"
}
}
然后,在file.csv
中,您将获得以下内容。
2424,JohnDuo
2425,AnotherUser
对于每个具有字段MachineId
的事件,此筛选器将在字典中查找此id,如果找到匹配项,它将使用匹配项的值创建名为MachineOwner
的字段,如果找不到匹配项,则将使用值not found
创建字段MachineOwner
,如果不想在不匹配的情况下创建字段,则可以删除fallback
选项。
当logstash启动时,字典会加载到内存中,并且每300秒重新加载一次,您也可以更改这种行为。