使用onErrorResume来处理使用Reactor Kafka发布到Kafka的有问题的有效载荷



我正在使用reactor kafka发送kafka消息,并接收和处理它们。在接收kakfa有效载荷时,我会进行一些反序列化,如果出现异常,我只想记录该有效载荷(通过保存到mongo(,然后继续接收其他有效载荷。

为此,我使用以下方法-

@EventListener(ApplicationStartedEvent.class)
public void kafkaReceiving() {
for(Flux<ReceiverRecord<String, Object>> flux: kafkaService.getFluxReceives()) {
flux.delayUntil(//some function to do something)
.doOnNext(r -> r.receiverOffset().acknowledge())
.onErrorResume(this::handleException()) // here I'll just save to mongo 
.subscribe();
}
}

private Publisher<? extends ReceiverRecord<String,Object>> handleException(object ex) {
// save to mongo
return Flux.empty();
}

在这里,我希望每当我在接收负载时遇到异常时,onErrorResume都会捕捉到它并登录到mongo,然后当我发送到kafka队列时,我应该可以继续接收更多的消息。然而,我看到在异常之后,即使调用了onErrorResume方法,但我无法再处理发送到Kaffa主题的消息。我在这里可能遗漏了什么?

如果您需要优雅地处理错误,可以在delayUntil:中添加onErrorResume

flux
.delayUntil(r -> {
return process(r)
.onErrorReturn(e -> saveToMongo(r));
});
.doOnNext(r -> r.receiverOffset().acknowledge())
.subscribe();

反应运算符将错误视为终端信号,如果您的内部逻辑(delayUntil内部(抛出错误,delayUntil将终止序列,而delayUntil之后的onErrorReturn将不会使其继续处理Kafka中的事件。

正如@bsideup所提到的,我最终没有从反序列化程序抛出异常,因为kafka无法提交该记录的偏移量,而且没有干净的方法可以忽略该记录并继续消耗记录,因为我们没有记录的偏移信息(因为它格式错误(。因此,即使我试图使用反应性错误运算符忽略记录,轮询也会获取相同的记录,然后消费者就会陷入

最新更新