I have a Spring Boot application with Kafka, and my Kafka cluster is started from a docker-compose file. However, if the Kafka cluster stays up but the Spring Boot application is restarted, it no longer receives any messages (the functionality works fine otherwise).
Properties file:
server.port=18080
spring.kafka.bootstrap-servers=localhost:29092,localhost:39092
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.main.banner-mode=off
config.kafka.topic1=quickstart-events
config.kafka.retry.delay=10000
config.kafka.processing.interval=10000
config.kafka.producer.interval=10000
config.kafka.consumer.groupId=quickstart-events-group-id
KafkaConfiguration.java
@Slf4j
@EnableKafka
@Configuration
public class KafkaConfiguration {

    @Bean
    public Serializer<Student> defaultJsonSerializer(ObjectMapper objectMapper) {
        return new JsonSerializer<>(objectMapper);
    }

    @Bean
    public NewTopic topicExample(@Value("${config.kafka.topic1}") String topic,
                                 @Value("${spring.kafka.bootstrap-servers}") String servers) {
        int length = servers.split(",").length;
        return TopicBuilder.name(topic)
                .partitions(length)
                .replicas(1)
                .build();
    }
}
StudentListener.java
@Service
@Slf4j
public class StudentListener extends AbstractBaseKafkaListener implements ApplicationListener<ApplicationReadyEvent> {

    private final String listenerId = "kafka_consumerListener";
    private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    public StudentListener(List<MessageHandler> messageHandlers, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
        super(messageHandlers);
        this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
    }

    @KafkaListener(topics = {"${config.kafka.topic1}"}, groupId = "${config.kafka.consumer.groupId}", autoStartup = "true")
    public void onMessage(String message, MessageHeaders headers, Acknowledgment ack) {
        super.onMessage(message, headers, ack);
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        log.info("Application ready, start KafkaListener {}", listenerId);
        //kafkaListenerEndpointRegistry.getListenerContainer(listenerId).start();
    }
}
docker-compose.yml (partial content):
kafka-2:
  image: confluentinc/cp-kafka:latest
  depends_on:
    - zookeeper-1
    - zookeeper-2
    - zookeeper-3
  ports:
    - 39092:39092
  environment:
    KAFKA_BROKER_ID: 2
    KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  networks:
    zk-kafka-network:
      ipv4_address: 10.5.0.6
By setting these properties and not manually committing any offsets, the application will seek to the end of the topic on every start:
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
At the end of the topic there is no data to consume until a producer sends data while the consumer is running. So if a consumer application is not behaving the way you expect, look at the producer application first.
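If you want to rule out the consumer side quickly, here is a minimal smoke-test producer sketch (the class name is illustrative; it assumes Spring Boot's auto-configured KafkaTemplate with the default String serializers and the topic name from your properties):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Sends one test record on startup; if the listener receives it, the consumer
// side is fine and the original producer is the part to investigate.
@Component
public class ProducerSmokeTest implements CommandLineRunner {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic;

    public ProducerSmokeTest(KafkaTemplate<String, String> kafkaTemplate,
                             @Value("${config.kafka.topic1}") String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    @Override
    public void run(String... args) {
        kafkaTemplate.send(topic, "smoke-test-" + System.currentTimeMillis());
    }
}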
If the producer is working, you can use the GetOffsetShell tool to verify that the offsets in the topic are increasing.
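If you would rather check this from code than from the shell tool, a rough equivalent with the plain Kafka client looks like the sketch below (class name and topic are illustrative; the bootstrap servers are taken from your properties). Run it twice while the producer is active and the numbers should grow:

import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EndOffsetCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:39092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Ask the broker for the current end offset of every partition of the topic.
            List<TopicPartition> partitions = consumer.partitionsFor("quickstart-events").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            endOffsets.forEach((tp, offset) -> System.out.println(tp + " -> " + offset));
        }
    }
}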
If the offsets are increasing and you want to capture all events, not just the latest ones, set
spring.kafka.consumer.auto-offset-reset=earliest
and acknowledge the messages:
@KafkaListener(topics = {"${config.kafka.topic1}"}, groupId = "${config.kafka.consumer.groupId}", autoStartup = "true")
public void onMessage(String message, MessageHeaders headers, Acknowledgment ack) {
    super.onMessage(message, headers, ack);
    // With ack-mode=MANUAL_IMMEDIATE the offset is only committed when you acknowledge.
    ack.acknowledge();
}
If the consumer restarts while the consumer group is at the end of the topic and the offsets are not increasing, there is no data to consume, but the application will keep polling the broker.
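To see where the group actually stands relative to the end of the topic after a restart, a sketch along the same lines may help (group id and servers taken from your properties; committed(Set) needs a reasonably recent kafka-clients). With MANUAL_IMMEDIATE and no acknowledge, the committed offsets come back null, which is exactly why auto-offset-reset decides the starting position:

import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupOffsetCheck {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092,localhost:39092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "quickstart-events-group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            Set<TopicPartition> partitions = consumer.partitionsFor("quickstart-events").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toSet());
            // committed(): what the group has acknowledged; endOffsets(): the current end
            // of each partition. A null committed offset means the group never committed
            // anything, so auto-offset-reset decides where consumption starts.
            Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            partitions.forEach(tp -> System.out.println(
                    tp + " committed=" + committed.get(tp) + " end=" + end.get(tp)));
        }
    }
}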