我在KafkaTemplate
中使用flush
方法来确保消息成功发送到代理。我有不同的主题和不同类型的信息。该服务的示例如下。根据消息<T>
的类型,将有几个服务bean。我计划使用在这些服务中共享的公共KafkaTemplate<String, Object>
。
我担心在不同的服务中刷新相同的共享KafkaTemplate<String, Object>
。如果我在一个服务bean中调用flush
,显然它将触发刷新所有类型的所有消息。总的来说,它是好的,不会打破我什么。但是从性能的角度来看:也许在不同的bean中使用不同的kafkaTemplate<String, T>
和冲洗不同的KafkaTemplate<String, T>
更好?
public class SenderServiceImpl<T> implements SenderService<T> {
// use one shared kafkaTemplate for any T beans
private final KafkaTemplate<String, Object> kafkaTemplate;
// ??? or maybe use multiple specific kafkaTemplate for specific T bean ???
// Might it get a performance gain ?
// private final KafkaTemplate<String, T> kafkaTemplate;
@Override
public List<T> sendMessages(String topicName, List<T> list) {
List<T> successList = new ArrayList<>();
list.forEach(value ->
kafkaTemplate.send(topicName, value)
.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult<String, E> result) {
successList.add(value.getId());
log.debug("Successfully send message ...");
}
@Override
public void onFailure(Throwable exception) {
log.warn("Fail to send message ...");
}
}));
kafkaTemplate.flush();
return successList;
}
}
您通常不需要刷新模板(除非您有一个很大的linger.ms
并且想要抢占它)。
是的,它会冲洗所有东西。
相反,您可以等待由send返回的Future
来获得SendResult
,而不是向它添加侦听器来获得异步回调。