Unable to write Kafka topic data to a Postgres DB using the JDBC kafka-sink-connector



I have a Kafka topic to which we produce Avro records with the schema shown below.

{
  "type": "record",
  "name": "testValue",
  "namespace": "com.test.testValue",
  "fields": [
    {
      "name": "A",
      "type": "string"
    },
    {
      "name": "B",
      "type": "string"
    },
    {
      "name": "C",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 8,
        "scale": 2
      }
    },
    {
      "name": "D",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "E",
      "type": [
        {
          "type": "record",
          "name": "F",
          "fields": [
            {
              "name": "G",
              "type": {
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 8,
                "scale": 2
              }
            }
          ]
        },
        {
          "type": "record",
          "name": "H",
          "fields": [
            {
              "name": "dummy",
              "type": "boolean",
              "default": true
            }
          ]
        },
        {
          "type": "record",
          "name": "I",
          "fields": [
            {
              "name": "J",
              "type": {
                "type": "bytes",
                "logicalType": "decimal",
                "precision": 8,
                "scale": 2
              }
            }
          ]
        }
      ]
    },
    {
      "name": "K",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "L",
      "type": "boolean"
    }
  ]
}

The connector configuration is as follows:

{
  "name": "test-sink-database",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://database_url/postgres",
    "topics": "test",
    "connection.user": "postgres",
    "connection.password": "password",
    "table.name.format": "test_table",
    "auto.create": "true",
    "schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "name": "test-sink-database",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "insert.mode": "insert"
  }
}

The connector fails with the error below.

Caused by: org.apache.kafka.connect.errors.ConnectException: io.confluent.connect.avro.Union (STRUCT) type doesn't have a mapping to the SQL database column type

Full stack trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: io.confluent.connect.avro.Union (STRUCT) type doesn't have a mapping to the SQL database column type
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1727)
at io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getSqlType(PostgreSqlDatabaseDialect.java:215)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1643)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$writeColumnsSpec$33(GenericDatabaseDialect.java:1632)
at io.confluent.connect.jdbc.util.ExpressionBuilder.append(ExpressionBuilder.java:558)
at io.confluent.connect.jdbc.util.ExpressionBuilder$BasicListBuilder.of(ExpressionBuilder.java:597)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnsSpec(GenericDatabaseDialect.java:1634)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.buildCreateTableStatement(GenericDatabaseDialect.java:1557)
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:91)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:61)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:121)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)

I believe this is caused by field E, which is a union of record types. The error states that the (STRUCT) type doesn't have a mapping to the SQL database column type. Your E is defined as follows:

"name": "E",
"type": [
{
"type": "record",
"name": "F",
"fields": [
{
"name": "G",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 8,
"scale": 2
}
}
]
},
{
"type": "record",
"name": "H",
"fields": [
{
"name": "dummy",
"type": "boolean",
"default": true
}
]
},
{
"type": "record",
"name": "I",
"fields": [
{
"name": "J",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 8,
"scale": 2
}
}
]
}
]
}
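For intuition, the Confluent AvroConverter represents such a union as a single Connect struct, the io.confluent.connect.avro.Union (STRUCT) named in the error, with one optional field per branch. A record whose E carries branch H would look roughly like this (an illustrative rendering; the exact branch field names and the sample values are assumptions):

{
  "A": "a-value",
  "B": "b-value",
  "C": 1234.56,
  "D": 1679999999000,
  "E": {
    "F": null,
    "H": { "dummy": true },
    "I": null
  },
  "K": 1679999999000,
  "L": false
}

The JDBC sink must map every top-level field to a column type, and there is no SQL column type for this nested struct, which is exactly where the CREATE TABLE attempt in the stack trace fails.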

The JDBC sink connector does not handle nested data structures well. You could try using the Cast Single Message Transform (SMT) to cast the value to a string and check whether it is pushed to the database correctly. Another approach is to flatten the value with a different SMT, such as:

"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value"