上下文
我编码了几个小的Kafka Connect连接器。一个每秒只生成随机数据,另一个将其记录在控制台中。它们与模式注册表集成在一起,因此数据是用Avro序列化的。
我使用Landoop 提供的快速数据开发Docker镜像将它们部署到本地Kafka环境中
基本设置工作并每秒生成一条记录的消息
然而,我想改变主题名称策略。默认的生成两个主题:
${topic}-key
${topic}-value
根据我的用例,我需要生成具有不同模式的事件,这些事件最终会出现在同一主题上。因此,我需要的主题名称是:
${topic}-${keyRecordName}
${topic}-${valueRecordName}
根据文档,我的需求符合TopicRecordNameStrategy
我试过什么
我创建了用于发送要连接的值的avroData
对象:
class SampleSourceConnectorTask : SourceTask() {
private lateinit var avroData: AvroData
override fun start(props: Map<String, String>) {
[...]
avroData = AvroData(AvroDataConfig(props))
}
然后将其用于创建CCD_ 6响应对象
文档指出,为了在Kafka Connect中使用Schema Registry,我必须在连接器配置中设置一些属性。因此,当我创建它时,我会添加它们:
name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
问题
连接器似乎忽略了这些属性,并继续使用旧的${topic}-key
和${topic}-value
主题。
问题
Kafka Connect应该支持不同的主题策略。我通过编写自己版本的AvroConverter
并硬编码主题策略是我需要的策略,设法解决了这个问题。然而,这看起来不是一个好方法,而且在尝试使用Sink Kafka连接器消费数据时也会带来问题。我复制了主题,所以有一个旧名称的版本(${topic}-key
(,它可以
将主题策略指定给Kafka Connect的正确设置是什么?
您缺少key.converter
和value.converter
前缀,以便将配置传递给converter。所以不是:
key.subject.name.strategy
value.subject.name.strategy
你想要:
key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy
来源https://docs.confluent.io/current/connect/managing/configuring.html:
要将配置参数传递给键和值转换器,请在它们前面加上
key.converter.
或value.converter.
,就像定义默认转换器时在辅助配置中一样。请注意,这些仅在key.converter
或value.converter
属性中指定了相应的转换器配置时使用。