What is the simplest Spring Kafka @KafkaListener configuration to consume all records from a set of compacted topics?



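I have the names of several compacted Kafka topics (topic1, topic2, ..., topicN) defined in my Spring application.yaml file. I want to be able to consume all of the records on every partition of each topic at startup. The number of partitions on each topic is not known in advance.

The official Spring Kafka 2.6.1 documentation suggests that the simplest way to do this is to implement a PartitionFinder and use it in a SpEL expression to look up a topic's partitions dynamically, and then to use the * wildcard in the partition attribute of a @PartitionOffset annotation (see Explicit Partition Assignment in the @KafkaListener Annotation documentation):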

@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
        partitions = "#{@finder.partitions('compacted')}",
        partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    // process record
}

Since I have several topics, the resulting code is very verbose:

@KafkaListener(topicPartitions = {
        @TopicPartition(
                topic = "${topic1}",
                partitions = "#{@finder.partitions('${topic1}')}",
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
        ),
        @TopicPartition(
                topic = "${topic2}",
                partitions = "#{@finder.partitions('${topic2}')}",
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
        ),
        // and many more @TopicPartitions...
        @TopicPartition(
                topic = "${topicN}",
                partitions = "#{@finder.partitions('${topicN}')}",
                partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")
        )
})
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    // process record
}

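How can I make this repetitive configuration more concise by supplying the topicPartitions attribute of the @KafkaListener annotation with a dynamically generated array of @TopicPartition entries (one for each of my N topics)?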

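That is not currently possible with @KafkaListener. Please open a new feature issue on GitHub.

The only workaround I can think of is to programmatically create a listener container from the container factory and create a listener adapter. I can provide an example if you need one.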

EDIT

Here is an example:

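import java.util.Arrays;
import java.util.stream.Stream;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class So64022266Application {

    public static void main(String[] args) {
        SpringApplication.run(So64022266Application.class, args);
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so64022266-1").partitions(10).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so64022266-2").partitions(10).replicas(1).build();
    }

    // Builds the listener container programmatically instead of via @KafkaListener.
    @Bean
    ConcurrentMessageListenerContainer<String, String> container(@Value("${topics}") String[] topics,
            PartitionFinder finder,
            ConcurrentKafkaListenerContainerFactory<String, String> factory,
            MyListener listener) throws Exception {

        MethodKafkaListenerEndpoint<String, String> endpoint = endpoint(topics, finder, listener);
        ConcurrentMessageListenerContainer<String, String> container = factory.createListenerContainer(endpoint);
        container.getContainerProperties().setGroupId("someGroup");
        return container;
    }

    // The endpoint is assigned every partition of every configured topic, starting at offset 0.
    @Bean
    MethodKafkaListenerEndpoint<String, String> endpoint(String[] topics, PartitionFinder finder,
            MyListener listener) throws NoSuchMethodException {

        MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
        endpoint.setBean(listener);
        endpoint.setMethod(MyListener.class.getDeclaredMethod("listen", String.class, String.class));
        endpoint.setTopicPartitions(Arrays.stream(topics)
                .flatMap(topic -> finder.partitions(topic))
                .toArray(TopicPartitionOffset[]::new));
        endpoint.setMessageHandlerMethodFactory(methodFactory());
        return endpoint;
    }

    @Bean
    DefaultMessageHandlerMethodFactory methodFactory() {
        return new DefaultMessageHandlerMethodFactory();
    }

    // Prints the assigned partitions and sends one test record to each topic.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            ConcurrentMessageListenerContainer<String, String> container) {

        return args -> {
            System.out.println(container.getAssignedPartitions());
            template.send("so64022266-1", "key1", "foo");
            template.send("so64022266-2", "key2", "bar");
        };
    }

}

@Component
class MyListener {

    public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
        System.out.println(key + ":" + payload);
    }

}

@Component
class PartitionFinder {

    private final ConsumerFactory<String, String> consumerFactory;

    public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    // Looks up the topic's partitions with a short-lived consumer; each one starts at offset 0.
    public Stream<TopicPartitionOffset> partitions(String topic) {
        System.out.println("+" + topic + "+");
        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
            return consumer.partitionsFor(topic).stream()
                    .map(part -> new TopicPartitionOffset(topic, part.partition(), 0L));
        }
    }

}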

The topics property read by @Value("${topics}"), in .properties syntax:

topics=so64022266-1, so64022266-2
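
The heart of the endpoint bean in the example is the flatMap that expands each topic into one entry per partition, each starting at offset 0. Here is a minimal, framework-free sketch of just that expansion. Assignment and the partitionCounts map are hypothetical stand-ins for TopicPartitionOffset and for the broker lookup done by consumer.partitionsFor(topic), so this only illustrates the shape of the logic, not the Spring Kafka API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PartitionExpansionSketch {

    // Hypothetical stand-in for Spring Kafka's TopicPartitionOffset.
    public static final class Assignment {
        public final String topic;
        public final int partition;
        public final long offset;

        public Assignment(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return topic + "-" + partition + "@" + offset;
        }
    }

    // Expands every topic into one Assignment per partition, all starting at offset 0.
    // partitionCounts is an assumed in-memory lookup; the real code asks the broker.
    public static List<Assignment> expand(String[] topics, Map<String, Integer> partitionCounts) {
        return Arrays.stream(topics)
                .flatMap(topic -> IntStream.range(0, partitionCounts.get(topic))
                        .mapToObj(p -> new Assignment(topic, p, 0L)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("so64022266-1", 2);
        counts.put("so64022266-2", 3);
        System.out.println(expand(new String[] {"so64022266-1", "so64022266-2"}, counts));
    }
}
```

Because the expansion is driven by a plain array of topic names, adding a topic only means adding a name to the topics property; no further per-topic code is needed.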

If you need to handle tombstone records (null values), we need to enhance the handler factory; we currently do not expose the framework's handler factory.
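
For context on why tombstones matter here: on a compacted topic, a record with a null value marks its key as deleted, so a consumer that replays the topic from offset 0 to rebuild state would normally remove that key rather than store null. The sketch below shows only that bookkeeping with a plain map. CompactedView and apply are hypothetical names, and the sketch deliberately sidesteps how a null payload would reach a listener method in Spring Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CompactedView {

    private final Map<String, String> state = new LinkedHashMap<>();

    // Applies one replayed record: a null value is a tombstone and removes the key.
    public void apply(String key, String value) {
        if (value == null) {
            state.remove(key);
        } else {
            state.put(key, value);
        }
    }

    // Returns a copy so callers cannot mutate the internal state.
    public Map<String, String> snapshot() {
        return new LinkedHashMap<>(state);
    }

    public static void main(String[] args) {
        CompactedView view = new CompactedView();
        view.apply("key1", "foo");
        view.apply("key2", "bar");
        view.apply("key1", null); // tombstone for key1
        System.out.println(view.snapshot()); // prints {key2=bar}
    }
}
```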
