我有如下所示的debezium源连接器和jdbc-sink连接器:
{
"name": "smartdevsignupconnector111",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"database.hostname": "mysql1",
"database.port": "3306",
"database.user": "clusterAdmin",
"database.password": "RUNSman001",
"database.server.id": "184055",
"database.server.name": "smartdevdbserver1",
"database.include.list": "signup_db",
"schema.history.internal.kafka.topic": "schema-changes.signup_db",
"schema.history.internal.kafka.bootstrap.servers": "kafka1:9092",
"table.include.list": "signup_db.users",
"column.exclude.list": "signup_db.users.fullName, signup_db.users.address, signup_db.users.phoneNo, signup_db.users.gender, signup_db.users.userRole, signup_db.users.reason_for_inactive, signup_db.users.firstvisit, signup_db.users.last_changed_PW, signup_db.users.regDate",
"snapshot.mode": "when_needed",
"topic.creation.enable": "true",
"topic.prefix": "smartdevdbserver1",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"transforms": "unwrap,dropTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex": "smartdevdbserver1.signup_db.(.*)",
"transforms.dropTopicPrefix.replacement": "$1",
"include.schema.changes": "true"
}
}
{
"name": "resetpassword-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"topics": "users",
"connection.url": "jdbc:mysql://rpwd_mysql:3306/rpwd_db",
"connection.user": "rpwd_user",
"connection.password": "*RUNSman001*",
"table.name.format": "users",
"fields.whitelist": "id,email,password,User_status,auth_token",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "id",
"pk.mode": "record_key"
}
}
两者都工作得很好cdc从mysql数据库表到kafka主题和下沉数据从kafka主题到另一个mysql数据库表,但问题是删除数据不工作。
从源数据中删除数据不会删除kafka主题中的记录,也不会删除目标表中的记录。
请有人告诉我我错过了什么。
编辑:
来自源表的示例记录根本没有改变,看起来仍然一样。请见以下记录:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":1,"email":"testing1@firstclicklimited.com","password":"$2a$10$Vah8yMoB3jopzwreKwHRKuH59UVFGXwxCSP0hQs99wcWEnbqLp7cO","User_status":"ACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":2,"email":"testing2@firstclicklimited.com","password":"$2a$10$mqT6BtiLybFxuBpWcuiFt.M2IuL5O3bq6pB1CMUxqdyncMeVjKLNC","User_status":"ACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":4,"email":"testing4@firstclicklimited.com","password":"$2a$10$9dsA1QCGNb31cloPeu3uq.w25rRzepS3mb04GcKZjIEOrl.ImcqDO","User_status":"INACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":5,"email":"testing5@firstclicklimited.com","password":"$2a$10$52nsmqYYIit4.Ztmu6h4geAlvH1VkeauWIDu83i8FmOQohzFost7C","User_status":"INACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":6,"email":"testing6@firstclicklimited.com","password":"$2a$10$fy4OT0W7pmV2pwOcRb4m8eHSWs8tA8ZvWOTrdK85SpmcHJBLy9lm6","User_status":"INACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":7,"email":"testing7@firstclicklimited.com","password":"$2a$10$IvhBY9iVZkpRvg6M.LnQ4OO3c2cKUUjZbYMnII7ZWe.t0iYCK2L5u","User_status":"INACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":29,"email":"testing11@firstclicklimited.com","password":"$2a$10$0KBAnV9AHhDeAe8jg4wkWeqrIE1hDvqFMjvsl9IR/6zmWPtb1C3M2","User_status":"INACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":36,"email":"testing12@firstclicklimited.com","password":"$2a$10$1IiE/vWLz6YwtJLpXR/InewQbEfsMc6VimiOzI6yR2WOmhzxURMsm","User_status":"ACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":50,"email":"testing14@firstclicklimited.com","password":"$2a$10$/6x9dP7MUKzWIbHluQG/0ebdA7tYO1qm5ky9X5YIVRSvDqQKuWdp.","User_status":"ACTIVE","auth_token":null}}
我从源表中删除的记录是带有email == testing14@firstclicklimited.com的最后一条记录
mysql> delete from users where email = 'testing14@firstclicklimited.com';
Query OK, 1 row affected (0.11 sec)
使用——property print.key=true:
PS C:UsersEBITIDesktopSign-up> docker exec kafka1 /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic users --from-beginning --property print.key=true
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Key"},"payload":{"id":1}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":1,"email":"testing1@firstclicklimited.com","password":"$2a$10$Vah8yMoB3jopzwreKwHRKuH59UVFGXwxCSP0hQs99wcWEnbqLp7cO","User_status":"ACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Key"},"payload":{"id":2}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":2,"email":"testing2@firstclicklimited.com","password":"$2a$10$mqT6BtiLybFxuBpWcuiFt.M2IuL5O3bq6pB1CMUxqdyncMeVjKLNC","User_status":"ACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Key"},"payload":{"id":4}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":4,"email":"testing4@firstclicklimited.com","password":"$2a$10$9dsA1QCGNb31cloPeu3uq.w25rRzepS3mb04GcKZjIEOrl.ImcqDO","User_status":"INACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Key"},"payload":{"id":5}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":5,"email":"testing5@firstclicklimited.com","password":"$2a$10$52nsmqYYIit4.Ztmu6h4geAlvH1VkeauWIDu83i8FmOQohzFost7C","User_status":"INACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Key"},"payload":{"id":6}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":6,"email":"testing6@firstclicklimited.com","password":"$2a$10$fy4OT0W7pmV2pwOcRb4m8eHSWs8tA8ZvWOTrdK85SpmcHJBLy9lm6","User_status":"INACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Key"},"payload":{"id":7}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":7,"email":"testing7@firstclicklimited.com","password":"$2a$10$IvhBY9iVZkpRvg6M.LnQ4OO3c2cKUUjZbYMnII7ZWe.t0iYCK2L5u","User_status":"INACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Key"},"payload":{"id":29}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":29,"email":"testing11@firstclicklimited.com","password":"$2a$10$0KBAnV9AHhDeAe8jg4wkWeqrIE1hDvqFMjvsl9IR/6zmWPtb1C3M2","User_status":"INACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Key"},"payload":{"id":36}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":36,"email":"testing12@firstclicklimited.com","password":"$2a$10$1IiE/vWLz6YwtJLpXR/InewQbEfsMc6VimiOzI6yR2WOmhzxURMsm","User_status":"ACTIVE","auth_token":null}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Key"},"payload":{"id":50}} {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"email"},{"type":"string","optional":false,"field":"password"},{"type":"string","optional":false,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"ACTIVE,INACTIVE"},"default":"INACTIVE","field":"User_status"},{"type":"string","optional":true,"field":"auth_token"}],"optional":false,"name":"smartdevdbserver1.signup_db.users.Value"},"payload":{"id":50,"email":"testing14@firstclicklimited.com","password":"$2a$10$/6x9dP7MUKzWIbHluQG/0ebdA7tYO1qm5ky9X5YIVRSvDqQKuWdp.","User_status":"ACTIVE","auth_token":null}}
好了,我找到问题了。有SMT "ExtractNewRecordState"在源和接收连接器上,它应该只在其中一个连接器上(在我的情况下,我选择执行SMT "ExtractNewRecordState"在源连接器上)。然后transform .unwrap.drop.tombstones应该设置为false或者delete.handling。模式应该设置为none(你可以同时设置这两个,没有问题)以下是源和接收连接器现在的样子:
{
"name": "smartdevsignupconnector111",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"database.hostname": "mysql1",
"database.port": "3306",
"database.user": "clusterAdmin",
"database.password": "RUNSman001",
"database.server.id": "184055",
"database.server.name": "smartdevdbserver1",
"database.include.list": "signup_db",
"schema.history.internal.kafka.topic": "schema-changes.signup_db",
"schema.history.internal.kafka.bootstrap.servers": "kafka1:9092",
"table.include.list": "signup_db.users",
"column.exclude.list": "signup_db.users.fullName, signup_db.users.address, signup_db.users.phoneNo, signup_db.users.gender, signup_db.users.userRole, signup_db.users.reason_for_inactive, signup_db.users.firstvisit, signup_db.users.last_changed_PW, signup_db.users.regDate",
"snapshot.mode": "when_needed",
"topic.creation.enable": "true",
"topic.prefix": "smartdevdbserver1",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"transforms": "unwrap,dropTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"delete.handling.mode": "none",
"transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex": "smartdevdbserver1.signup_db.(.*)",
"transforms.dropTopicPrefix.replacement": "$1",
"include.schema.changes": "true"
}
}
{
"name": "resetpassword-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",
"topics": "users",
"connection.url": "jdbc:mysql://rpwd_mysql:3306/rpwd_db",
"connection.user": "rpwd_user",
"connection.password": "*RUNSman001*",
"table.name.format": "users",
"fields.whitelist": "id,email,password,User_status,auth_token",
"auto.create": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "id",
"pk.mode": "record_key"
}
}