卡夫卡如何用春天寻找时间戳的偏移



我们使用spring-kafka来使用必须作为服务器发送事件(SSE(转发到前端的消息。

当用户登录时,她应该看到自上次会话以来错过的所有事件。

当前实现使用ConsumerSeekCallback,如中所述这个答案

然而,该回调不支持底层KafkaConsumer的offsetForTimes方法(KafkaConsumer#offsetForTimes(。

所以我必须使用seekToBeginning和时间戳过滤器,当有很多消息时,这会导致问题。。。

有没有其他方法可以只接收给定时间戳后的消息?也许是直接使用消费者的安全方式?

2.0引入了ConsumerAwareRebalanceListener(当前版本为2.2.2(

请参阅如何测试ConsumerwareReblanceListener?例如。

正如Gary Russel在上面指出的,ConsumerSeekCallback是遗留的,所以它是不可行的。。。我不会打开GitHub问题。。。

我终于能够实现我的目标:

当用户登录时,她应该看到她自从上次训练后就错过了。

通过处理ListenerContainerIdleEvent的EventListener中的所有新订阅,其中消费者作为事件数据的一部分可用:

@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void idleEventHandler(ListenerContainerIdleEvent event) {
// find new subscriptions
Collection<EventListenerSubscription> newSubscriptions = 
subscriptions.stream().filter(s -> s.isNew())
.collect(Collectors.toList());
if (!newSubscriptions.isEmpty()) {
// mark subscriptions a not new
newSubscriptions.forEach(s -> s.setNew(false));
// compute the oldest time stamp
OptionalLong oldestTimeStamp = 
newSubscriptions.stream()
.mapToLong(s -> s.getLastTimeStamp())
.reduce(Long::min);
if (oldestTimeStamp.isPresent()) {
// seek on topic for oldest time stamp
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
timestampsToSearch.put(new TopicPartition(eventTopic, 0),
oldestTimeStamp.getAsLong());
Consumer<?, ?> consumer = event.getConsumer();
event.getConsumer().offsetsForTimes(timestampsToSearch).forEach((k, v) -> {
consumer.seek(k, v.offset());
});
}
}
}

我确定所有新订阅中最旧的时间戳,将这些订阅标记为非新订阅,并使用消费者对主题的查找来查找最早的时间戳。

为了获得容器空闲事件,必须在容器属性中配置空闲间隔,如下所述。

然后,KafkaListener将负责将旧事件发送给(以前是新的(订阅者:

@KafkaListener(id = "qux", topics = { "${app.event.topic}" }, errorHandler = "kafkaListenerErrorHandler")
public void receive(@Payload Event event, @Headers MessageHeaders headers) throws JsonProcessingException {
// collect the subscribers not marked as new
Collection<EventListenerSubscription> oldSubscriptions = 
subscriptions.stream().filter(s -> !s.isNew())
.collect(Collectors.toList());
for (EventListenerSubscription s : oldSubscriptions) {
if (s.getLastTimeStamp() < timestamp) {
s.addMessage(event, timestamp);
}
}
}

最新更新