带有源文件名零件的Fluentd记录



我正在使用服务器上的fluentd导出日志。

我的配置使用类似的东西来捕获多个日志文件:

<source>
  type tail
  path /my/path/to/file/*/*.log
</source>

正确跟踪不同的文件,但是,我还需要一个功能:

也应将路径的两个通配符部分添加到记录中(我们称它们为directoryfilename)。

如果in_tail插件会将文件名添加到记录中,我可以编写一个格式化器以拆分和编辑。

我缺少的任何东西或将in_tail重写为我的心希望是最好的方法吗?

所以,是的。扩展in_tail是要走的方式。

我已经编写了一个新的插件,该插件从NewTailInput继承并使用略有不同的parse_singlelineparse_multilines来添加记录的路径。

比预期好得多。


更新6/3/2020:我已经挖出了代码,这是我可以召集的最少的红宝石来解决问题。自定义convert_line_to_event_with_path_names,以便您需要在记录中添加自定义数据。

module Fluent
  class DirParsingTailInput < NewTailInput
    Plugin.register_input('dir_parsing_tail', self)
    def initialize
      super
    end

    def receive_lines(lines, tail_watcher)
      es = @receive_handler.call(lines, tail_watcher)
      unless es.empty?
        tag = if @tag_prefix || @tag_suffix
                @tag_prefix + tail_watcher.tag + @tag_suffix
              else
                @tag
              end
        begin
          router.emit_stream(tag, es)
        rescue
          # ignore errors. Engine shows logs and backtraces.
        end
      end
    end
    def convert_line_to_event_with_path_names(line, es, path)
      begin
        directory = File.basename(File.dirname(path))
        filename = File.basename(path, ".*")
        line.chomp!  # remove n
        @parser.parse(line) { |time, record|
          if time && record
            if directory != "logs"
              record["parent"] = directory
              record["child"] = filename
            else
              record["parent"] = filename
            end 
            es.add(time, record)
          else
            log.warn "pattern not match: #{line.inspect}"
          end
        }
      rescue => e
        log.warn line.dump, :error => e.to_s
        log.debug_backtrace(e.backtrace)
      end
    end
    def parse_singleline(lines, tail_watcher)
      es = MultiEventStream.new
      lines.each { |line|
        convert_line_to_event_with_path_names(line, es, tail_watcher.path)
      }
      es
    end
    def parse_multilines(lines, tail_watcher)
      lb = tail_watcher.line_buffer
      es = MultiEventStream.new
      if @parser.has_firstline?
        lines.each { |line|
          if @parser.firstline?(line)
            if lb
              convert_line_to_event_with_path_names(lb, es, tail_watcher.path)
            end
            lb = line
          else
            if lb.nil?
              log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
            else
              lb << line
            end
          end
        }
      else
        lb ||= ''
        lines.each do |line|
          lb << line
          @parser.parse(lb) { |time, record|
            if time && record
              convert_line_to_event_with_path_names(lb, es, tail_watcher.path)
              lb = ''
            end
          }
        end
      end
      tail_watcher.line_buffer = lb
      es
    end
  end
end

最新更新