我需要将我的应用程序日志发送到FluentD中,FluentD是EFK服务的一部分。所以我尝试配置另一个FluentD来实现这一点。
my-fluent.conf:
<source>
@type kafka_group
consumer_group cgrp
brokers "#{ENV['KAFKA_BROKERS']}"
scram_mechanism sha512
username "#{ENV['KAFKA_USERNAME']}"
password "#{ENV['KAFKA_PASSWORD']}"
ssl_ca_certs_from_system true
topics "#{ENV['KAFKA_TOPICS']}"
format json
</source>
<filter TOPIC>
@type parser
key_name log
reserve_data false
<parse>
@type json
</parse>
</filter>
<match TOPIC>
@type copy
<store>
@type stdout
</store>
<store>
@type forward
<server>
host "#{ENV['FLUENTD_HOST']}"
port "#{ENV['FLUENTD_PORT']}"
shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
</server>
</store>
</match>
我能够正确地看到stdout
的输出
2021-07-06 07:36:54.376459650+0000主题:{"foo":"bar",…}
但我无法查看来自kibana的日志。经过跟踪,我发现第二个fluentd在接收数据时抛出错误:
{"时间":"2021-07-05 11:21:41+000","级别":"错误","消息":"读取数据时出现意外错误主机="X.X.X.X"端口=58548 error_class=MessagePack::MalformedFormatError错误="无效字节","worker_id":0}{"时间":"2021-07-05 11:21:41+000","级别":"错误","worker_id":0,"消息":"/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin/in_forward.rb:262:在
feed_each'n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin/in_forward.rb:262:in
块中(2个级别(在read_messages'/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin/in_forward.rb:271:在block in read_messages'n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin_helper/server.rb:613:in
中on_read_without_connection'/usr/lib-rub/gems/2.7.0/17.1/lib/cool.io/io.rb:123:在on_readable'n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:186:in
中on_Readive'/usr/lib/Rub/gem/2.7.0/gems/ccool.io-1.7.1/lib/cool.io/loop.rb:88:在run_once'n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in
中在thread_create中的block in start'n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin_helper/thread.rb:78:in
块中运行'/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin_helper/event_loop.rb:93:
问题是第一个fluentd中缺少安全标记。
<match TOPIC>
@type copy
<store>
@type stdout
</store>
<store>
@type forward
<server>
host "#{ENV['FLUENTD_HOST']}"
port "#{ENV['FLUENTD_PORT']}"
shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
</server>
<security>
self_hostname HOSTNAME
shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
</security>
</store>
</match>