我使用的是kafka-connect-docker镜像。
我的源
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "redissource","config": {"connector.class": "com.redis.kafka.connect.RedisSourceConnector", "tasks.max": "1","topics": "mystream","redis.uri": "redis://virginia:virginia@172.18.1.41:1235","redis.cluster.enabled": "false", "redis.stream.name":"mystream", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter","value.converter.schema.registry.url": "http://schema-registry:8081" }}'
目标
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{"name": "redtopgsink","config": { "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url":"jdbc:postgresql://navi1085_postgres_1:5432/postgres", "connection.user" :"postgres", "connection.password" :"postgres_nave1085", "topics" :"mystream", "key.converter":"org.apache.kafka.connect.storage.StringConverter", "value.converter": "io.confluent.connect.avro.AvroConverter","value.converter.schema.registry.url": "http://schema-registry:8081" , "insert.mode" :"upsert", "schema.pattern":"public","auto.create" :"true" ,"pk.mode" :"record_key","pk.fields":"sensor_id", "delete.enabled" :"true","auto.evolve":"true"}}'
我收到下面的错误,请帮助解决这个错误。
{"name":"redtopgsink","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.ntat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)ntat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)ntat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)ntat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)ntat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)ntat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)ntat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)ntat java.base/java.lang.Thread.run(Thread.java:829)nCaused by: org.apache.kafka.connect.errors.ConnectException: null (MAP) type doesn't have a mapping to the SQL database column typentat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1945)ntat io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getSqlType(PostgreSqlDatabaseDialect.java:332)ntat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1861)ntat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$writeColumnsSpec$39(GenericDatabaseDialect.java:1850)ntat io.confluent.connect.jdbc.util.ExpressionBuilder.append(ExpressionBuilder.java:560)ntat io.confluent.connect.jdbc.util.ExpressionBuilder$BasicListBuilder.of(ExpressionBuilder.java:599)ntat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnsSpec(GenericDatabaseDialect.java:1852)ntat io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.buildCreateTableStatement(GenericDatabaseDialect.java:1769)ntat io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:121)ntat io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:67)ntat io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:122)ntat io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)ntat io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)nt... 10 moren"}],"type":"sink"}
Redis源连接器仅创建String、Bytes或Map架构(如错误中所述(
JDBC接收器需要具有typed&命名字段。映射值不能保证具有类型。
一种方法是使用KSQL,从Redis源主题创建一个Stream,然后通过选择Map键将数据输出到一个新的、结构化的Avro/JSON主题中,然后Postgres接收器可以读取Map键。