Configuring a JDBC sink connector in ksqlDB from a MySQL database to a PostgreSQL database



I want to copy a table from a MySQL database to PostgreSQL. I have ksqlDB acting as the stream processor. To start, I just want to copy one simple table from the source "inventory" database to the sink database (PostgreSQL). Here is the structure of the inventory database:

mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses           |
| customers           |
| geom                |
| orders              |
| products            |
| products_on_hand    |
+---------------------+

I have logged into ksqlDB and registered a source connector with the following configuration:

CREATE SOURCE CONNECTOR inventory_connector WITH (
  'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
  'database.hostname' = 'mysql',
  'database.port' = '3306',
  'database.user' = 'debezium',
  'database.password' = 'dbz',
  'database.allowPublicKeyRetrieval' = 'true',
  'database.server.id' = '223344',
  'database.server.name' = 'dbserver',
  'database.whitelist' = 'inventory',
  'database.history.kafka.bootstrap.servers' = 'broker:9092',
  'database.history.kafka.topic' = 'schema-changes.inventory',
  'transforms' = 'unwrap',
  'transforms.unwrap.type' = 'io.debezium.transforms.UnwrapFromEnvelope',
  'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'key.converter.schemas.enable' = 'false',
  'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'false'
);
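
To check that the connector is actually running, it can be listed and described from the same CLI (ksqlDB upper-cases the connector name on creation):

ksql> SHOW CONNECTORS;
ksql> DESCRIBE CONNECTOR INVENTORY_CONNECTOR;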

Here are the topics that were created:

ksql> LIST TOPICS;
Kafka Topic                         | Partitions | Partition Replicas
-----------------------------------------------------------------------
_ksql-connect-configs               | 1          | 1
_ksql-connect-offsets               | 25         | 1
_ksql-connect-statuses              | 5          | 1
dbserver                            | 1          | 1
dbserver.inventory.addresses        | 1          | 1
dbserver.inventory.customers        | 1          | 1
dbserver.inventory.geom             | 1          | 1
dbserver.inventory.orders           | 1          | 1
dbserver.inventory.products         | 1          | 1
dbserver.inventory.products_on_hand | 1          | 1
default_ksql_processing_log         | 1          | 1
schema-changes.inventory            | 1          | 1
-----------------------------------------------------------------------

Now, I just need to copy the contents of "dbserver.inventory.customers" into the PostgreSQL database. Here is the structure of the data:

ksql> PRINT 'dbserver.inventory.customers' FROM BEGINNING;
Key format: JSON or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2022/08/29 02:39:20.772 Z, key: {"id":1001}, value: {"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"}, partition: 0
rowtime: 2022/08/29 02:39:20.773 Z, key: {"id":1002}, value: {"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"}, partition: 0
rowtime: 2022/08/29 02:39:20.773 Z, key: {"id":1003}, value: {"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"}, partition: 0
rowtime: 2022/08/29 02:39:20.773 Z, key: {"id":1004}, value: {"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"}, partition: 0

I have tried the following configuration for the sink connector:

CREATE SINK CONNECTOR postgres_sink WITH (
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
  'connection.url' = 'jdbc:postgresql://postgres:5432/inventory',
  'connection.user' = 'postgresuser',
  'connection.password' = 'postgrespw',
  'topics' = 'dbserver.inventory.customers',
  'transforms' = 'unwrap',
  'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
  'transforms.unwrap.drop.tombstones' = 'false',
  'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'key.converter.schemas.enable' = 'false',
  'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'false',
  'auto.create' = 'true',
  'insert.mode' = 'upsert',
  'auto.evolve' = 'true',
  'table.name.format' = '${topic}',
  'pk.mode' = 'record_key',
  'pk.fields' = 'id',
  'delete.enabled' = 'true'
);

The connector gets created, but it then shows the following error:

ksqldb-server      | Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'POSTGRES_SINK' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='dbserver.inventory.customers',partition=0,offset=0,timestamp=1661740760772) with a HashMap key and null key schema.

What should the sink connector configuration be to copy this data to PostgreSQL? I have also tried creating a stream in Avro first and then using the Avro key and value converters, but that did not work. I think this has something to do with using the right SMTs, but I am not sure.

My end goal is to join different streams and then store the result in PostgreSQL as part of implementing a CQRS architecture. So if anyone can share a framework I could use in this scenario, that would be very helpful.

As the error says, the key must be a primitive, not a JSON object, and not Avro.

Based on the JSON shown, you need an ExtractField transform applied to the key:

transforms=getKey,unwrap
transforms.getKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.getKey.field=id
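
Wired into the statement from the question (after a DROP CONNECTOR postgres_sink; to clear the failed one), the whole thing would look roughly like this; everything except the transforms chain is unchanged from the original attempt:

CREATE SINK CONNECTOR postgres_sink WITH (
  'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
  'connection.url' = 'jdbc:postgresql://postgres:5432/inventory',
  'connection.user' = 'postgresuser',
  'connection.password' = 'postgrespw',
  'topics' = 'dbserver.inventory.customers',
  -- pull the id out of the JSON object key, then unwrap the Debezium envelope
  'transforms' = 'getKey,unwrap',
  'transforms.getKey.type' = 'org.apache.kafka.connect.transforms.ExtractField$Key',
  'transforms.getKey.field' = 'id',
  'transforms.unwrap.type' = 'io.debezium.transforms.ExtractNewRecordState',
  'transforms.unwrap.drop.tombstones' = 'false',
  'key.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'key.converter.schemas.enable' = 'false',
  'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
  'value.converter.schemas.enable' = 'false',
  'auto.create' = 'true',
  'insert.mode' = 'upsert',
  'auto.evolve' = 'true',
  'table.name.format' = '${topic}',
  'pk.mode' = 'record_key',
  'pk.fields' = 'id',
  'delete.enabled' = 'true'
);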

Alternatively, you could change the source connector to use IntegerConverter rather than JsonConverter for the key.
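
With that approach, the key side of the source connector would look something like this (a sketch: the Debezium key is a Struct, so it still has to be reduced to the bare id field before it can be serialized as a plain integer; org.apache.kafka.connect.converters.IntegerConverter ships with Kafka Connect):

transforms=unwrap,extractKey
transforms.extractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractKey.field=id
key.converter=org.apache.kafka.connect.converters.IntegerConverter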

Debezium also has an older blog post covering this exact use case: https://debezium.io/blog/2017/09/25/streaming-to-another-database/
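
On the final question about joining streams for CQRS: the usual ksqlDB shape is to declare streams over the Debezium topics, join them into a new stream, and point the JDBC sink at the topic backing that join. A rough sketch, with illustrative column lists rather than the real inventory schemas, treating both sides as streams to sidestep the key-format issue (once the keys are primitives, a stream-table join is the more natural fit):

CREATE STREAM customers_src (id INT, first_name VARCHAR, last_name VARCHAR, email VARCHAR)
  WITH (KAFKA_TOPIC = 'dbserver.inventory.customers', VALUE_FORMAT = 'JSON');

CREATE STREAM orders_src (order_number INT, purchaser INT, quantity INT)
  WITH (KAFKA_TOPIC = 'dbserver.inventory.orders', VALUE_FORMAT = 'JSON');

-- the join key must appear in the projection; the sink connector's 'topics'
-- then points at the topic backing CUSTOMER_ORDERS
CREATE STREAM customer_orders AS
  SELECT c.id, o.order_number, o.quantity, c.first_name, c.last_name, c.email
  FROM orders_src o
  JOIN customers_src c WITHIN 7 DAYS ON o.purchaser = c.id
  EMIT CHANGES;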
