我们使用spring-kafka来使用必须作为服务器发送事件(SSE(转发到前端的消息。
当用户登录时,她应该看到自上次会话以来错过的所有事件。
当前实现使用ConsumerSeekCallback,如中所述这个答案
然而,该回调不支持底层KafkaConsumer的offsetForTimes方法(KafkaConsumer#offsetForTimes(。
所以我必须使用seekToBeginning和时间戳过滤器,当有很多消息时,这会导致问题。。。
有没有其他方法可以只接收给定时间戳后的消息?也许是直接使用消费者的安全方式?
2.0引入了ConsumerAwareRebalanceListener
(当前版本为2.2.2(
请参阅如何测试ConsumerwareReblanceListener?例如。
正如Gary Russel在上面指出的,ConsumerSeekCallback是遗留的,所以它是不可行的。。。我不会打开GitHub问题。。。
我终于能够实现我的目标:
当用户登录时,她应该看到她自从上次训练后就错过了。
通过处理ListenerContainerIdleEvent的EventListener中的所有新订阅,其中消费者作为事件数据的一部分可用:
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void idleEventHandler(ListenerContainerIdleEvent event) {
// find new subscriptions
Collection<EventListenerSubscription> newSubscriptions =
subscriptions.stream().filter(s -> s.isNew())
.collect(Collectors.toList());
if (!newSubscriptions.isEmpty()) {
// mark subscriptions a not new
newSubscriptions.forEach(s -> s.setNew(false));
// compute the oldest time stamp
OptionalLong oldestTimeStamp =
newSubscriptions.stream()
.mapToLong(s -> s.getLastTimeStamp())
.reduce(Long::min);
if (oldestTimeStamp.isPresent()) {
// seek on topic for oldest time stamp
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
timestampsToSearch.put(new TopicPartition(eventTopic, 0),
oldestTimeStamp.getAsLong());
Consumer<?, ?> consumer = event.getConsumer();
event.getConsumer().offsetsForTimes(timestampsToSearch).forEach((k, v) -> {
consumer.seek(k, v.offset());
});
}
}
}
我确定所有新订阅中最旧的时间戳,将这些订阅标记为非新订阅,并使用消费者对主题的查找来查找最早的时间戳。
为了获得容器空闲事件,必须在容器属性中配置空闲间隔,如下所述。
然后,KafkaListener将负责将旧事件发送给(以前是新的(订阅者:
@KafkaListener(id = "qux", topics = { "${app.event.topic}" }, errorHandler = "kafkaListenerErrorHandler")
public void receive(@Payload Event event, @Headers MessageHeaders headers) throws JsonProcessingException {
// collect the subscribers not marked as new
Collection<EventListenerSubscription> oldSubscriptions =
subscriptions.stream().filter(s -> !s.isNew())
.collect(Collectors.toList());
for (EventListenerSubscription s : oldSubscriptions) {
if (s.getLastTimeStamp() < timestamp) {
s.addMessage(event, timestamp);
}
}
}