消息不以魔术字节开头



我正在尝试使用Go中的/linkedin/goavro包将avro编码的数据生成到kafka主题中。目标是能够使用不同的客户端来使用主题。

首先,我注册模式如下:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{"name":"test_topic2","type":"record", "fields":[{"name":"user","type":"string"},{"name":"password","size":10,"type":"string"}]}"}' http://localhost:8081/subjects/test_topic2-value/versions

然后我创建avro数据,用Go生成和消费它。

package main
import (
"github.com/Shopify/sarama"
"github.com/linkedin/goavro"
"fmt"
)
const (
brokers = "localhost:9092"
topic     = "test_topic2"
)
const loginEventAvroSchema = `{"name":"test_topic2","type":"record", "fields":[{"name":"user","type":"string"},{"name":"password","size":10,"type":"string"}]}`
func main() {
// Create Message
codec, err := goavro.NewCodec(loginEventAvroSchema)
if err != nil {
panic(err)
}
m := map[string]interface{}{
"user": "pikachu", "password": 231231,
}
single, err := codec.SingleFromNative(nil, m)
if err != nil {
panic(err)
}

// Producer
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Producer.Return.Successes = true
config.Version = sarama.V2_4_0_0
//get broker
cluster, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
panic(err)
}
defer func() {
if err := cluster.Close(); err != nil {
panic(err)
}
}()
msg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(single),
}
cluster.SendMessage(msg)
// Consumer 
clusterConsumer, err := sarama.NewConsumer(brokers, config)
if err != nil {
panic(err)
}
defer func() {
if err := clusterConsumer.Close(); err != nil {
panic(err)
}
}()
msgK, _ := clusterConsumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
for {
q := <-msgK.Messages()
native, _, err := codec.NativeFromSingle([]byte(q.Value))
if err != nil {
fmt.Println(err)
}
fmt.Println(native)
}

这段代码运行良好,我可以成功地在kafka主题中生成和使用消息。

现在我尝试使用python avro consumer:的主题

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

c = AvroConsumer({
'bootstrap.servers': 'localhost',
'group.id': 'groupid',
'schema.registry.url': 'http://localhost:8081',
'auto.offset.reset': 'earliest'})

c.subscribe(['test_topic2'])
while True:
try:
msg = c.poll(10)
except SerializerError as e:
print("Message deserialization failed for {}: {}".format(msg, e))
break
if msg is None:
continue
if msg.error():
print("AvroConsumer error: {}".format(msg.error()))
continue
print(msg.value(), msg.key())
c.close()

但我得到以下错误:

confluent_kafka.avro.serializer.SerializerError: Message deserialization failed for message at test_topic2 [0] offset 1: message does not start with magic byte

我想我在围棋制作人方面错过了一些东西,如果有人能分享他/她的经验来解决这个问题,我将不胜感激。

goavro不使用模式注册表。

另外,您使用的是StringEncoder,我假设它只输出字符串切片,而不是Avro字节

StringEncoder为Go字符串实现编码器接口,以便它们可以用作ProducerMessage中的键或值。

FWIW,我建议用kafka-avro控制台消费者测试消费者,如果你有

最新更新