Eventhub流没有捕获模式不匹配



我们正试图实现badRecordsPath时,我们正在从事件中读取事件,作为一个例子,试图让它工作,我已经放入模式,应该失败的事件:

eventStreamDF = (spark.readStream
.format("eventhubs")
.options(**eventHubsConf)
.option("badRecordsPath", "/tmp/badRecordsPath/test1")
.schema(badSchema)
.load()
) 

然而,这永远不会失败,总是读取事件,这是读流对数据块的事件的行为吗?目前的解决方法是根据我们自己的模式来检查interschema。

EventHubs中的数据模式是固定的(请参阅文档)(与Kafka相同)-实际有效载荷始终编码为名称为body的二进制字段,并且由开发人员根据"联系人"解码该二进制有效载荷。在数据的生产者和该数据的消费者之间。因此,即使您指定了模式和badRecordsPath选项,它们也不会被使用。

你将需要实现一些功能,将解码数据从JSON,或其他东西,这将例如返回null如果数据被打破,然后你将有一个空值的过滤器分割成两个子流-为好&糟糕的数据。

最新更新