如何部署处于暂停模式的 Kafka 使用者,直到我发出信号开始使用消息



我正在使用 spring-kafka 2.2.8 并试图了解是否有选项可以部署处于暂停模式的 kafka 消费者,直到我发出信号开始使用消息。请指教。

我在下面的帖子中看到,我们可以暂停并启动消费者,但我需要消费者在部署时处于暂停模式。 如何使用 Spring-Kafka 暂停和恢复@KafkaListener

@KafkaListener(id = "foo", ..., autoStartup = "false")

准备好后使用KafkaListenerEndpointRegistry启动它

registry.getListenerContainer("foo").start();

在暂停模式下启动它没有多大意义,但你可以这样做......

@SpringBootApplication
public class So62329274Application {
public static void main(String[] args) {
SpringApplication.run(So62329274Application.class, args);
}

@KafkaListener(id = "so62329274", topics = "so62329274", autoStartup = "false")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so62329274").partitions(1).replicas(1).build();
}

@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {
return args -> {
template.send("so62329274", "foo");
registry.getListenerContainer("so62329274").pause();
registry.getListenerContainer("so62329274").start();
System.in.read();
registry.getListenerContainer("so62329274").resume();
};
}
}

分配分区后,您将看到如下日志消息:

由于重新平衡,Kafka 恢复了暂停的消费者;消费者再次暂停,因此初始 poll(( 永远不会返回任何记录

相关内容

最新更新