Logstash - storing RabbitMQ logs - multiline



I've been using ELK for about six months now, and so far it's been great. I'm on Logstash version 6.2.3. RabbitMQ forms the core of my distributed system (RabbitMQ is itself distributed), so it's very important that I keep track of RabbitMQ's logs. Most of the conversations on this forum seem to use RabbitMQ as an input/output stage, but I just want to monitor the logs. The only problem I've found is that RabbitMQ writes multiline log records, for example:

=WARNING REPORT==== 19-Nov-2017::06:53:14 ===
closing AMQP connection <0.27161.0> (...:32799 -> ...:5672, vhost: '/', user: 'worker'):
client unexpectedly closed TCP connection
=WARNING REPORT==== 19-Nov-2017::06:53:18 ===
closing AMQP connection <0.22410.0> (...:36656 -> ...:5672, vhost: '/', user: 'worker'):
client unexpectedly closed TCP connection
=WARNING REPORT==== 19-Nov-2017::06:53:19 ===
closing AMQP connection <0.26045.0> (...:55427 -> ...:5672, vhost: '/', user: 'worker'):
client unexpectedly closed TCP connection
=WARNING REPORT==== 19-Nov-2017::06:53:20 ===
closing AMQP connection <0.5484.0> (...:47740 -> ...:5672, vhost: '/', user: 'worker'):
client unexpectedly closed TCP connection

I found an excellent code example here, which I've stripped down to just the filter stage, so it looks like this:

filter {
    if [type] == "rabbitmq" {
        codec => multiline {
            pattern => "^="
            negate => true
            what => "previous"
        }
        grok {
            type => "rabbit"
            patterns_dir => "patterns"
            pattern => "^=%{WORD:report_type} REPORT=+ %{RABBIT_TIME:time_text} ===.*$"
        }
        date {
            type => "rabbit"
            time_text => "dd-MMM-yyyy::HH:mm:ss"
        }
        mutate {
            type => "rabbit"
            add_field => [ 
                "message", 
                "%{@message}" 
            ]
        }
        mutate {
            gsub => [
                "message", "^=[A-Za-z0-9: =-]+=n", "",
                # interpret message header text as "severity"
                "report_type", "INFO", "1",
                "report_type", "WARNING", "3",
                "report_type", "ERROR", "4",
                "report_type", "CRASH", "5",
                "report_type", "SUPERVISOR", "5"
            ]
        }
    }
}

However, when I save this to a conf file and restart Logstash, I get the following error:

[2018-04-04T07:01:57,308][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/usr/share/logstash/modules/fb_apache/configuration"}
[2018-04-04T07:01:57,316][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/usr/share/logstash/modules/netflow/configuration"}
[2018-04-04T07:01:57,841][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.3"}
[2018-04-04T07:01:57,973][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-04-04T07:01:58,037][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, { at line 3, column 15 (byte 54) after filter {n    if [type] == "rabbitmq" {n        codec ", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:42:in `compile_imperative'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:50:in `compile_graph'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:12:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:11:in `compile_sources'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:51:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:169:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:40:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:315:in `block in converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:312:in `block in converge_state'", "org/jruby/RubyArray.java:1734:in `each'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:299:in `converge_state'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:90:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/runner.rb:348:in `block in execute'", "/usr/share/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]}

Any ideas what the problem might be?

Thanks,

If you are shipping the logs from the RabbitMQ server to Logstash with Filebeat, you should configure multiline there.

The answer is indeed multiline. The aim is to merge lines that do not start with a date into the previous line that does start with a date. This is it:

multiline.pattern: '^\d{4}-\d{2}-\d{2}'
multiline.negate: true
multiline.match: after
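As a quick sanity check of that pattern, here is a sketch in Python using regex semantics equivalent to Filebeat's; the sample log lines below are illustrative of the RabbitMQ 3.7.x format, not taken from the original post:

```python
import re

# Filebeat's multiline.pattern is a regular expression. With negate: true
# and match: after, any line that does NOT match the pattern is appended
# to the previous line that did match.
pattern = re.compile(r'^\d{4}-\d{2}-\d{2}')

# Illustrative RabbitMQ 3.7.x-style lines (assumed format)
lines = [
    "2018-04-04 07:01:57.308 [warning] <0.27161.0> closing AMQP connection",
    "client unexpectedly closed TCP connection",
]

# True marks the start of a new event; False lines get merged into it
starts_new_event = [bool(pattern.match(line)) for line in lines]
```

The first line begins with a date and therefore opens a new event; the continuation line does not and is merged into it.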

Note: I had previously tried to merge any line that begins with a whitespace character, ^\s+, but that did not work because not all WARNING or ERROR messages start with a space.

Full Filebeat input (7.5.2 format):

filebeat:
  inputs:
  - exclude_lines:
    - 'Failed to publish events caused by: EOF'
    fields:
      type: rabbitmq
    fields_under_root: true
    paths:
    - /var/log/rabbitmq/*.log
    tail_files: false
    timeout: 60s
    type: log
    multiline.pattern: '^\d{4}-\d{2}-\d{2}'
    multiline.negate: true
    multiline.match: after

Logstash patterns:

# RabbitMQ
RABBITMQDATE %{MONTHDAY}-%{MONTH}-%{YEAR}::%{HOUR}:%{MINUTE}:%{SECOND}
RABBITMQLINE (?m)=%{DATA:severity} %{DATA}==== %{RABBITMQDATE:timestamp} ===\n%{GREEDYDATA:message}
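The combined pattern can be exercised outside Logstash. The regex below is my rough Python approximation of the grok expression above (not grok itself), applied to one of the log events quoted in the question:

```python
import re

# Approximation of RABBITMQLINE: capture the severity word, the
# RABBITMQDATE timestamp, then everything after the header line
# as the message (re.DOTALL lets .* span newlines).
rabbitmq_line = re.compile(
    r'^=(?P<severity>\w+) \w+==== '
    r'(?P<timestamp>\d{1,2}-\w{3}-\d{4}::\d{2}:\d{2}:\d{2}) ===\n'
    r'(?P<message>.*)',
    re.DOTALL,
)

event = ("=WARNING REPORT==== 19-Nov-2017::06:53:14 ===\n"
         "client unexpectedly closed TCP connection")

m = rabbitmq_line.match(event)
severity = m.group('severity')
timestamp = m.group('timestamp')
message = m.group('message')
```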

I'm sure they had good reasons for logging in this strange way in RMQ 3.7.x, but without knowing those reasons, it certainly makes our lives hard.

You cannot use a codec as a filter plugin. Codecs can only be used in input or output plugins (see the codec configuration option).

You have to put the multiline codec in the input plugin that produces the RabbitMQ log events.
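A sketch of what that could look like if Logstash reads the RabbitMQ log file directly; the file path is an assumption, and the pattern targets the pre-3.7 `=WARNING REPORT====` header style shown in the question:

```
input {
  file {
    path => "/var/log/rabbitmq/*.log"
    type => "rabbitmq"
    codec => multiline {
      # lines that do not start with "=" belong to the previous event
      pattern => "^="
      negate => true
      what => "previous"
    }
  }
}
```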
