I'm using the Debezium UI repo to test Debezium MySQL CDC. Messages stream into Kafka correctly. The request body for creating the MySQL connector is:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "dbzui-db-mysql",
    "database.port": "3306",
    "database.user": "mysqluser",
    "database.password": "mysql",
    "database.server.id": "184054",
    "database.server.name": "inventory-connector-mysql",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "dbzui-kafka:9092",
    "database.history.kafka.topic": "dbhistory.inventory"
  }
}
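For reference, a request body like the one above is normally submitted to the Kafka Connect REST API with POST /connectors. Below is a minimal standard-library Python sketch, assuming the Connect worker is reachable at http://localhost:8083 (a hypothetical address; adjust it to your setup). The names CONNECT_URL, build_register_request and register_connector are illustrative only.

```python
import json
import urllib.request

# Hypothetical Kafka Connect worker address; adjust to your environment.
CONNECT_URL = "http://localhost:8083"

# The MySQL connector definition from the question.
MYSQL_CONNECTOR = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "dbzui-db-mysql",
        "database.port": "3306",
        "database.user": "mysqluser",
        "database.password": "mysql",
        "database.server.id": "184054",
        "database.server.name": "inventory-connector-mysql",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "dbzui-kafka:9092",
        "database.history.kafka.topic": "dbhistory.inventory",
    },
}


def build_register_request(connect_url, connector):
    """Build a POST /connectors request whose body is the connector JSON."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def register_connector(connect_url, connector):
    """Send the request and return (HTTP status, parsed JSON response)."""
    req = build_register_request(connect_url, connector)
    with urllib.request.urlopen(req) as resp:
        return resp.status, json.loads(resp.read())
```

Calling register_connector(CONNECT_URL, MYSQL_CONNECTOR) against a running worker should create the connector. The request is built separately so it can be inspected without touching the network.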
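Next, I need to load the Kafka messages into Snowflake, the data warehouse my team uses. I created a Snowflake sink connector to consume them, with this request body: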
{
  "name": "kafka2-04",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": 1,
    "topics": "inventory-connector-mysql.inventory.orders",
    "snowflake.topic2table.map": "inventory-connector-mysql.inventory.orders:tbl_orders",
    "snowflake.url.name": "**.snowflakecomputing.com",
    "snowflake.user.name": "kafka_connector_user_1",
    "snowflake.private.key": "*******",
    "snowflake.private.key.passphrase": "",
    "snowflake.database.name": "kafka_db",
    "snowflake.schema.name": "kafka_schema",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
    "value.converter.schemas.enable": "true"
  }
}
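After it runs, however, the data that lands in Snowflake looks like this:
[Image: data in Snowflake]
The schema of the Snowflake table differs from that of the MySQL table. Is my sink connector configured incorrectly, or is it simply not possible to use SnowflakeSinkConnector to consume Kafka data produced by Debezium?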
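This is Snowflake's default behavior, and it is documented in the Snowflake Kafka connector documentation:
Every Snowflake table loaded by the Kafka connector has a schema consisting of two VARIANT columns: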
RECORD_CONTENT. This contains the Kafka message.
RECORD_METADATA. This contains metadata about the message, for example, the topic from which the message was read.
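If Snowflake creates the table, then the table contains only these two columns. If the user creates the table for the Kafka connector to add rows to, then the table can contain more than these two columns (any additional columns must allow NULL values because data from the connector does not include values for those columns).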
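This is why the table does not mirror the MySQL schema: the entire Debezium change event, not just the row, is stored in RECORD_CONTENT, and the MySQL columns sit nested inside it. The Python sketch below shows where they live. It assumes that RECORD_CONTENT holds the Debezium envelope (before, after, source, op, ts_ms), possibly wrapped in a {"schema": ..., "payload": ...} object when schemas are enabled. The sample record and its values are hypothetical, and extract_row is an illustrative helper, not part of either connector.

```python
import json


def extract_row(record_content):
    """Return the MySQL row image carried by a Debezium change event.

    Assumption: record_content is the JSON text stored in RECORD_CONTENT,
    either the bare envelope or wrapped as {"schema": ..., "payload": ...}.
    """
    event = json.loads(record_content)
    # Unwrap the schema/payload wrapper produced when schemas are enabled.
    if "schema" in event and "payload" in event:
        event = event["payload"]
    # For deletes ("d"), "after" is null; the last row state is in "before".
    if event.get("op") == "d":
        return event.get("before")
    return event.get("after")


# Hypothetical insert event for inventory.orders (values made up).
sample = json.dumps({
    "schema": {},
    "payload": {
        "before": None,
        "after": {"order_number": 10001, "purchaser": 1001,
                  "quantity": 1, "product_id": 102},
        "source": {"db": "inventory", "table": "orders"},
        "op": "c",
        "ts_ms": 1650000000000,
    },
})

row = extract_row(sample)
```

Here row is the dict under payload.after, i.e. the columns you would expect in the MySQL table; everything else in the event is Debezium metadata.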