通过Kafka模板发送消息的异常处理



我想知道在通过kafka模板生成kafka消息时,是否有一种方法可以捕获异常/可丢弃。

如果某个东西失败了,我看不出它会在哪里抛出KafkaException。在继续我的应用程序流之前,我需要查明消息是否已提交给Kafka。我知道ListenableFuture会记录一个失败,但我不知道如何在failure上捕获它。

public void sendToKafkaTopic(KafkaMessage data) {
ListenableFuture<SendResult<String, KafkaMessage>> future = kafkaTemplate.send(primaryKafkaTopic, data);
future.addCallback(new ListenableFutureCallback<SendResult<String, KafkaMessage>>() {
@Override
public void onSuccess(SendResult<String, KafkaMessage> result) {
log.info("sent message='{}' with offset={}", data,
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
log.error("unable to send message='{}'", data, ex);
}
});
}

我正试图找到这样的东西:

public void sendToKafkaTopic(KafkaMessage data) {
try {
kafkaTemplate.send(primaryKafkaTopic, data);
} catch (RuntimeException err) {
log.error("unable to send message='{}'", data, ex);
throw new CustomException(err);
}
}
}

然后我才能继续我的应用程序流

如果您只执行Future.get(),没有什么问题。当异常发生在下游时,它将从这个阻塞get()抛出给您。

最新更新