Value schema must be of type Struct



I am sending nested JSON data through Kafka to a PostgreSQL sink. I am building the sink connector, and unfortunately I cannot change the data at the source, so I want to send it through Kafka as-is, without any transformation.

Kafka Connect shows this error:

[2023-01-04 22:58:15,227] ERROR WorkerSinkTask{id=Kafkapgsink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Value schema must be of type Struct (org.apache.kafka.connect.runtime.WorkerSinkTask:609)
org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:86)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:67)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:115)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

My Kafka Connect worker properties are:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

The sink connector properties are:

name=Kafkapgsink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
task.max=100
connection.url=jdbc:postgresql://localhost:5432/fileintegrity
connection.user=postgres
connection.password=09900
insert.mode=insert
auto.create=true
auto.evolve=true
table.name.format=oi
pk.mode=record_key
delete.enabled=true

The problem is here:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

Strings are not guaranteed to have typed key-value pairs (i.e., a Struct).
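
A minimal fix, assuming your messages carry (or can carry) an embedded schema, is to switch the worker to the JsonConverter and enable schemas:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true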

If you want to use JSON, there are more details here - https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

As you can see there, schemas.enable is a property of the JsonConverter only.
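
For reference, with schemas.enable=true the JsonConverter expects each message to be an envelope with a schema and a payload field. A hypothetical record might look like this (the fields id and name are purely illustrative, not from your data):

{
  "schema": {
    "type": "struct",
    "name": "oi",
    "optional": false,
    "fields": [
      { "field": "id", "type": "int32", "optional": false },
      { "field": "name", "type": "string", "optional": true }
    ]
  },
  "payload": { "id": 1, "name": "example" }
}

Since you say the source data cannot be changed, note that the JsonConverter with schemas.enable=true will also fail unless the messages already include this envelope.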
