我正在大规模运行kafka事务,下面是代码段。
producer.initTransaction();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>(producerTopic, element));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
producer.close();
canSendNext = false;
}catch (KafkaException e) {
producer.abortTransaction();
}
属性使用:
ProducerConfig。KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZERProducerConfig。VALUE_SERIALIZER_CLASS_CONFIG, BYTE_ARRAY_SERIALIZERProducerConfig。.toString TRANSACTIONAL_ID_CONFIG, UUID.randomUUID () ()bootstrap.servers = localhost: 9092ack =所有重试= 1partitioner.class = org.apache.kafka.clients.producer.RoundRobinPartitioner
当commitTransaction超时时,KafkaException的catch块运行并尝试中止事务。失败的错误是:**无法尝试操作abortTransaction
,因为之前对commitTransaction
的调用超时,必须重试**
如何处理提交事务超时场景
期望代码工作
根据文档:
注意这个方法将会抛出TimeoutException如果事务不能在max.block.ms过期前提交。此外,如果被中断,它将引发InterruptException。重试是安全的在任何一种情况下,但不可能尝试不同的操作(如abortTransaction),因为提交可能已经完成在完成的过程中。如果不重试,唯一的选择就是关闭生产者
这里的问题是kafka生产者超时了,它不知道kafka代理是否会完成交易。因此,它不能为您提供中止事务功能,因为即使在生产者超时之后,事务也有可能由代理提交。
你应该配置你的kafka生产者有足够大的max.block.ms
和retries
的数量来处理这样的场景(不知道为什么你已经配置你的重试为1)。理想情况下,你应该很少超时-像在网络中有一些问题,或者有一些实际的问题发生在kafka代理。
在这种情况下,您不可能知道上次事务是否成功。你不能做任何事情,只能关闭你的kafka生产者并创建一个新的。