我们正在尝试使用Debezium从许多Postgresql数据库中收集更改。这个想法是创建一个主题,其分区数量等于数据库数量——每个数据库都有自己的分区,因为事件的顺序很重要。我们设法使用主题路由将事件重新路由到单个主题,但为了能够按数据库对事件进行分区,我需要正确设置消息密钥。
Qestion:有没有一种方法可以将kafka消息密钥设置为等于源数据库名称?
我的想法:
- 也许有一种方法可以为每个连接器配置全局设置消息密钥
- 可以在消息中找到数据库名称,但它是嵌套属性
payload.source.name
。没有找到从嵌套属性中提取值的方法
有什么想法吗?
提前谢谢!
您需要编写/找到一个Connect转换,该转换可以提取嵌套字段并设置消息密钥,或者如果您不介意在Kafka主题中复制数据,您可以使用Kafka Streams/KsqlDB等来执行同样的操作。
总的来说,我认为一个主题+每个数据库一个分区对于消费者的可伸缩性来说不是一个好的设计。当然,它会保持秩序,但简单地用一个分区为每个数据库创建一个主题并不需要太多开销。然后让消费者使用regex模式读取所有主题,而不需要在一个主题中分配给特定的/所有的分区。