@KafkaListener获取特定kafka主题的所有消息



我有一个@KafkaListener方法来获取主题中的所有消息,但我只获得@Scheduled方法工作的每个间隔时间的一条消息。我怎样才能一次从主题中获得所有消息?

这是我的类;

@Slf4j
@Service
public class KafkaConsumerServiceImpl implements KafkaConsumerService {
@Autowired
private SimpMessagingTemplate webSocket;
@Autowired
private KafkaListenerEndpointRegistry registry;
@Autowired
private BrokerProducerService brokerProducerService;
@Autowired
private GlobalConfig globalConfig;
@Override
@KafkaListener(id = "snapshotOfOutagesId", topics = Constants.KAFKA_TOPIC, groupId = "snapshotOfOutages", autoStartup = "false")
public void consumeToSnapshot(ConsumerRecord<String, OutageDTO> cr, @Payload String content) {
log.info("Received content from Kafka notification to notification-snapshot topic: {}", content);
MessageListenerContainer listenerContainer = registry.getListenerContainer("snapshotOfOutagesId");
JSONObject jsonObject= new JSONObject(content);
Map<String, Object> outageMap = jsonToMap(jsonObject);
brokerProducerService.sendMessage(globalConfig.getTopicProperties().getSnapshotTopicName(),
outageMap.get("outageId").toString(), toJson(outageMap));
listenerContainer.stop();
}
@Scheduled(initialDelayString = "${scheduler.kafka.snapshot.monitoring}",fixedRateString = "${scheduler.kafka.snapshot.monitoring}")
private void consumeWithScheduler() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("snapshotOfOutagesId");
if (listenerContainer != null){
listenerContainer.start();
}
}

我的kafka属性在application.yml;

kafka:
streams:
common:
configs:
"[bootstrap.servers]": 192.168.99.100:9092
"[client.id]": event
"[producer.id]": event-producer
"[max.poll.interval.ms]": 300000
"[group.max.session.timeout.ms]": 300000
"[session.timeout.ms]": 200000
"[auto.commit.interval.ms]": 1000
"[auto.offset.reset]": latest
"[group.id]": event-consumer-group
"[max.poll.records]": 1

还有我的kafkconfiguration类;

@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>(globalConfig.getBrokerProperties().getConfigs());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new StringDeserializer());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}

你现在正在做的是:

  1. 创建监听器,但还没有启动(autoStartup = false)
  2. 当调度的作业启动时,启动容器(将开始使用来自主题的第一条消息)
  3. 当第一个消息被使用时,停止容器(导致不再使用任何消息)

所以你所描述的行为并不奇怪。

@KafkaListener不需要计划任务来开始使用消息。我认为您可以删除autoStartup = false并删除计划作业,之后侦听器将逐个消耗主题上的所有消息,并等待新的消息出现在主题上。

还有一些我注意到的事情:

这些属性是用于Kafka流的,对于常规的Spring Kafka,你需要这样的属性:

spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
auto-offset-reset: earliest
...etc

还有:为什么使用@Payload String content而不是已经序列化的cr.getVaue()?

最新更新