这是我的连接器配置:
curl -s -k -X POST http://***************:8083/connectors -H "Content-Type: application/json" -d '{
"name": "mysql-cdc-CUSTOMER_DETAILS-007",
"config": {
"tasks.max":"2",
"poll.interval.ms":"500",
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "dbnode",
"database.port": "3306",
"database.user": "**********",
"database.password": "###########",
"database.server.name": "dbnode",
"database.whitelist": "device_details",
"database.history.kafka.bootstrap.servers": "**********:9092",
"database.history.kafka.topic": "schema-changes.device_details",
"include.schema.changes":"true",
"table.whitelist":"device_details.tb_customermst",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://************:8081",
"internal.key.converter":"org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
"internal.key.converter.schemas.enable":"false",
"internal.value.converter.schemas.enable":"false"
}
}' | jq '.'
当使用ksql中的数据时,它显示如下:
ksql> print 'Device_Details.device_details.tb_customermst' from beginning;
Format:AVRO
5/2/20 2:08:34 PM IST, Struct{customerid=10001}, {"before": null, "after": {"customerid": 10001, "firstname": "Klara", "lastname": "Djokic", "emailid": "klara.djokic007@iillii.org", "mobilenumber": "+1 (480) 361-5311", "customertype": "Commercial", "emailverified": 1, "mobileverified": 1, "city": "Gilbert", "postcode": "85296", "address": "3426 E Elgin St", "latitude": 33.29840528, "longitude": -111.71571314, "UPDATE_TS": "2020-05-02T08:38:33Z"}, "source": {"version": "1.1.0.Final", "connector": "mysql", "name": "Device_Details", "ts_ms": 1588408713000, "snapshot": "false", "db": "device_details", "table": "tb_customermst", "server_id": 1, "gtid": "98557612-65ba-11ea-8dc4-000c29bcb2b4:6", "file": "binlog.000044", "pos": 4417, "row": 0, "thread": 8, "query": null}, "op": "c", "ts_ms": 1588408713761, "transaction": null}
密钥是Struct{customerid=10001},我希望密钥为10001
我怎样才能做到这一点。。。
当使用ValueToKey和ExtractField$KeySMT连接日志时,会出现以下错误:
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:315)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
有人能告诉我怎样才能拿到10001的钥匙吗。
提前谢谢。
附言:我在用。。。。汇流平台5.4.0…MySql的Debezium连接器1.1.0
您就快到了。您需要单独使用ExtractField$Key
变换(即不使用ValueToKey
(来将值从结构中提升出来。
"transforms":"extractKeyfromStruct",
"transforms.extractKeyfromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyfromStruct.field":"customerid",