我们用Kafka和Zookeeper建立了一个环境。我们需要通过Kafka Connect将数据从PostgreSQL数据库发送到MongoDB数据库。我们能够执行配置,在初始化连接器时,数据最初会正确迁移。
然而,我们需要的是,当在连接器配置中添加新的列或表时,受影响的记录会再次发送给我们的使用者进行处理,以便在MongoDB数据库中再次对其进行操作和更新。有没有一种方法可以做到这一点,而不必再次迁移所有记录?我们需要分阶段进行迁移,因为迁移需要大量数据。
以下是我们的配置示例:
{
"name": "migration_connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "host.docker.internal",
"database.port": "5432",
"plugin.name": "pgoutput",
"database.user": "postgres_user",
"database.password": "postgres_pasword",
"database.dbname" : "database_source",
"database.server.name": "server_name",
"schema.include": "public",
"column.include.list": "public.customer.id_customer,public.customer.name,public.table_B.id,public.table_c.id ... and so on",
"table.include.list": "public.customer,public.table_B,public.table_C",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"heartbeat.interval.ms": 5000,
"heartbeat.action.query": "INSERT INTO public.heartbeat_kafka_connect (id, ts) VALUES (1, NOW()) ON CONFLICT(id) DO UPDATE SET ts=EXCLUDED.ts;"
}}
示例:
我们必须从表"中添加另一列;客户";我们需要再次接收所有客户的注册以进行处理。当我们更新连接器配置时,不会再次发送寄存器。
我们能够弄清楚如何让设置工作。
我们使用临时快照功能。我们根据以下脚本(PostgresSql(在源数据库中创建一个信号表:
CREATE TABLE public.debezium_signal (id VARCHAR(42) PRIMARY KEY, type VARCHAR(32) NULL, data VARCHAR(2048) NULL);
我们调整了连接器的配置以包含标签";signal.data.collection";指向我们创建的信号表。
"signal.data.collection": "public.debezium_signal"
我们还在标签"中添加了表名;table.include.list";由于我们使用该配置来告诉我们希望kafka连接迁移哪些列,因此我们不得不在标记"中添加信号表的列;column.include.list;
"column.include.list": "public.customer.id,(other columns...),public.debezium_signal.id,public.debezium_signal.type,public.debezium_signal.data",
"table.include.list": "public.customer,public.debezium_signal",
之后,我们只是在signals表中添加了一条记录,指向我们希望kafka连接的表来重新快照记录。
INSERT INTO debezium_signal (id, type, data) VALUES (gen_random_uuid(), 'execute-snapshot', '{"data-collections": ["public.customer"]}');