Logstash 5.2 does not work with Kafka 9 and AVRO



I have installed Logstash 5.2.0 with logstash-input-kafka 4.1.1 and logstash-codec-avro 3.0.0, and I am trying to read data from Cloudera Kafka 9, but I get the following error:

[2017-02-03T03:05:35,049][INFO ][logstash.pipeline        ] Pipeline main started
[2017-02-03T03:05:35,064][DEBUG][logstash.agent           ] Starting puma
[2017-02-03T03:05:35,065][DEBUG][logstash.agent           ] Trying to start WebServer {:port=>9600}
[2017-02-03T03:05:35,068][DEBUG][logstash.api.service     ] [api-service] start
[2017-02-03T03:05:35,090][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-02-03T03:05:35,303][DEBUG][logstash.inputs.kafka    ] closing {:plugin=>"LogStash::Inputs::Kafka"}
[2017-02-03T03:05:35,304][DEBUG][logstash.pipeline        ] Input plugins stopped! Will shutdown filter/output workers.
[2017-02-03T03:05:35,338][DEBUG][logstash.pipeline        ] Pushing flush onto pipeline
[2017-02-03T03:05:35,339][DEBUG][logstash.pipeline        ] Pushing shutdown {:thread=>"#<Thread:0x6e057136 sleep>"}
[2017-02-03T03:05:35,340][DEBUG][logstash.pipeline        ] Pushing shutdown {:thread=>"#<Thread:0xcb2b987 sleep>"}
[2017-02-03T03:05:35,340][DEBUG][logstash.pipeline        ] Pushing shutdown {:thread=>"#<Thread:0x6aa67ce5 sleep>"}
[2017-02-03T03:05:35,340][DEBUG][logstash.pipeline        ] Pushing shutdown {:thread=>"#<Thread:0x2f544881 run>"}
[2017-02-03T03:05:35,340][DEBUG][logstash.pipeline        ] Pushing shutdown {:thread=>"#<Thread:0x20d253d0 sleep>"}
[2017-02-03T03:05:35,341][DEBUG][logstash.pipeline        ] Pushing shutdown {:thread=>"#<Thread:0x65d168b sleep>"}
[2017-02-03T03:05:35,341][DEBUG][logstash.pipeline        ] Shutdown waiting for worker thread #<Thread:0x6e057136>
[2017-02-03T03:05:35,439][FATAL][logstash.runner          ] An unexpected error occurred! {:error=>#<ArgumentError: negative length -2600952 given>, :backtrace=>["org/jruby/ext/stringio/StringIO.java:829:in `read'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:106:in `read'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:93:in `read_bytes'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:304:in `read_data'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:290:in `read_data'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:376:in `read_union'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:309:in `read_data'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:384:in `read_record'", "org/jruby/RubyArray.java:1613:in `each'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:382:in `read_record'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:310:in `read_data'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/avro-1.8.1/lib/avro/io.rb:275:in `read'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/logstash-codec-avro-3.0.0-java/lib/logstash/codecs/avro.rb:73:in `decode'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-4.1.1/lib/logstash/inputs/kafka.rb:163:in `thread_runner'", "file:/apps/logstash-5.2.0/vendor/jruby/lib/jruby.jar!/jruby/java/java_ext/java.lang.rb:12:in `each'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-4.1.1/lib/logstash/inputs/kafka.rb:162:in `thread_runner'"]}
[2017-02-03T03:05:35,444][DEBUG][logstash.agent           ] Error in reactor loop escaped: Bad file descriptor - Bad file descriptor (Errno::EBADF)
[2017-02-03T03:05:35,445][DEBUG][logstash.agent           ] ["org/jruby/RubyIO.java:3705:in `select'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/puma-2.16.0-java/lib/puma/reactor.rb:29:in `run_internal'", "/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/puma-2.16.0-java/lib/puma/reactor.rb:138:in `run_in_thread'"]
[2017-02-03T03:05:35,445][DEBUG][logstash.agent           ] 2017-02-03 03:05:35 -0500: Listen loop error: #<Errno::EBADF: Bad file descriptor - Bad file descriptor>
[2017-02-03T03:05:35,446][DEBUG][logstash.agent           ] org/jruby/RubyIO.java:3705:in `select'
/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/puma-2.16.0-java/lib/puma/server.rb:322:in `handle_servers'
/apps/logstash-5.2.0/vendor/bundle/jruby/1.9/gems/puma-2.16.0-java/lib/puma/server.rb:296:in `run'
[2017-02-03T03:05:35,446][DEBUG][logstash.agent           ] Error in reactor loop escaped: Bad file descriptor - Bad file descriptor (Errno::EBADF)
The same happens with Logstash 2.4 with logstash-codec-avro 2.0.4 and logstash-codec-avro 3.0.0.

Here is my Logstash configuration file:

input {
  kafka {
    bootstrap_servers => "server1:9092,server2:9092,server3:9092,server4:9092,server5:9092"
    topics => ["mytopicname"]
    group_id => "mygroup"
    codec => avro {
      schema_uri => "/apps/schema/rocana3.schema"
    }
  }
}
filter {
}
output {
  file {
    path => "/apps/elk/test/dump.txt"
  }
}

EDIT

I was using Logstash 2.4 with logstash-codec-avro (3.0.0) and logstash-input-kafka (2.0.9), and it worked fine: I was able to read Avro data from Kafka and decode it.

When I installed Logstash 5.2.0 with logstash-input-kafka 4.1.1 and logstash-codec-avro 3.0.0, I was able to read data from Kafka, but as soon as I added codec => avro { schema_uri => "/apps/schema/rocana3.schema" } I got the error shown above.

The default deserializer of the Kafka input changed in Logstash 5 from a byte-array deserializer to a string deserializer, so the binary Avro payload is no longer handed to the codec untouched:

https://www.elastic.co/guide/en/logstash/2.4/plugins-inputs-kafka.html#plugins-inputs-kafka-consumer_threads

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-consumer_threads
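This also explains the ArgumentError: negative length in the stack trace. The following is an illustrative sketch (not code from the plugin): Avro prefixes bytes/string fields with a zigzag-encoded varint length, and a string deserializer effectively decodes the raw bytes as UTF-8, replacing invalid sequences with U+FFFD, so the Avro codec then reads a garbage (often negative) length:

```python
def read_long(buf: bytes, pos: int = 0):
    """Decode one Avro long (zigzag-encoded varint) starting at pos."""
    b = buf[pos]
    pos += 1
    n = b & 0x7F
    shift = 7
    while b & 0x80:          # high bit set -> more varint bytes follow
        b = buf[pos]
        pos += 1
        n |= (b & 0x7F) << shift
        shift += 7
    return (n >> 1) ^ -(n & 1), pos  # undo zigzag encoding

# Length prefix for a 64-byte field: zigzag(64) = 128 -> varint b'\x80\x01'
raw = bytes([0x80, 0x01])
print(read_long(raw))  # (64, 2)

# What a UTF-8 string round-trip effectively does to non-UTF-8 bytes:
# malformed bytes become U+FFFD, and the codec then re-reads the
# string's bytes as if they were still valid Avro.
mangled = raw.decode("utf-8", errors="replace").encode("utf-8")
print(read_long(mangled))  # a bogus negative length, as in the trace
```

The exact negative number depends on the payload, which is why the trace shows an arbitrary-looking value like -2600952.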

Please add the following settings to your kafka input:

key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
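With those two settings in place, the kafka input from the question's configuration would look like this (server names, topic, group and schema path are taken from the question, not verified here):

```
input {
  kafka {
    bootstrap_servers => "server1:9092,server2:9092,server3:9092,server4:9092,server5:9092"
    topics => ["mytopicname"]
    group_id => "mygroup"
    key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => avro {
      schema_uri => "/apps/schema/rocana3.schema"
    }
  }
}
```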
