我在MySQL源连接器中使用了Debezium的key.field.name配置,
把一个额外的字段添加到主题的消息键(key)中。
消息写入主题后,内容如下所示(格式为"键:值")。
{"id":20,"__PKtableowner":"reviewDB.review.search_user_02"}:{"before":null,"after":{"search_user_all_shards.Value":{"id":20,"name":{"string":"oliver"},"email":{"string":"bob@abc.com"},"department":{"string":"sales"},"modified":"2021-01-06T09:27:02Z"}},"source":{"version":"1.0.2.Final","connector":"mysql","name":"reviewDB","ts_ms":1609925222000,"snapshot":{"string":"false"},"db":"review","table":{"string":"search_user_02"},"server_id":1,"gtid":null,"file":"binlog.000002","pos":14630,"row":0,"thread":{"long":13},"query":null},"op":"c","ts_ms":{"long":1609925222956}}
其中,消息键(key)为
{"id":20,"__PKtableowner":"reviewDB.review.search_user_02"}
值为
{"before":null,"after":{"search_user_all_shards.Value":{"id":20,"name":{"string":"oliver"},"email":{"string":"bob@abc.com"},"department":{"string":"sales"},"modified":"2021-01-06T09:27:02Z"}},"source":{"version":"1.0.2.Final","connector":"mysql","name":"reviewDB","ts_ms":1609925222000,"snapshot":{"string":"false"},"db":"review","table":{"string":"search_user_02"},"server_id":1,"gtid":null,"file":"binlog.000002","pos":14630,"row":0,"thread":{"long":13},"query":null},"op":"c","ts_ms":{"long":1609925222956}}
在接收器HdfsSinkConnector中,我需要把消息键中的"__PKtableowner":"reviewDB.review.search_user_02"
作为HDFS或Hive中的一列(字段)写出。
我找到的唯一相关的SMT是ValueToKey,但它不适合我的用例,因为它是把值中的字段复制到消息键,而不是从消息键中取出字段。我已经尝试过几乎所有的转换(InsertField、CreateKey、ExtractField等),但都没有成功。参考:https://docs.confluent.io/platform/current/connect/transforms/valuetokey.html
我正在寻找一种KeyToValue类型的SMT,或者是否有其他解决方法。
以下是我的源连接器(source)和接收器(sink)配置。源连接器:
{
"name": "REVIEW__MYSQL__search_user__source",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.history.kafka.topic": "review.search_user_logs",
"database.history.consumer.max.block.ms": "3000",
"include.schema.changes": "false",
"database.history.consumer.session.timeout.ms": "30000",
"database.history.kafka.consumer.group": "compose-connect-group",
"snapshot.new.tables": "parallel",
"database.history.kafka.sasl.mechanism": "GSSAPI",
"database.whitelist": "review",
"database.history.producer.sasl.mechanism": "GSSAPI",
"database.user": "root",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"time.precision.mode": "connect",
"database.server.name": "reviewDB",
"database.port": "3306",
"database.history.consumer.heartbeat.interval.ms": "1000",
"min.row.count.to.stream.results": "0",
"database.hostname": "mysql",
"database.password": "example",
"database.history.consumer.sasl.mechanism": "GSSAPI",
"snapshot.mode": "when_needed",
"table.whitelist": "review.search_user_(.*)",
"transforms": "Reroute",
"transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
"transforms.Reroute.topic.regex": "reviewDB.review.search_user_(.*)",
"transforms.Reroute.topic.replacement": "search_user_all_shards",
"transforms.Reroute.key.field.name": "__PKtableowner"
}
}
接收器
{ "name": "REVIEW__MYSQL__search_user__sink",
"config":
{
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"topics.dir": "/_incr_files",
"flush.size": 1,
"tasks.max": 1,
"timezone": "UTC",
"rotate.interval.ms": 5000,
"locale": "en",
"hadoop.home": "/etc/hadoop",
"logs.dir": "/_incr_files_wal",
"hive.integration": "false",
"partition.duration.ms": "20000",
"hadoop.conf.dir": "/etc/hadoop",
"topics": "search_user_all_shards",
"hdfs.url": "hdfs://namenode:9000",
"transforms": "unwrap,insertTopicOffset,insertTimeStamp",
"transforms.insertTimeStamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.insertTimeStamp.timestamp.field": "spdb_landing_timestamp",
"transforms.insertTopicOffset.offset.field": "spdb_topic_offset",
"transforms.insertTopicOffset.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"schema.compatibility": "NONE",
"path.format": "'partition'=YYYY-MM-dd-HH",
"partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner"
}
}
由于您的消息键是Struct,据我所知最好的办法是使用下面这个SMT,它实际上会把键和值一起封装到一个新的嵌套值中:
https://github.com/jcustenborder/kafka-connect-transform-archive
最后我还是自己写了一个SMT。:(
https://github.com/Verdado/kafka-connect-custom-transforms