星期四快乐!我一直在尝试在Debezium中创建连接器Postgres连接器,但只有当表已经存在于我的MySQL实例中时,我才能捕获更改,这并不理想。因为那样我就必须用Python编写一个脚本来处理这样的事件,而且使用已经存在的东西可能比重新发明轮子更容易。我希望能够在实际连接器中捕获DDL。我偶然发现了这篇博客文章。https://debezium.io/blog/2017/09/25/streaming-to-another-database/我把它放在了我的本地设置上,这很好,但唯一的问题是我想朝着相反的方向发展。(我可以捕获新记录、删除的记录和更新的记录,如果不存在,它还会创建新表和新列(。我想从postgres进行流式传输,并将连接器插入到mysql中的目标数据库中。我尝试分别切换jdbc源和接收器连接器,但没有将新记录从postgres插入mysql。我似乎可以在所有地方找到从mysql插入postgres的人,但不能从另一个方向插入。这是我设置的GitHub目录,用于让mysql-kafka-postgres正常工作。https://github.com/debezium/debezium-examples/tree/main/unwrap-smt
我试着走另一条路,但当我开始说";由于kafka[org.apache.kafka.clients.ClientUtils]的DNS解析失败,无法从bootstrap.servers解析服务器kafka:9092;这是我的源json和接收json。
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "postgres.public.customers",
"connection.url": "jdbc:mysql://mysql:3306/inventory",
"connection.user": "debezium",
"connection.password": "dbz",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.fields": "id",
"pk.mode": "record_key"
}
}
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"topic.prefix": "psql",
"mode": "bulk",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgresuser",
"database.password": "postgrespw",
"database.dbname": "inventory",
"table.include.list": "public.customers",
"slot.name": "test_slot",
"plugin.name": "wal2json",
"database.server.name": "psql",
"tombstones.on.delete": "true",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
"key.converter.basic.auth.credentials.source": "USER_INFO",
"key.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://[APACHE_KAFKA_HOST]:[SCHEMA_REGISTRY_PORT]",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USER]:[SCHEMA_REGISTRY_PASSWORD]"
}
}
在我关注的博客上,其他一切都保持不变。欢迎任何帮助。
我认为这里有两个不同的问题:
-
如何处理Mysql中不存在的列。JDBC接收连接器应该有一个名为
auto.create
的标志,如果设置为true
,则允许连接器在不存在表的情况下创建表(auto.evolve
也允许表进化( -
PG->卡夫卡->Mysql是可能的,你可以在这里找到我前段时间写的一个例子。这些例子使用了适用于PostgreSQL的Aiven和适用于Apache Kafka的Aiven,但您应该能够调整连接器以在任何类型的PG和Kafka中工作。
如果知道你的PG->卡夫卡->MySQL管道停止工作。
免责声明:我为Aiven 工作
这里有一个例子。我相信它与发送到mysql时使用的接收器非常相似。