无法读取Kafka-Avro模式消息



这个问题有解决方案吗???我无法阅读KAFKA-AVRO架构消息。我正在尝试从logstash向KAFKA向HDFS发送消息。

以下是技术堆栈:

  1. Logstash 2.3-当前生产版本
  2. 汇流3.0
  3. 插件:a.Logstash kafka输出插件b.Logstash编解码器avro
  4. 动物园管理员:3.4.6
  5. 卡夫卡:0.10.0.0

Logstash配置文件如下所示:

input {
stdin{}
}
filter {
mutate {
remove_field => ["@timestamp","@version"]
}
}
output {
kafka {
topic_id => 'logstash_logs14'
codec => avro  { 
schema_uri => "/opt/logstash/bin/schema.avsc"
}
}
}

schema.avsc文件如下所示:

{
"type":"record",
"name":"myrecord",
"fields":[
{"name":"message","type":"string"},
{"name":"host","type":"string"}
]
}

运行了以下命令:

  1. 在自己的终端中启动Zookeeper

    /bin/zookeeper服务器启动/etc/kafka/zookeeper.properties

2在自己的终端中启动Kafka

./bin/kafka-server-start ./etc/kafka/server.properties

3在自己的终端中启动模式注册表

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

4从logstash目录运行以下命令

bin/logstash -f ./bin/logstash.conf

5在运行上述命令后,键入您希望发送给kafka的日志消息例如:"Hello World">

6消费Kafka 的主题

./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic logstash_logs14 --from-beginning
_While consuming we get the following error:_
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Processed a total of 1 messages
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

请让我知道如何解决这个问题

谢谢,Upendra

您是如何编写/发布Kafka的?您看到SerializationException,因为数据不是使用架构注册表(或KafkaAvroSerializer)写入的,但在使用架构注册表时,kafkaavro控制台使用者内部使用架构注册表或kafka AvroDeserializer,该注册表要求数据为特定格式(特别是<magic byte><schemaId><data>)。如果你使用kafka-avro控制台生产者来写avro数据,那么你不应该得到这个异常,或者你可以在你的生产者属性中为key&值序列化程序,还设置模式注册表url。

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");

也许回答得太晚了,但现在面临着同样的问题。

Logstrash在这里使用默认的序列化程序,"org.apache.kafka.common.serialization.StringSerializer">

因此,如果您想从事件总线读取Avro消息,则必须在logstash输出上使用KafkaAvroSerializer对其进行序列化"io.confluent.kafka.serializers.KafkaAvroSerializer";

然后从使用者部分使用匹配的反序列化程序。问题是logstash根本不识别IO.CONFLUENT,所以你必须做一些棘手的事情来添加它,比如deps和jars

最新更新