如何在Kafka -connect中配置预先创建的Kafka主题作为sink ?



我正在尝试将postgreql DB(这是源)事件日志下沉到已经创建和配置的kafka融合云主题。

供参考,我使用quay.io/debezium/connect:1.9docker镜像作为我的kafka连接服务器,一个aws RDS postgresql数据库作为源,一个kafka融合云主题作为汇。

我知道在属性中有像CONNECT_TOPIC_CREATION_ENABLE=false=topic.creation.enable: false这样的参数可以阻止kafka主题的自动创建(因为它不允许在我的汇合云代理中),但是我在哪里指定主题目的地名称?

对于Debezium,主题是由服务器、数据库模式和表名的连接确定的。

例如,假设fulfillment是连接器配置中的逻辑服务器名,该连接器正在捕获PostgreSQL安装中的更改,该安装具有postgres数据库和inventory模式,该模式包含四个表:products,products_on_hand,customersorders。连接器将记录流到以下四个Kafka主题:

  • fulfillment.inventory.products
  • fulfillment.inventory.products_on_hand
  • fulfillment.inventory.customers
  • fulfillment.inventory.orders

https://debezium.io/documentation/reference/stable/connectors/postgresql.html postgresql-topic-names

要进一步从连接器操作主题名称,可以使用RegexRouter转换。例如,如果您想要删除逻辑服务器名称"前缀";从每个主题中提取,或者只直接提取表名。您也可以使用静态值完全覆盖主题名称,如您所要求的。


CONNECT_TOPIC_CREATION_ENABLE与Debezium无直接关系。这是Kafka Connect的一个通用属性。

当源连接器配置了topic.creation.属性时,是否允许自动创建源连接器使用的主题

https://kafka.apache.org/documentation/connectconfigs_topic.creation.enable

最新更新