Spring: ChainedKafkaTransactionManager与Jpa和Kafka是不是原子?



我们的基础设施必须管理两个spring引导应用程序之间的信息交换。部分信息基于Oracle数据库。我们使用Kafka来通知哪些信息必须由接收者管理。

(注意:我必须泛化代码)

我们有Kafka生产者,配置如下:

@Configuration
@Lazy(false)
public class KafkaConfig {
//...
@Bean(name="trManagerJpaKafka")
public ChainedKafkaTransactionManager<Object, Object> chainedTm(KafkaTransactionManager<String, String> ktm,
JpaTransactionManager jpaTransactionManager) {
return new ChainedKafkaTransactionManager<>(jpaTransactionManager,ktm);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(KafkaOperations<?, ?> template,
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory, ChainedKafkaTransactionManager<Object, Object> chainedTm) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

configurer.configure(factory, kafkaConsumerFactory);

//...
factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer,new FixedBackOff(0L, 2)));
factory.getContainerProperties().setTransactionManager(chainedTm);
return factory;
}
}

然后我们有一些服务产生Kafka消息,像这样

public class SomeService implements ISomeService {

@Override
@Transactional(transactionManager = "trManagerJpaKafka" , rollbackFor = { Exception.class })
public boolean createMessage(Long id, CreateRDM createRDM) {
...

Entity entity = new Entity();
entity.setSomething(something);
entity.setSomethingElse(somethingElse);

entity = entityRepository.save(entity);
JSONObject message = new JSONObject();

try {
message.put("id", entity.getId());
kafkaProducerService.sendMessage(message.toString());

...
}
...
}
public class SomeGenericKafkaService implements ISomeGenericKafkaService {
...
public void sendMessage(String data) {
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(KafkaHeaders.TOPIC, someTopic);
headers.put(KafkaHeaders.MESSAGE_KEY, UUID.randomUUID().toString());

KafkaProducerCallback callback = new KafkaProducerCallback();

kafkaTemplate.send(new GenericMessage<String>(data, headers)).addCallback(callback);

if (callback.isError()) {
throw new KafkaException(callback.getThrowable());
}
}   
...
}

在消费者spring启动应用中,我们有时会遇到这样的问题:它消耗来自Kafka的消息,但有时DB上的记录并不存在,所以当我们试图访问DB记录时,我们会遇到异常…事务在生产者不是原子的Jpa和Kafka?在Jpa插入之前提交Kafka消息?

同时,作为紧急解决方案,为了管理从Oracle检索数据时出现的异常,我们在接收端添加了一些重试,如:

...
Optional<Entity> optionalEntity = entityRepository.findById(id);

if (!optionalEntity.isPresent()) {
for (int i = 0; i < kafkaDbMaxRetry; i++) {
Thread.sleep(kafkaMessageDelay);
optionalEntity = entityRepository.findById(id);
if (optionalEntity.isPresent()) break;
}
}

entity = optionalEntity.get();
...

如何调整生产者,避免这种对消费者的不良代码?

链式事务管理器只提供"尽力而为1阶段提交";Kafka不能参与JTA/XA事务;用这些不同的技术提供原子更新是不可能的。

参见https://www.infoworld.com/article/2077963/distributed-transactions-in-spring--with-and-without-xa.html获取更多信息。

您必须使侦听器幂等——一种常见的模式是将主题/分区/偏移量与数据一起存储,以便您可以检查它是否已经被处理。

Kafka只按顺序处理来自一个分区的事件,但是如果事件最终出现在两个不同的分区中,则事件可能会被乱序处理。因此,您可以实现您的发送消息逻辑,例如包含您的db记录的主键作为分区键。

Kafka使用key来指定目标分区。默认策略是根据键的散列选择分区,如果键为空则使用轮询算法。

您可以实现一个自定义的org.apache.kafka.clients.producer.Partitioner来将消息映射到您需要的分区。类名必须设置为生成器的partitioner.class属性。

相关内容

最新更新