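I'm trying Kafka for the first time and have set up a Kafka cluster with AWS MSK. The goal is to stream data from a MySQL server to PostgreSQL. I'm using the Debezium MySQL connector as the source and the Confluent JDBC connector as the sink.
MySQL configuration: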
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.server.id": "1",
"tasks.max": "3",
"internal.key.converter.schemas.enable": "false",
"transforms.unwrap.add.source.fields": "ts_ms",
"key.converter.schemas.enable": "false",
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
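After registering the MySQL connector, its status is "RUNNING" and it captures ongoing changes in the MySQL table. The consumer console shows the results in the following format: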
{"id":5,"created_at":1594910329000,"userid":"asldnl3r234mvnkk","amount":"B6Eg","wallet_type":"CDW"}
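My first issue: in the table, the "amount" column is of type "decimal" and contains numeric values, so why does it show up as an alphanumeric value in the consumer console?
For PostgreSQL as the target database, I'm using the JDBC sink connector with the following configuration: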
"name": "postgres-connector-db08",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"topics": "mysql-cash.kafka_test.test",
"connection.url": "jdbc:postgresql://xxxxxx:5432/test?currentSchema=public",
"connection.user": "xxxxxx",
"connection.password": "xxxxxx",
"insert.mode": "upsert",
"auto.create": "true",
"auto.evolve": "true"
After registering the JDBC connector, checking its status returns an error:
{"name":"postgres-connector-db08","connector":{"state":"RUNNING","worker_id":"x.x.x.x:8083"},"tasks":[{"id":0,"state":"FAILED","worker_id":"x.x.x.x:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'postgres-connector-db08' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='mysql-cash.kafka_test.test',partition=0,offset=0,timestamp=1594909233389) with a HashMap value and null value schema.
io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresValue$2(RecordValidator.java:83)
io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:82)
io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
... 10 more
"}],"type":"sink"}
Why is this error occurring? Am I missing something in the sink configuration?
https://docs.confluent.io/kafka-connect-jdbc/current/sink-connector/index.html#data-mapping
The sink connector requires knowledge of schemas, so you should use a suitable converter e.g. the Avro converter that comes with Schema Registry, or the JSON converter with schemas enabled.
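Your JSON is plain (it carries no schema), and the connector is configured with "value.converter.schemas.enable": "false" (the JSON converter with schemas disabled), so the sink receives records without the schema it requires. The Avro converter should be set up with Schema Registry instead: https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#applying-schema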
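To make the converter change concrete, here is a minimal sketch in Python that derives schema-aware variants of the sink configuration from the question. The helper names `with_avro_values` and `with_json_schemas` and the Schema Registry URL are placeholders of mine, not part of any Kafka Connect API; only the property names and converter class names are real settings.

```python
AVRO_CONVERTER = "io.confluent.connect.avro.AvroConverter"
JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"


def with_avro_values(config, schema_registry_url):
    """Return a copy of a connector config that (de)serializes values as Avro."""
    updated = dict(config)
    updated["value.converter"] = AVRO_CONVERTER
    updated["value.converter.schema.registry.url"] = schema_registry_url
    # The schemas.enable flag belongs to the JSON converter, so drop it here.
    updated.pop("value.converter.schemas.enable", None)
    return updated


def with_json_schemas(config):
    """Return a copy that keeps JSON but embeds the schema in every message."""
    updated = dict(config)
    updated["value.converter"] = JSON_CONVERTER
    updated["value.converter.schemas.enable"] = "true"
    return updated


# Subset of the sink configuration from the question.
sink = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "value.converter": JSON_CONVERTER,
    "value.converter.schemas.enable": "false",
    "topics": "mysql-cash.kafka_test.test",
}

# Placeholder URL: replace with the address of your own Schema Registry.
avro_sink = with_avro_values(sink, "http://schema-registry:8081")
json_sink = with_json_schemas(sink)
print(avro_sink["value.converter"])
print(json_sink["value.converter.schemas.enable"])
```

The same converter settings would have to be applied to the Debezium source connector as well, because records already written to the topic without a schema stay schemaless whatever the sink is configured with. Separately, "insert.mode": "upsert" normally also needs "pk.mode" and "pk.fields" to be set, so that is worth checking once the schema error is resolved.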
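As for the first question, "why do decimals show up in an alphanumeric format":
How decimals are converted depends on the decimal.handling.mode configuration.
Specifies how the connector should handle values for DECIMAL and NUMERIC columns: precise (the default) represents them precisely using java.math.BigDecimal values, which are represented in change events in binary form; double represents them using double values, which may result in a loss of precision but is far easier to use; the string option encodes values as formatted strings, which are easy to consume but lose semantic information about the real type.
https://debezium.io/documentation/reference/0.10/connectors/mysql.html#decimal-values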
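To see what the precise mode does to a value, the sketch below decodes the "B6Eg" string from the question by hand. With the JSON converter the binary form is Base64 text holding the big-endian two's-complement unscaled integer. The function name `decode_connect_decimal` is hypothetical, and the scale of 2 is an assumption: the real scale comes from the column definition and is not present in a schemaless message.

```python
import base64
from decimal import Decimal


def decode_connect_decimal(encoded, scale):
    """Decode a Base64-encoded Kafka Connect Decimal into a Python Decimal."""
    raw = base64.b64decode(encoded)
    # The bytes are the unscaled value as a signed, big-endian integer.
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # Shift the decimal point left by `scale` digits.
    return Decimal(unscaled).scaleb(-scale)


# Assumes the column is declared with scale 2, e.g. DECIMAL(10,2).
print(decode_connect_decimal("B6Eg", 2))  # prints 5000.00
```

Setting "decimal.handling.mode" to "string" or "double" on the source connector avoids this manual decoding altogether, at the cost described in the documentation quoted above.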
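If none of the built-in conversion settings fits, you can also write a custom converter.
https://debezium.io/documentation/reference/stable/development/converters.html
With some luck, you may find an open-source converter that already solves this.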