How do I test that I have correctly configured ChainedKafkaTransactionManager in my Spring Boot service?



My Spring Boot service needs to consume Kafka events from one topic, do some processing (including writing to a database with JPA), and then produce some events on a new topic. Under no circumstances may I publish the events without having updated the database, and if anything goes wrong I want the consumer's next poll to retry the events. My processing logic, including the database updates, is idempotent, so retrying is fine.

I think I have achieved the exactly-once semantics described at https://docs.spring.io/spring-kafka/reference/html/#exactly-once by using a ChainedKafkaTransactionManager, as follows:

```java
@Bean
public ChainedKafkaTransactionManager chainedTransactionManager(JpaTransactionManager jpa,
        KafkaTransactionManager<?, ?> kafka) {
    kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return new ChainedKafkaTransactionManager(kafka, jpa);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        ChainedKafkaTransactionManager chainedTransactionManager) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setTransactionManager(chainedTransactionManager);
    return factory;
}
```

The relevant Kafka configuration in my application.yaml looks like:

```yaml
kafka:
  ...
  consumer:
    group-id: myGroupId
    auto-offset-reset: earliest
    properties:
      isolation.level: read_committed
  ...
  producer:
    transaction-id-prefix: ${random.uuid}
  ...
```

Since commit order is critical to my application, I would like to write an integration test proving that the commits happen in the expected order, and that if an error occurs during the commit to Kafka, the original event is consumed again. However, I am struggling to find a good way to cause a failure between the DB commit and the Kafka commit.

Any suggestions or alternative approaches?

Thanks

You can use a custom ProducerFactory that returns a MockProducer (provided by kafka-clients).

Set its commitTransactionException so that it is thrown when the KafkaTransactionManager (KTM) tries to commit the transaction.
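Stripped of the Spring wiring, the mechanism is just a public field on MockProducer; a minimal standalone sketch (topic, key, and value here are illustrative, only kafka-clients is required):

```java
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MockProducerSketch {

    public static void main(String[] args) {
        MockProducer<String, String> producer =
                new MockProducer<>(true, new StringSerializer(), new StringSerializer());
        // Arrange for commitTransaction() to throw, simulating a broker-side
        // failure at commit time.
        producer.commitTransactionException = new RuntimeException("simulated commit failure");

        producer.initTransactions();
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("some-topic", "key", "value"));
        try {
            producer.commitTransaction(); // throws the configured exception
        } catch (RuntimeException e) {
            System.out.println("commit failed as expected: " + e.getMessage());
        }
    }
}
```

When the KTM drives such a producer, this is the exception it sees at the commit step, which is what triggers the rollback path the test below asserts on.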

EDIT

Here is an example; it does not use the chained TM, but that should make no difference.

```java
@SpringBootApplication
public class So66018178Application {

    public static void main(String[] args) {
        SpringApplication.run(So66018178Application.class, args);
    }

    @KafkaListener(id = "so66018178", topics = "so66018178")
    public void listen(String in) {
        System.out.println(in);
    }
}
```

```properties
spring.kafka.producer.transaction-id-prefix=tx-
spring.kafka.consumer.auto-offset-reset=earliest
```

```java
@SpringBootTest(classes = { So66018178Application.class, So66018178ApplicationTests.Config.class })
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class So66018178ApplicationTests {

    @Autowired
    EmbeddedKafkaBroker broker;

    @Test
    void kafkaCommitFails(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Config config)
            throws InterruptedException {

        registry.getListenerContainer("so66018178").stop();
        AtomicReference<Exception> listenerException = new AtomicReference<>();
        CountDownLatch latch = new CountDownLatch(1);
        ((ConcurrentMessageListenerContainer<String, String>) registry.getListenerContainer("so66018178"))
                .setAfterRollbackProcessor(new AfterRollbackProcessor<>() {

                    @Override
                    public void process(List<ConsumerRecord<String, String>> records,
                            Consumer<String, String> consumer, Exception exception, boolean recoverable) {
                        listenerException.set(exception);
                        latch.countDown();
                    }
                });
        registry.getListenerContainer("so66018178").start();

        Map<String, Object> props = KafkaTestUtils.producerProps(this.broker);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
        template.send("so66018178", "test");
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(listenerException.get()).isInstanceOf(ListenerExecutionFailedException.class)
                .hasCause(config.exception);
    }

    @Configuration
    public static class Config {

        RuntimeException exception = new RuntimeException("test");

        @Bean
        public ProducerFactory<Object, Object> pf() {
            return new ProducerFactory<>() {

                @Override
                public Producer<Object, Object> createProducer() {
                    MockProducer<Object, Object> mockProducer = new MockProducer<>();
                    mockProducer.commitTransactionException = Config.this.exception;
                    return mockProducer;
                }

                @Override
                public Producer<Object, Object> createProducer(String txIdPrefix) {
                    Producer<Object, Object> producer = createProducer();
                    producer.initTransactions();
                    return producer;
                }

                @Override
                public boolean transactionCapable() {
                    return true;
                }
            };
        }
    }
}
```

Do not use ChainedKafkaTransactionManager any more; it is deprecated.

According to the documentation: https://docs.spring.io/spring-kafka/reference/html/container-transaction-manager

"The ChainedKafkaTransactionManager is now deprecated, since version 2.7; see the javadocs for its super class ChainedTransactionManager for more information. Instead, use a KafkaTransactionManager in the container to start the Kafka transaction and annotate the listener method with @Transactional to start the other transaction."
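A sketch of what that recommended wiring might look like (bean names, topic names, and generics here are illustrative assumptions, not the questioner's actual code; requires spring-kafka 2.7+):

```java
@Configuration
public class KafkaTxConfig {

    // The container starts the Kafka transaction around each listener call.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory,
            KafkaTransactionManager<Object, Object> kafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory);
        factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
        return factory;
    }
}

@Component
class MyTransactionalListener {

    // @Transactional starts the JPA transaction inside the Kafka transaction,
    // so the DB commit happens first; if the later Kafka commit fails, the
    // record is redelivered, which is safe when processing is idempotent.
    @KafkaListener(topics = "myInboundTopic")
    @Transactional
    public void listen(String in) {
        // JPA writes + kafkaTemplate.send(...) here
    }
}
```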

In my own tests, to simulate an exception in the producer after the DB transaction has committed, I simply left a mandatory field of the Kafka event null (using an Avro schema), and in a second test I deleted the topic being produced to with the help of Kafka Admin. I then wrote some assertions verifying that the Kafka listener was invoked again on the retry.
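The topic-deletion trick can be sketched with the kafka-clients Admin API (the bootstrap address and topic name below are assumptions):

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Collections;
import java.util.Properties;

public class DeleteTopicSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // After deletion, sends to the topic fail (assuming topic
            // auto-creation is disabled on the broker), so the listener's
            // transaction rolls back and the original record is redelivered
            // on the next poll.
            admin.deleteTopics(Collections.singletonList("myOutboundTopic")).all().get();
        }
    }
}
```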
