在Debezium Postgresql中将kafka消息密钥设置为源数据库名称



我们正在尝试使用Debezium从许多Postgresql数据库中收集更改。这个想法是创建一个主题,其分区数量等于数据库数量——每个数据库都有自己的分区,因为事件的顺序很重要。我们设法使用主题路由将事件重新路由到单个主题,但为了能够按数据库对事件进行分区,我需要正确设置消息密钥。

Qestion:有没有一种方法可以将kafka消息密钥设置为等于源数据库名称?

我的想法:

  1. 也许有一种方法可以为每个连接器配置全局设置消息密钥
  2. 可以在消息中找到数据库名称,但它是嵌套属性payload.source.name。没有找到从嵌套属性中提取值的方法

有什么想法吗?

提前谢谢!

您需要编写/找到一个Connect转换,该转换可以提取嵌套字段并设置消息密钥,或者如果您不介意在Kafka主题中复制数据,您可以使用Kafka Streams/KsqlDB等来执行同样的操作。

总的来说,我认为一个主题+每个数据库一个分区对于消费者的可伸缩性来说不是一个好的设计。当然,它会保持秩序,但简单地用一个分区为每个数据库创建一个主题并不需要太多开销。然后让消费者使用regex模式读取所有主题,而不需要在一个主题中分配给特定的/所有的分区。

相关内容

  • 没有找到相关文章

最新更新