使用数据生成的ksql-datagen实用程序反序列化异常



从以下模式的ksql-datagen实用程序生成样本流-

{
"type": "record",
"name": "users",
**"namespace": "com.example",**
"fields": [
{
"name": "registertime",
"type": {
"type":"long",
"arg.properties":{
"range":{"min":1487715775521,"max":1519273364600}
}
}
},
{
"name": "userid",
"type": {
"type":"string",
"arg.properties":{"regex":"User_[1-9][0-2]"}
}
},
{
"name": "regionid",
"type": {
"type":"string",
"arg.properties":{"regex":"Region_[1-9]"}
}
},
{
"name": "gender",
"type": {
"type":"string",
"arg.properties":{
"options":["MALE","FEMALE","OTHER"]
}
}
}
]}

在检查版本时,它仍然选择"io.confluent.ksql.avro_schemas"模式-

curl "http://localhost:8081/subjects/test-user-value/versions/1"

{"subject":"test user value","version":1,"id":4,"schema":"{"type:"record","name":"KsqlDataSourceSchema","namespace":"io.confluent.ksql.avro_schemas">,"fields":[{"name":"registertime","type":["null","long"],"default":null},{"regionid","type":["null","string"],"default":null},{"name":"gender","type["null","string"],"default":null}]}"}

尝试使用Kafka-streams API-消费时出现以下错误

线程中出现异常"PageView-Users-Stream-Join-eg-1dc610a3-9d9-4c1e-b5eb-910e4bc74826-StreamThread-1"org.apache.kafka.streams.errors.StreamsException:反序列化异常处理程序设置为在出现反序列化错误时失败。如果你宁愿在反序列化错误,请设置default.deserialization.exception.handler。在org.apache.kafka.stream.processer.internals.RecordDeserializer.deserialize(RecordDeserialize.java:80)在org.apache.kafka.streams.processer.internals.RecordQueue.maybeUpdateTimestamp(RecordQueue.java:160)在org.apache.kafka.streams.processer.internals.RecordQueue.addRawRecords(RecordQueue.java:101)在org.apache.kafka.stream.processer.internals.PartitionGroup.addRawRecords(PartitionGroup.java:124)在org.apache.kafka.stream.processer.internals.StreamTask.addRecords(StreamTask.java:711)在org.apache.kafka.stream.processer.internals.StreamThread.addRecordsToTasks(StreamThread.java:995)在org.apache.kafka.stream.processer.internals.StreamThread.runOnce(StreamThread.java:833)在org.apache.kafka.stream.processer.internals.StreamThread.runLoop(StreamThread.java:777)在org.apache.kafka.streams.processer.internals.StreamThread.run(StreamThread.java:747)由:org.apache.kafka.commun.errors.SerializationException引起:反序列化id为4的Avro消息时出错,原因是:org.apache.kafka.commun.errors.SerializationException:找不到在为具体记录

答案在https://github.com/confluentinc/schema-registry/issues/980

Datagen始终将命名空间定义为io.confluent.ksql.avro_schemas。参见confluentinc/ksql#1906

现在还有其他方法可以将测试数据生成到Kafka中。

相关内容

  • 没有找到相关文章

最新更新