假设我通过来自 Confluent 的 kafka 流创建了主题,其中包含在 avro 中序列化的消息,io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer
然后我在 Hive 中创建一个外部 kafka 表
CREATE EXTERNAL TABLE k_table
(`id` string , `sequence` int)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES
(
"kafka.topic" = "sample-topic",
"kafka.bootstrap.servers"="kafka1:9092",
"kafka.serde.class"="org.apache.hadoop.hive.serde2.avro.AvroSerDe",
"avro.schema.url"="Sample.avsc"
(;
当我运行查询时:
select * from k_table WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '2' DAYS)
我遇到意外的 IO 错误:
INFO : Executing command(queryId=root_20190205160129_4579b5ff-9a5c-496d-8d03-9a7ccc0f6d90): select * from k_tickets_prod2 WHERE `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' minute)
INFO : Completed executing command(queryId=root_20190205160129_4579b5ff-9a5c-496d-8d03-9a7ccc0f6d90); Time taken: 0.002 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
Error: java.io.IOException: java.lang.ArrayIndexOutOfBoundsException: 55 (state=,code=0)
好吧,与 Confluent kafka 消费者一起工作得很好,我也试图在 TBLPROPERTIES
中设置 confluent kafka 反序列化器,这似乎必须生效。
环境:
Hive 4.0 + Beeline 3.1.1 + Kafka 1.1 (Clients & Broker) + Confluent 4.1
问题是 Confluent 生产者使用自定义格式序列化 avro 消息作为 <magic_byte 0x00><4 bytes of schema ID><regular avro bytes for object that conforms to schema>
。所以 Hive kafka 处理程序在反序列化时遇到问题,因为它使用基本的字节数组 kafka 反序列化器,消息开头的这 5 个字节是意外的。
我在 hive 中创建了一个错误以支持 Confluent 格式和架构注册表,并且我还制作了一个带有快速修复的 PR,在 TBLPROPERTIES
中设置属性后"avro.serde.magic.bytes"="true"
从消息中删除 5 个字节。
在此补丁之后,它就像魅力一样工作。