Kafka 生产者抛出错误 尝试从状态IN_TRANSACTION到状态IN_TRANSACTION的转换无效



我的 kafka 生产者抛出错误"尝试从状态 IN_TRANSACTION 到状态 IN_TRANSACTION 的无效转换"。这就是我想要实现的目标——

KafkaProducer producer = new KafkaProducer<>(props);
producer.initTransactions();
//transaction 1
producer.beginTransaction();
//send some messages
producer.commitTransaction();
//transaction 2
producer.beginTransaction(); //here it throws an exception "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION".
//send some messages
producer.commitTransaction();
producer.close();

如果我在开始事务 2 之前再次调用producer.initTransactions();,它会抛出异常"尝试从状态 READY 到状态初始化的无效转换"。

我做错了什么?

producer.initTransactions((;

这会将代理的生产者注册为可以使用事务的生产者,通过其 transactional.id 和序列号或纪元来识别它。反过来,代理将使用它们将任何操作预先写入事务日志。

因此,代理将从该日志中删除属于具有相同事务 ID 和更早纪元的生产者的任何操作,假设它们来自已失效的事务。

https://www.baeldung.com/kafka-exactly-once

我在初始化生产者 -> 后在构造函数本身中调用 initTransactions

Properties properties = getProperties();
kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
kafkaProducer.initTransactions();
return kafkaProducer;

每次为每笔交易创建新的生产者即可。

KafkaProducer producer = new KafkaProducer<>(props);
producer.initTransactions();
//transaction 1
producer.beginTransaction();
//send some messages
producer.commitTransaction();
producer.close();
KafkaProducer producer = new KafkaProducer<>(props);
producer.initTransactions();
//transaction 2
producer.beginTransaction(); //here it throws an exception "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION".
//send some messages
producer.commitTransaction();
producer.close();

最新更新