每个Http请求有多个Kafka Producer实例



我有一个可以由多个用户同时调用的rest端点。这个rest端点调用一个Transactional Kafka Producer。我所理解的是,如果我们使用Transaction,我就不能同时使用同一个KafkaProducer实例。

如何有效地为每个HTTP请求创建一个新的Kafka Producer实例?

//Kafka Transaction enabled
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prod-1-" );
@Service
public class ProducerService {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
public void postMessage(final MyUser message) {
// wrapping the send method in a transaction
this.kafkaTemplate.executeInTransaction(kafkaTemplate -> {
kafkaTemplate.send("custom", null, message);
}
}

请参阅DefaultKafkaProducerFactory的javadocs。它为生产者发起的事务维护生产者的缓存。

/**
* The {@link ProducerFactory} implementation for a {@code singleton} shared {@link Producer} instance.
* <p>
* This implementation will return the same {@link Producer} instance (if transactions are
* not enabled) for the provided {@link Map} {@code configs} and optional {@link Serializer}
* implementations on each {@link #createProducer()} invocation.
...
* Setting {@link #setTransactionIdPrefix(String)} enables transactions; in which case, a
* cache of producers is maintained; closing a producer returns it to the cache. The
* producers are closed and the cache is cleared when the factory is destroyed, the
* application context stopped, or the {@link #reset()} method is called.
...
*/

最新更新