We are using Kafka Connect to pull data from an Oracle source and write it to HDFS in AVRO format. In the Kafka Schema Registry, the schema for one of the sources looks like this:
{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "fields": [
    {
      "name": "ID",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 0,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "0"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    }....
}
This means the ID column has a precision of 64. When I try to read these AVRO files with Spark, it throws:
Caused by: org.apache.spark.sql.AnalysisException: decimal can only support precision up to 38
  at org.apache.spark.sql.types.DecimalType.<init>(DecimalType.scala:51)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:60)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
  at org.apache.spark.sql.avro.AvroFileFormat.inferSchema(AvroFileFormat.scala:93)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$6.apply(DataSource.scala:180)
The code snippet I use to read the AVRO files is:
def readSchemaOfAvroPartition(avroLocation: String, partitionColumn: String, partitionValue: String): StructType = {
  sparkSession.read.format(AVRO)
    .load(s"${avroLocation}/${partitionColumn}=${partitionValue}")
    .schema
}
According to the Oracle documentation, the maximum precision should be 38: https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT313
How can I force Kafka Connect to register this schema with a precision of 38 instead of 64?
This is not necessarily a bug in the Kafka connector; it is a consequence of how Kafka Connect works in general. In many databases, NUMERIC or DECIMAL columns are defined with a precision and a scale, and when these are not specified the database falls back to defaults that depend on the database technology.
Kafka Connect does not have a good way to handle this consistently across databases and other non-database file systems.
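To see why the precision gets lost in the first place, here is a small illustrative sketch (not taken from the connector source) of Connect's Decimal logical type: the Connect schema carries only the scale as a parameter and the value travels as unscaled bytes, so the Avro converter has to pick a precision on its own when it builds the Avro schema, which appears to be where the 64 comes from.

import java.math.BigDecimal
import org.apache.kafka.connect.data.{Decimal, Schema}

// Connect's Decimal logical type: the schema records only the scale;
// there is no precision field anywhere in the wire format.
val decimalSchema: Schema = Decimal.schema(0)                        // scale = 0, as for the ID column
val encoded: Array[Byte] = Decimal.fromLogical(decimalSchema, new BigDecimal(12345))
val decoded: BigDecimal  = Decimal.toLogical(decimalSchema, encoded) // 12345 again; precision was never stored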
More details can be found here:
https://github.com/confluentinc/kafka-connect-jdbc/issues/563
https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector#bytes-decimals-numerics
https://gist.github.com/rmoff/7bb46a0b6d27982a5fb7a103bb7c95b9#file-oracle-md
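If you cannot change what the connector registers, one possible Spark-side workaround (a sketch only, assuming the stored ID values actually fit within 38 digits) is to pass an explicit reader schema with the precision lowered to 38 via spark-avro's avroSchema option, so that schema inference never sees the precision of 64. The name readSchemaOfAvroPartitionCapped below is just an illustrative variant of the snippet from the question, and the reader schema would have to list every field of the record, not only ID:

import org.apache.spark.sql.types.StructType

// Registry schema with "precision": 64 manually replaced by 38; the remaining
// fields of the real record (elided above as "....") must be added here as well.
val readerSchemaJson =
  """{
    |  "type": "record",
    |  "name": "ConnectDefault",
    |  "namespace": "io.confluent.connect.avro",
    |  "fields": [
    |    { "name": "ID",
    |      "type": ["null",
    |        { "type": "bytes", "logicalType": "decimal", "precision": 38, "scale": 0 }],
    |      "default": null }
    |  ]
    |}""".stripMargin

def readSchemaOfAvroPartitionCapped(avroLocation: String, partitionColumn: String, partitionValue: String): StructType =
  sparkSession.read.format(AVRO)
    .option("avroSchema", readerSchemaJson)   // user-supplied reader schema overrides the one embedded in the files
    .load(s"${avroLocation}/${partitionColumn}=${partitionValue}")
    .schema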