我正试图将我们的流式管道传递到Table API,除了一个字段之外,我几乎做到了。
我从Kafka主题中读取CSV数据,然后进行一些转换,并将转换后的数据以Avro格式发送到主题。Avro模式有一些复杂的字段,对于一个特定的字段,我遇到了麻烦。我可以将我的数据以avro格式发送到kafka主题,但无法使用我的模式将其读回。如果删除该列,我就可以写入数据并将其读回。我可以使用表API读回所有内容。
字段的Avro模式:
{
"name": "problem_field",
"type": [
"null", {
"type": "array",
"items": {
"type": "record",
"name": "ProblemField",
"doc": "Description of the problem field",
"fields": [
{
"name": "proc",
"type": "string"
}, {
"name": "success",
"type": "boolean"
}
]
}
}
],
"doc": "some doc here",
"default": null
},
字段的逻辑类型:
ARRAY<ROW<`proc` STRING NOT NULL, `success` BOOLEAN NOT NULL> NOT NULL>
我的表模式:
.column("problem_field", DataTypes.ARRAY(
ROW(FIELD("proc", STRING().notNull()), FIELD("success", BOOLEAN().notNull()))
)
我收到的错误:
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -25
我的消费者:
val schema = new Schema.Parser().parse(fs.open(new Path(jobArgs("schema"))))
val deser = AvroDeserializationSchema.forGeneric(schema)
val kafka_source = setSourceStream(deser)
val stream = env.addSource(kafka_source)(deser.getProducedType)
val config = OutputFileConfig.builder().withPartPrefix("source-part").withPartSuffix(".json").build()
writeOutputAsText(stream, s"${jobArgs("extracts_path")}", config).setParallelism(1).name("JSON")
env.execute("Consumer")
我认为我的问题是数据类型提取,而我正在编写不同于Avro模式编码的字段的输出格式。
如果有人能为我指明解决问题的方向?
我想我找到了问题的答案。我忘了在我的阵列中为ROW指定NOT NULL
。
解决方案:
.column("problem_field", DataTypes.ARRAY(
ROW(FIELD("proc", STRING().notNull()), FIELD("success", BOOLEAN().notNull()).notNull())
)
现在我可以阅读的所有内容