我想把一个表从MySQL数据库复制到PostgreSQL。我有KsqlDB,它充当流处理器。首先,我只想将一个简单的表从源的"inventory"数据库复制到sink数据库(PostgreSQL(。以下是库存数据库的结构:
mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+
我已经登录到KsqlDB,并使用以下配置注册了一个源连接器
CREATE SOURCE CONNECTOR inventory_connector WITH (
'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
'database.hostname' = 'mysql',
'database.port' = '3306',
'database.user' = 'debezium',
'database.password' = 'dbz',
'database.allowPublicKeyRetrieval' = 'true',
'database.server.id' = '223344',
'database.server.name' = 'dbserver',
'database.whitelist' = 'inventory',
'database.history.kafka.bootstrap.servers' = 'broker:9092',
'database.history.kafka.topic' = 'schema-changes.inventory',
'transforms' = 'unwrap',
'transforms.unwrap.type'= 'io.debezium.transforms.UnwrapFromEnvelope',
'key.converter'= 'org.apache.kafka.connect.json.JsonConverter',
'key.converter.schemas.enable'= 'false',
'value.converter'= 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable'= 'false'
);
以下是创建的主题
ksql> LIST TOPICS;
Kafka Topic | Partitions | Partition Replicas
-----------------------------------------------------------------------
_ksql-connect-configs | 1 | 1
_ksql-connect-offsets | 25 | 1
_ksql-connect-statuses | 5 | 1
dbserver | 1 | 1
dbserver.inventory.addresses | 1 | 1
**dbserver.inventory.customers** | 1 | 1
dbserver.inventory.geom | 1 | 1
dbserver.inventory.orders | 1 | 1
dbserver.inventory.products | 1 | 1
dbserver.inventory.products_on_hand | 1 | 1
default_ksql_processing_log | 1 | 1
schema-changes.inventory | 1 | 1
-----------------------------------------------------------------------
现在,我只需要将"dbserver.inventure.customers"的内容复制到PostgreSQL数据库中。以下是数据的结构
ksql> PRINT 'dbserver.inventory.customers' FROM BEGINNING;
Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2022/08/29 02:39:20.772 Z, key: {"id":1001}, value: {"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}, partition: 0
rowtime: 2022/08/29 02:39:20.773 Z, key: {"id":1002}, value: {"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}, partition: 0
rowtime: 2022/08/29 02:39:20.773 Z, key: {"id":1003}, value: {"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}, partition: 0
rowtime: 2022/08/29 02:39:20.773 Z, key: {"id":1004}, value: {"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}, partition: 0
我已经尝试了接收器连接器的以下配置:
CREATE SINK CONNECTOR postgres_sink WITH (
'connector.class'= 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url'= 'jdbc:postgresql://postgres:5432/inventory',
'connection.user' = 'postgresuser',
'connection.password' = 'postgrespw',
'topics'= 'dbserver.inventory.customers',
'transforms'= 'unwrap',
'transforms.unwrap.type'= 'io.debezium.transforms.ExtractNewRecordState',
'transforms.unwrap.drop.tombstones'= 'false',
'key.converter'= 'org.apache.kafka.connect.json.JsonConverter',
'key.converter.schemas.enable'= 'false',
'value.converter'= 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable'= 'false',
'auto.create'= 'true',
'insert.mode'= 'upsert',
'auto.evolve' = 'true',
'table.name.format' = '${topic}',
'pk.mode' = 'record_key',
'pk.fields' = 'id',
'delete.enabled'= 'true'
);
它创建连接器,但显示以下错误:
ksqldb-server | Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'POSTGRES_SINK' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='dbserver.inventory.customers',partition=0,offset=0,timestamp=1661740760772) with a HashMap key and null key schema.
要将这些数据复制到PostgreSQL,接收器连接器的配置应该是什么?我也尝试过先在AVRO中创建一个流,然后使用AVRO密钥、值转换器,但没有成功。我认为这与使用正确的SMT有关,但我不确定。
我的最终目标是连接不同的流,然后将其存储在PostgreSQL中,作为实现CQRS架构的一部分。所以,如果有人能分享一个我可以在这种情况下使用的框架,那将非常有用。
正如错误所说,密钥必须是基元,而不是JSON对象,也不是Avro。
从显示的JSON中,您需要对密钥进行提取字段转换
transforms=getKey,unwrap
transforms.getKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.getKey.field=id
或者,您可以将源连接器更改为使用IntegerConverter,而不是密钥的JSONConverter
Debezium还有一篇关于这个用例的旧博客文章——https://debezium.io/blog/2017/09/25/streaming-to-another-database/