Kafka Connect不适用于主题策略



上下文

我编码了几个小的Kafka Connect连接器。一个每秒只生成随机数据,另一个将其记录在控制台中。它们与模式注册表集成在一起,因此数据是用Avro序列化的。

我使用Landoop 提供的快速数据开发Docker镜像将它们部署到本地Kafka环境中

基本设置工作并每秒生成一条记录的消息

然而,我想改变主题名称策略。默认的生成两个主题:

  • ${topic}-key
  • ${topic}-value

根据我的用例,我需要生成具有不同模式的事件,这些事件最终会出现在同一主题上。因此,我需要的主题名称是:

  • ${topic}-${keyRecordName}
  • ${topic}-${valueRecordName}

根据文档,我的需求符合TopicRecordNameStrategy

我试过什么

我创建了用于发送要连接的值的avroData对象:

class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData 
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}

然后将其用于创建CCD_ 6响应对象

文档指出,为了在Kafka Connect中使用Schema Registry,我必须在连接器配置中设置一些属性。因此,当我创建它时,我会添加它们:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

问题

连接器似乎忽略了这些属性,并继续使用旧的${topic}-key${topic}-value主题。

问题

Kafka Connect应该支持不同的主题策略。我通过编写自己版本的AvroConverter并硬编码主题策略是我需要的策略,设法解决了这个问题。然而,这看起来不是一个好方法,而且在尝试使用Sink Kafka连接器消费数据时也会带来问题。我复制了主题,所以有一个旧名称的版本(${topic}-key(,它可以

将主题策略指定给Kafka Connect的正确设置是什么?

您缺少key.convertervalue.converter前缀,以便将配置传递给converter。所以不是:

key.subject.name.strategy
value.subject.name.strategy

你想要:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy

来源https://docs.confluent.io/current/connect/managing/configuring.html:

要将配置参数传递给键和值转换器,请在它们前面加上key.converter.value.converter.,就像定义默认转换器时在辅助配置中一样。请注意,这些仅在key.convertervalue.converter属性中指定了相应的转换器配置时使用。

相关内容

  • 没有找到相关文章

最新更新