I'm trying to send data from Logstash to Kafka using an Avro schema.
My Logstash output looks like this:
kafka {
  codec => avro {
    schema_uri => "/tmp/avro/hadoop.avsc"
  }
  topic_id => "hadoop_log_processed"
}
My schema file looks like this:
{"type": "record",
"name": "hadoop_schema",
"fields": [
{"name": "loglevel", "type": "string"},
{"name": "error_msg", "type": "string"},
{"name": "syslog", "type": ["string", "null"]},
{"name": "javaclass", "type": ["string", "null"]}
]
}
The output from kafka-console-consumer looks like this:
CElORk+gAURvd24gdG8gdGhlIGxhc3QgbWVyZ2UtcGCzcywgd2l0aCA3IHNlZ21lbnRzIGxlZnQgb2YgdG90YWwgc256ZTogMjI4NDI0NDM5IGJ5dGVzAAxbbWFpbl0APm9yZy5hcGFjaGUuaGFkb29wLm1hcHJlZC5NZXJnZXI=
CElORk9kVGFzayAnYXR0ZW1wdF8xNDQ1JDg3NDkxNDQ1XzAwMDFfbV8wMDAwMDRfMCcgZG9uZS4ADFttYWluXQA6t3JnLmFwYWNoZS5oYWRvb6AubWFwcmVkLlRhc2s=
CElORk9kVGFzayAnYXR0ZW1wdF8xNDQ1JDg3NDkxNDQ1XzAwMDFfbV8wMDAwMDRfMCcgZG9uZS4ADFttYWluXQA6t3JnLmFwYWNoZS5oYWRvb6AubWFwcmVkLlRhc2s=
CElORk9OVGFza0hlYAJ0YmVhdEhhbmRsZXIgdGhyZWFkIGludGVycnVwdGVkAERbVGFza0hlYXJdYmVhdEhhbmRsZXIgUGluZ0NoZWNrZXJdAG5vcmcuYVBhY2hlLmhhZG9vcC5tYXByZWR1Y2UudjIuYXBwLlRhc2tIZWFydGJ3YXRIYW5kbGVy
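(For reference, that output came from reading the topic with a plain console consumer, along these lines; the broker address here is an assumption:)

kafka-console-consumer --bootstrap-server localhost:9092 --topic hadoop_log_processed --from-beginning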
I'm also getting the following error in my connector:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:465)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic hadoop_log_processed to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:86)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:488)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
I know the data is encoded on the Logstash side. Do I have to decode the messages on the way into Kafka, or can I decode/deserialize the data in the connector configuration?
Is there a way to disable the encoding on the Logstash side? I've read about a base64_encoding option, but the codec doesn't seem to have one.
The problem here is that Logstash's Avro codec is not serializing the data into the Avro form that the Confluent Schema Registry Avro deserializer expects.
Logstash takes the avsc and encodes the data into a binary form based on it, whereas the Confluent Schema Registry [de]serializer stores & retrieves schemas directly from the registry (rather than from avsc files) and frames every message with a magic byte (0x00) followed by a 4-byte schema ID before the Avro payload.
So when you get Failed to deserialize data … SerializationException: Unknown magic byte!, the Avro deserializer is telling you that it doesn't recognize the data as Avro serialized with the Schema Registry serializer.
A quick Google turned up this codec, which looks like it supports the Schema Registry (and therefore Kafka Connect, along with any other consumer that deserializes Avro data this way).
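If you go that route, the output would look something like this. Treat it as a sketch: the codec and option names below are from that plugin's README as I remember it, so verify them against the version you install:

kafka {
  # writes the Confluent wire format (magic byte + schema ID + Avro payload)
  codec => avro_schema_registry {
    endpoint => "http://localhost:8081"   # Schema Registry URL, assumed
  }
  topic_id => "hadoop_log_processed"
  # the codec emits raw bytes, so the byte-array serializer is needed
  value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
}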
Alternatively, write the data to Kafka as JSON and use org.apache.kafka.connect.json.JsonConverter in Kafka Connect to read it from the topic.
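A minimal sketch of that approach, reusing your topic name; value.converter and value.converter.schemas.enable are standard Kafka Connect properties:

Logstash output:

kafka {
  codec => json
  topic_id => "hadoop_log_processed"
}

Kafka Connect sink configuration:

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

With schemas.enable=false the converter treats each message as a schemaless JSON map; if your sink connector requires a schema, Logstash would have to emit JSON in Connect's schema/payload envelope and you'd set schemas.enable=true instead.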
References:
- http://rmoff.dev/ksldn19-kafka-connect
- https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/