Redis to Postgres via Kafka



I am using the kafka-connect Docker image.

My source connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  localhost:8083/connectors/ -d '{
    "name": "redissource",
    "config": {
      "connector.class": "com.redis.kafka.connect.RedisSourceConnector",
      "tasks.max": "1",
      "topics": "mystream",
      "redis.uri": "redis://virginia:virginia@172.18.1.41:1235",
      "redis.cluster.enabled": "false",
      "redis.stream.name": "mystream",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
  }'

My sink connector:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  localhost:8083/connectors/ -d '{
    "name": "redtopgsink",
    "config": {
      "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url": "jdbc:postgresql://navi1085_postgres_1:5432/postgres",
      "connection.user": "postgres",
      "connection.password": "postgres_nave1085",
      "topics": "mystream",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "io.confluent.connect.avro.AvroConverter",
      "value.converter.schema.registry.url": "http://schema-registry:8081",
      "insert.mode": "upsert",
      "schema.pattern": "public",
      "auto.create": "true",
      "pk.mode": "record_key",
      "pk.fields": "sensor_id",
      "delete.enabled": "true",
      "auto.evolve": "true"
    }
  }'
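For context on what the sink settings ask for: with insert.mode=upsert and pk.mode=record_key, each Kafka record is written as an insert-or-update keyed on sensor_id. A minimal sketch of the statement the sink effectively generates, using Python's sqlite3 as a stand-in for Postgres (the ON CONFLICT upsert syntax is the same in both; temperature is a hypothetical payload field for illustration):

```python
import sqlite3

# In-memory database standing in for Postgres
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE mystream (sensor_id TEXT PRIMARY KEY, temperature REAL)")

def upsert(sensor_id, temperature):
    # Roughly what the JDBC sink generates for
    # insert.mode=upsert / pk.mode=record_key / pk.fields=sensor_id
    conn.execute(
        "INSERT INTO mystream (sensor_id, temperature) VALUES (?, ?) "
        "ON CONFLICT(sensor_id) DO UPDATE SET temperature = excluded.temperature",
        (sensor_id, temperature),
    )

upsert("s1", 20.5)
upsert("s1", 21.0)  # second write updates the existing row instead of failing
rows = conn.execute("SELECT sensor_id, temperature FROM mystream").fetchall()
print(rows)  # [('s1', 21.0)]
```

This is why the sink needs a usable key and typed value fields: it has to name every column in the generated SQL.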

I am getting the error below; please help me resolve it.

{"name":"redtopgsink","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"connect:8083"}],"type":"sink"}

The failed task's trace:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: null (MAP) type doesn't have a mapping to the SQL database column type
	at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getSqlType(GenericDatabaseDialect.java:1945)
	at io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getSqlType(PostgreSqlDatabaseDialect.java:332)
	at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnSpec(GenericDatabaseDialect.java:1861)
	at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.lambda$writeColumnsSpec$39(GenericDatabaseDialect.java:1850)
	at io.confluent.connect.jdbc.util.ExpressionBuilder.append(ExpressionBuilder.java:560)
	at io.confluent.connect.jdbc.util.ExpressionBuilder$BasicListBuilder.of(ExpressionBuilder.java:599)
	at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.writeColumnsSpec(GenericDatabaseDialect.java:1852)
	at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.buildCreateTableStatement(GenericDatabaseDialect.java:1769)
	at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:121)
	at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:67)
	at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:122)
	at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74)
	at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:584)
	... 10 more

The Redis source connector only produces String, Bytes, or Map schemas (as the error indicates).

The JDBC sink requires records with typed, named fields. Map values are not guaranteed to have a fixed type, so the sink cannot derive SQL column types from them.

One approach is to use KSQL: create a Stream over the Redis source topic, select the individual Map keys into a new, structured Avro/JSON topic, and point the Postgres sink at that topic instead.
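A sketch of that KSQL approach, assuming the Redis map values are strings and the payload carries sensor_id and temperature keys (both hypothetical field names; adjust to your actual data):

```sql
-- Read the raw Redis topic; the value is a map of VARCHAR to VARCHAR
CREATE STREAM mystream_raw (
  payload MAP<VARCHAR, VARCHAR>
) WITH (KAFKA_TOPIC = 'mystream', VALUE_FORMAT = 'AVRO');

-- Project individual map keys into named, typed columns and write them
-- to a new structured Avro topic that the JDBC sink can consume
CREATE STREAM mystream_structured
  WITH (KAFKA_TOPIC = 'mystream_structured', VALUE_FORMAT = 'AVRO') AS
  SELECT
    payload['sensor_id'] AS sensor_id,
    CAST(payload['temperature'] AS DOUBLE) AS temperature
  FROM mystream_raw;
```

The sink connector's "topics" setting would then reference mystream_structured rather than mystream.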
