Spring Cloud Stream Kafka Producer force producer.flush



我正在创建一个kafka streams/in-out类型的应用程序。示例代码如下

private MessageChannel output;
public void process(List<String> input) {
--somelogic
output.send()
}

根据我的理解,kafka在发送消息之前缓冲消息。现在,万一应用程序崩溃/容器崩溃,就有可能丢失缓冲的消息。

我们如何确保消息被实际发送出去

(类似KafkaTemplate.flush()这里?)

编辑根据Gary Russell的建议,我们应该在最后一条消息上设置FLUSH头。

后续问题-由于kafkaProducer.flush()方法的阻塞性质,最后一个send方法调用将成为阻塞调用,如果在kafka消息的发送过程中抛出异常(例如io异常/auth异常),它们是否会在相同的方法上下文中引发?

。在下面的代码中,kafka的sender异常会在catch块中被捕获吗?

public void process(List<String> input) {
--somelogic
try {
Message<?> message = //
message.setHeader(KafkaIntegrationHeaders.FLUSH, true);
output.send()
}
catch(Exception e) {
e.printstacktrace()
}
}

底层的KafkaProducerMessageHandler有一个属性:

/**
* Specify a SpEL expression that evaluates to a {@link Boolean} to determine whether
* the producer should be flushed after the send. Defaults to looking for a
* {@link Boolean} value in a {@link KafkaIntegrationHeaders#FLUSH} header; false if
* absent.
* @param flushExpression the {@link Expression}.
*/
public void setFlushExpression(Expression flushExpression) {

当前,绑定器不支持自定义此属性。

但是,如果您正在发送Message<?>s,则可以在批处理的最后一个消息中将KafkaIntegrationHeaders.FLUSH头设置为true

最新更新