I am using the JDBC sink connector to load data from a Kafka topic into Postgres. This is my configuration:
curl --location --request PUT 'http://localhost:8083/connectors/customer_sink_1/config' \
--header 'Content-Type: application/json' \
--data-raw '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:postgresql://localhost:5432/postgres",
"connection.user":"user",
"connection.password":"passwd",
"tasks.max" : "1",
"topics":"table_name_same_as_topic_name",
"insert.mode":"insert",
"key.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"quote.sql.identifiers":"never",
"errors.tolerance":"all",
"errors.deadletterqueue.topic.name":"failed_records",
"errors.deadletterqueue.topic.replication.factor":"1",
"errors.log.enable":"true",
"errors.deadletterqueue.context.headers.enable":"true",
"reporter.bootstrap.servers":"localhost:9092",
"reporter.result.topic.name":"success-responses",
"reporter.result.topic.replication.factor":"1",
"reporter.error.topic.name":"error-responses",
"reporter.error.topic.replication.factor":"1"
}'
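As a first debugging step (not from the original post, just a common check), the Kafka Connect REST API can report whether the connector and its task are actually RUNNING; a FAILED task usually comes with a stack trace that explains why reporter topics were never created:

```shell
# Query the status of the connector defined above.
# Assumes Connect is reachable on localhost:8083, as in the config.
curl -s http://localhost:8083/connectors/customer_sink_1/status
```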
I downloaded Kafka from Apache Kafka and run the services on Windows using the .bat files.
I am able to send the failed records to another topic, but when I try to consume it from the command line with the console consumer, I cannot see the headers; I can only see the failed data/records.
According to the documentation (Kafka Connect Concepts):
You can then use the **kcat** (formerly kafkacat) Utility to view the record header and determine why the record failed. Errors are also sent to **Connect Reporter**.
So I tried Connect Reporter, but the success-responses and error-responses topics were not created.
How can I see the headers of the failed records without kcat? Is that possible?
Depending on your Kafka version, you can use the console consumer

kafka-console-consumer ... --property print.headers=true

if you cannot use kcat.
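A full invocation might look like the following sketch. It assumes a local broker on localhost:9092 and the dead letter queue topic name "failed_records" from the config above; note that print.headers requires Apache Kafka 2.7 or newer (KIP-431), and on Windows you would use the .bat script:

```shell
# Consume the DLQ topic and print the record headers alongside the value.
# Headers added by errors.deadletterqueue.context.headers.enable=true
# (prefixed __connect.errors.*) carry the failure reason.
kafka-console-consumer.bat --bootstrap-server localhost:9092 ^
  --topic failed_records ^
  --from-beginning ^
  --property print.headers=true
```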