我正在使用服务器上的fluentd导出日志。
我的配置使用类似的东西来捕获多个日志文件:
<source>
type tail
path /my/path/to/file/*/*.log
</source>
正确跟踪不同的文件,但是,我还需要一个功能:
也应将路径的两个通配符部分添加到记录中(我们称它们为directory
和filename
)。
如果in_tail
插件会将文件名添加到记录中,我可以编写一个格式化器以拆分和编辑。
我缺少的任何东西或将in_tail
重写为我的心希望是最好的方法吗?
所以,是的。扩展in_tail
是要走的方式。
我已经编写了一个新的插件,该插件从NewTailInput
继承并使用略有不同的parse_singleline
和parse_multilines
来添加记录的路径。
比预期好得多。
更新6/3/2020:我已经挖出了代码,这是我可以召集的最少的红宝石来解决问题。自定义convert_line_to_event_with_path_names
,以便您需要在记录中添加自定义数据。
module Fluent
class DirParsingTailInput < NewTailInput
Plugin.register_input('dir_parsing_tail', self)
def initialize
super
end
def receive_lines(lines, tail_watcher)
es = @receive_handler.call(lines, tail_watcher)
unless es.empty?
tag = if @tag_prefix || @tag_suffix
@tag_prefix + tail_watcher.tag + @tag_suffix
else
@tag
end
begin
router.emit_stream(tag, es)
rescue
# ignore errors. Engine shows logs and backtraces.
end
end
end
def convert_line_to_event_with_path_names(line, es, path)
begin
directory = File.basename(File.dirname(path))
filename = File.basename(path, ".*")
line.chomp! # remove n
@parser.parse(line) { |time, record|
if time && record
if directory != "logs"
record["parent"] = directory
record["child"] = filename
else
record["parent"] = filename
end
es.add(time, record)
else
log.warn "pattern not match: #{line.inspect}"
end
}
rescue => e
log.warn line.dump, :error => e.to_s
log.debug_backtrace(e.backtrace)
end
end
def parse_singleline(lines, tail_watcher)
es = MultiEventStream.new
lines.each { |line|
convert_line_to_event_with_path_names(line, es, tail_watcher.path)
}
es
end
def parse_multilines(lines, tail_watcher)
lb = tail_watcher.line_buffer
es = MultiEventStream.new
if @parser.has_firstline?
lines.each { |line|
if @parser.firstline?(line)
if lb
convert_line_to_event_with_path_names(lb, es, tail_watcher.path)
end
lb = line
else
if lb.nil?
log.warn "got incomplete line before first line from #{tail_watcher.path}: #{line.inspect}"
else
lb << line
end
end
}
else
lb ||= ''
lines.each do |line|
lb << line
@parser.parse(lb) { |time, record|
if time && record
convert_line_to_event_with_path_names(lb, es, tail_watcher.path)
lb = ''
end
}
end
end
tail_watcher.line_buffer = lb
es
end
end
end