我需要一些关于如何让kafka在目标表中复制源表结构的建议。让我解释一下…
源数据库:SQL Server来源表:
CREATE TABLE dbo.PEOPLE(
ID NUMERIC(10) NOT NULL PRIMARY KEY,
FIRST_NAME VARCHAR(10),
LAST_NAME varchar(10),
AGE NUMERIC(3)
)
目标数据库:PostgreSQL
Kafka水槽连接器:
name=pg-sink-connector_people
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
tasks.max=4
topics=myserver.dbo.PEOPLE
connection.url=jdbc:postgresql://localhost:5432/kafkadb
connection.user=postgres
connection.password=mypassword
insert.mode=upsert
pk.mode=record_key
pk.fields=ID
table.name.format=PEOPLE
auto.create=true
offset.storage.file.filename=C:/kafka_2.13-2.7.0/tmp/connect.offsets
bootstrap.servers=localhost:9092
plugin.path=C:/kafka_2.13-2.7.0/plugins
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_
auto.evolve=true
当我运行上面的连接器时,Kafka在PostgreSQL上创建了一个目标表,如下所示:
CREATE TABLE public."PEOPLE" (
"before_ID" int8 NULL,
“before_FIRST_NAME" text NULL,
“before_LAST_NAME" text NULL,
"before_AGE" int8 NULL,
"after_ID" int8 NULL,
"after_FIRST_NAME" text NULL,
"after_LAST_NAME" text NULL,
"after_AGE" int8 NULL,
source_version text NOT NULL,
source_connector text NOT NULL,
source_name text NOT NULL,
source_ts_ms int8 NOT NULL,
source_snapshot text NULL DEFAULT 'false'::text,
source_db text NOT NULL,
source_schema text NOT NULL,
source_table text NOT NULL,
source_change_lsn text NULL,
source_commit_lsn text NULL,
source_event_serial_no int8 NULL,
op text NOT NULL,
ts_ms int8 NULL,
transaction_id text NULL,
transaction_total_order int8 NULL,
transaction_data_collection_order int8 NULL,
"ID" text NOT NULL,
CONSTRAINT "PEOPLE_pkey" PRIMARY KEY ("ID")
);
重点是我不需要这些before/after字段或其他创建的字段。复制完全相同的源表结构的最佳方法是什么?
谢谢!
当您使用Debezium时,它包括关于它所捕获的更改记录以及之前和之后状态的元数据。所有这些数据都以嵌套对象的形式出现。
目前,您只是使用Flatten
单一消息转换来压平所有这些字段。如果您不需要额外的字段,您可以使用Debezium提供的ExtractNewRecordState
SMT来实现这一目的。使用而不是Flatten
:
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState