Logstash:用静态csv文件中的数据丰富日志文件中的事件



摘要:在logstash中使用一个过滤器,该过滤器将从事件字段中读取值,在外部文件(例如csv)中查找该值,并从匹配的外部文件中检索值。使用外部文件中的值作为事件中的额外字段添加。

更多信息:我有一个包含事件的日志文件。事件看起来像:

{"@timestamp":"2014-06-18T11:52:45.370636+02:00","location":{"MainId":3,"SubId":"5"},"EndRequest":{"Duration":{"Main":0,"Page":6720}}}

我有一个静态csv文件,如:

1,left
2,right
3,top

当在logstash中处理事件时,我希望能够使用一个过滤器来检查MainId的值(例如event=3),并在csv文件中找到这个值。如果找到,则事件必须获得一个标记:"top"。

这是一种类似于过滤"GeoIP"的方式。事件有一个字段值,匹配"数据库"中的值,并返回可以添加到事件中的值。

我找不到一个可以进行上述处理的当前过滤器。我需要自己制作一个自定义过滤器吗?如果是这样的话,有人能给我们一个提示吗?

有翻译过滤器。

您有一个YAML文件,而不是CSV,对于单个键值对,它应该是一个简单的sed YAML转换

撰写时的最新文档:http://logstash.net/docs/1.4.2/filters/translate

我从未见过为它编写的插件,所以我继续写了一个非常基本的插件:

# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
require "csv"
# The cvslookup filter allows you to add fields to an event
# base on a csv file
class LogStash::Filters::CSVLookup < LogStash::Filters::Base
  config_name "csvlookup"
  milestone 1
  # Example:
  #
  #     filter {
  #       csvlookup {
  #     file => 'key_value.csv'
  #     key_col => 1
  #     value_col => 2
  #     default => 'some_value'
  #         map_field => { "from_field" => "to_field" }
  #       }
  #     }
  # 
  # the default is used if the key_col's value is not present in the CSV file
  config :file, :validate => :string, :required => true
  config :key_col, :validate => :number, :default => 1, :required => false
  config :value_col, :validate => :number, :default => 2, :required => false
  config :default, :validate => :string, :required => false
  config :map_field, :validate => :hash, :required => true
  public
  def register
    @lookup = Hash.new
    CSV.foreach(@file) do |row|
      @lookup[row[@key_col - 1]] = row[@value_col - 1]
    end
    #puts @lookup.inspect
  end # def register
  public
  def filter(event)
    return unless filter?(event)
    @map_field.each do |src_field,dest_field|
      looked_up_val = @lookup[event[src_field].to_s]
      if looked_up_val.nil?
          if !@default.nil?
            event[dest_field] = @default
          end
      else
        if event[dest_field].nil?
          event[dest_field] = looked_up_val
        elsif !event[dest_field].is_a?(Array)
          event[dest_field] = [ event[dest_field], looked_up_val ]
        else
          event[dest_field].push(looked_up_val)
        end
      end
    end 
  end # def filter
end # class LogStash::Filters::CSVLookup

还有更多的工作可以做——例如,如果src_field是一个数组,它可以对它进行迭代,但它应该按原样工作。

最新更新