How to implement Kafka transactions in Apache Kafka using a transaction manager



I need to write a method that commits/sends all Kafka producer records on success, and rolls all of them back when a Kafka-related exception occurs, using the latest version of Spring Boot (preferred) or plain Java configuration.

I planned to follow the approach from this Spring Boot reference, but could not find sample code.

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

New method (recently added)

public ProducerResult produceData(String topic, List<Data> data) {
    data.forEach(d -> {
        final ProducerRecord<String, Data> record = createRecord(topic, d);
        CompletableFuture<SendResult<Integer, Data>> future = template.send(record);
        future.whenComplete((result, ex) -> {
            if (ex != null) {
                // Need to roll back the transaction here
            }
        });
    });
    // Need to commit the transaction here
    return new ProducerResult(true);
}

The method createRecord(topic, d) returns new ProducerRecord<>(topic, d). How should this be wired into template.executeInTransaction?

You need to describe your scenario in more detail.

For producer-only transactions, simply annotate the method with a plain Spring @Transactional annotation; all sends within the method will then participate in the same Kafka transaction.
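A minimal sketch of that producer-only case (the topic name and service class are placeholders; it assumes a KafkaTransactionManager is in place, which Spring Boot auto-configures when `spring.kafka.producer.transaction-id-prefix` is set):

```java
import java.util.List;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TransactionalProducerService {

    private final KafkaTemplate<String, String> template;

    public TransactionalProducerService(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    // All sends below run in one Kafka transaction: it is committed when this
    // method returns normally and rolled back if any exception propagates out.
    @Transactional
    public void produceAll(List<String> data) {
        data.forEach(item -> template.send("my-topic", item));
    }
}
```

Since the rollback is driven by the exception leaving the annotated method, there is no need to inspect each send's future for the rollback decision, as the question's produceData method attempts.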

You can also use KafkaTemplate.executeInTransaction(); see https://docs.spring.io/spring-kafka/reference/html/kafkatemplate-local-transactions

When the method exits, the transaction is committed (or rolled back if an exception is thrown).

The consume->process->produce scenario is similar, except that instead of using @Transactional, the listener container starts the transaction before invoking the listener method.
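A hedged sketch of that consume->process->produce scenario (topic names, the group id, and the processing step are placeholders; it assumes the listener container factory is configured with a KafkaTransactionManager so the container starts the transaction):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class ProcessAndForward {

    private final KafkaTemplate<String, String> template;

    public ProcessAndForward(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    // The container begins a Kafka transaction before calling this method.
    // The consumed offset and the outgoing send are committed atomically;
    // an exception thrown from here rolls both back.
    @KafkaListener(topics = "input-topic", groupId = "process-group")
    public void listen(String value) {
        String processed = value.toUpperCase(); // placeholder processing step
        template.send("output-topic", processed);
    }
}
```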

EDIT

Here is an example using a local transaction; if a send, or getting a future's result, throws an exception, the transaction is rolled back; otherwise it is committed.

@Bean
ApplicationRunner runner(KafkaTemplate<String, String> template) {
    List<String> data = List.of("foo", "bar");
    return args -> {
        boolean success = template.executeInTransaction(t -> {
            List<CompletableFuture<SendResult<String, String>>> futures = new ArrayList<>();
            data.forEach(item -> futures.add(t.send("so75910507", item)));
            futures.forEach(future -> {
                try {
                    future.get(10, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
                catch (ExecutionException | TimeoutException e) {
                    throw new IllegalStateException(e);
                }
            });
            return true;
        });
    };
}
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Lazy;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;

@Component
public class CustomProducerListener<K, V> implements ProducerListener<K, V> {

    private final KafkaOperations<K, V> kafkaTemplate;

    // @Lazy breaks the circular reference between the template and this listener
    public CustomProducerListener(@Lazy KafkaOperations<K, V> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
        // Send the record's offset to the current transaction; the second
        // argument must identify the consumer group, not the record key
        kafkaTemplate.sendOffsetsToTransaction(Collections.singletonMap(
                new TopicPartition(producerRecord.topic(), recordMetadata.partition()),
                new OffsetAndMetadata(recordMetadata.offset() + 1)),
                new ConsumerGroupMetadata("my-consumer-group"));
    }

    @Override
    public void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception) {
        // KafkaOperations exposes no rollbackTransaction() method; the
        // transaction is rolled back when the transactional method (or the
        // executeInTransaction callback) throws, so rethrow here
        throw new IllegalStateException("Send failed; rolling back the transaction", exception);
    }

}

CustomProducerListener is a custom implementation of the ProducerListener interface with two methods: onSuccess and onError.

In the onSuccess method, we pass the current record's topic, partition, and next offset to sendOffsetsToTransaction, so the consumed position is committed as part of the same transaction and is not lost on failure.

In the onError method, the transaction is rolled back; all records produced so far within the transaction are discarded and the transactional state is reset.

To use this custom ProducerListener, configure it in a Spring Boot Kafka configuration class as follows:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.ProducerListener;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        DefaultKafkaProducerFactory<String, String> producerFactory =
                new DefaultKafkaProducerFactory<>(producerConfigs());
        // Setting a transaction id prefix makes the factory transactional;
        // there is no need to also set TRANSACTIONAL_ID_CONFIG, which the
        // prefix would override anyway
        producerFactory.setTransactionIdPrefix("my-transaction-prefix");
        return producerFactory;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerListener<String, String> producerListener) {
        KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        // Inject the CustomProducerListener @Component defined above
        kafkaTemplate.setProducerListener(producerListener);
        return kafkaTemplate;
    }

}
