我有一个@KafkaListener方法来获取主题中的所有消息,但我只获得@Scheduled方法工作的每个间隔时间的一条消息。我怎样才能一次从主题中获得所有消息?
这是我的类;
@Slf4j
@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {
@Autowired
private SimpMessagingTemplate webSocket;
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private BrokerProducerService brokerProducerService;
@Autowired
private GlobalConfig globalConfig;
@Override
@KafkaListener(id = "snapshotOfOutagesId", topics = Constants.KAFKA_TOPIC, groupId = "snapshotOfOutages", autoStartup = "false")
public void consumeToSnapshot(ConsumerRecord<String, OutageDTO> cr, @Payload String content) {
log.info("Received content from Kafka notification to notification-snapshot topic: {}", content);
MessageListenerContainer listenerContainer = registry.getListenerContainer("snapshotOfOutagesId");
JSONObject jsonObject= new JSONObject(content);
Map<String, Object> outageMap = jsonToMap(jsonObject);
brokerProducerService.sendMessage(globalConfig.getTopicProperties().getSnapshotTopicName(),
outageMap.get("outageId").toString(), toJson(outageMap));
listenerContainer.stop();
}
@Scheduled(initialDelayString = "${scheduler.kafka.snapshot.monitoring}",fixedRateString = "${scheduler.kafka.snapshot.monitoring}")
private void consumeWithScheduler() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("snapshotOfOutagesId");
if (listenerContainer != null){
listenerContainer.start();
}
}
我的kafka属性在application.yml;
kafka:
streams:
common:
configs:
"[bootstrap.servers]": 192.168.99.100:9092
"[client.id]": event
"[producer.id]": event-producer
"[max.poll.interval.ms]": 300000
"[group.max.session.timeout.ms]": 300000
"[session.timeout.ms]": 200000
"[auto.commit.interval.ms]": 1000
"[auto.offset.reset]": latest
"[group.id]": event-consumer-group
"[max.poll.records]": 1
还有我的kafkconfiguration类;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>(globalConfig.getBrokerProperties().getConfigs());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new StringDeserializer());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
你现在正在做的是:
- 创建监听器,但还没有启动(
autoStartup = false
) - 当调度的作业启动时,启动容器(将开始使用来自主题的第一条消息)
- 当第一个消息被使用时,停止容器(导致不再使用任何消息)
所以你所描述的行为并不奇怪。
@KafkaListener
不需要计划任务来开始使用消息。我认为您可以删除autoStartup = false
并删除计划作业,之后侦听器将逐个消耗主题上的所有消息,并等待新的消息出现在主题上。
还有一些我注意到的事情:
这些属性是用于Kafka流的,对于常规的Spring Kafka,你需要这样的属性:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
auto-offset-reset: earliest
...etc
还有:为什么使用@Payload String content
而不是已经序列化的cr.getVaue()
?