Logstash Kafka-错误:负长度-给定62



我面临着与本文类似的问题:https://discuss.elastic.co/t/argumenterror-when-using-kafka-input-avro-codec/116975

Logstash Conf:

input {
kafka{
group_id => "group_1"
topics => ["topic_1"]
bootstrap_servers => "192.168.0.1:9092"
codec => avro {
schema_uri => "/files/GA6/logstash-6.0.0/CONFIG_HOME/myschema.avsc"
}
}
}
output{
stdout{
}
}

错误日志:

[2018-01-25T11:54:37,060][FATAL][logstash.runner          ] An unexpected error occurred! 
{:error=>#<ArgumentError: negative length -15 given>, :backtrace=>[
"org/jruby/ext/stringio/StringIO.java:788:in `read'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:106:in `read'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:93:in `read_bytes'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:99:in `read_string'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:299:in `read_data'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:384:in `block in read_record'", 
"org/jruby/RubyArray.java:1734:in `each'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:382:in `read_record'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:310:in `read_data'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:275:in `read'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:254:in `block in thread_runner'", 
"/files/GA6/logstash-6.0.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in thread_runner'"
]}

架构示例:

{
"type": "record",
"name": "Sample",
"doc": "Sample Schema",
"fields": [{
"name": "name",
"type": "string"
}, {
"name": "address",
"type": "string"
}, {
"name": "salary",
"type": "long"
}
]
}

基于一些讨论,我还添加了以下内容:

key_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"

但问题仍然存在。。。

如果你需要任何进一步的信息,请告诉我。

这个问题实际上是向Logstash团队提出的,并最终集成到Logstash Kafka输入中,而不是编解码器:

用法:

input {
kafka {
id => "kafka_avro_events"
group_id => "logstash_kafka_avro"
topics_pattern => "some_avro_topic"
bootstrap_servers => "kafka:9092"
decorate_events => true
value_deserializer_class => "io.confluent.kafka.serializers.KafkaAvroDeserializer"
schema_registry_url => "http://schemaregistry:8081"
metadata_max_age_ms => "5000"
codec => "json"
}
}

请确保您使用的是v>7.10.x

Github问题:https://github.com/logstash-plugins/logstash-input-kafka/pull/239

最新更新