Unable to configure transactions with Camel Kafka in a Spring Boot application



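I am building a Spring Boot application that consumes messages from a Kafka topic, processes them, and writes the results back to another topic. I am using Apache Camel for the integration.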

My route looks like this:
onException(IllegalArgumentException.class).maximumRedeliveries(4);

from("kafka:CDC-GEXPUAT-CUSTOMER")
    .id("CamelRouteCustomer_1")
    .transacted()
    .choice()
        .when(simple("${body} contains 'GEXPUAT.CUSTOMER'"))
            .unmarshal().json(JsonLibrary.Jackson, CustomerWrapper.class)
            .process(customerProcessor)
        .otherwise()
            .log("${body}")
    .end()
    .to("seda:aggregate_1");

When I use .transacted() in the route, I get the following error: org.apache.camel.NoSuchBeanException: No bean could be found in the registry of type: PlatformTransactionManager

So I tried to create a configuration class that defines the transaction manager:

@Configuration
public class CommonBean {

    @Bean
    SpringTransactionPolicy springTransactionPolicy() throws Exception {
        SpringTransactionPolicy txRequired = new SpringTransactionPolicy();
        txRequired.setTransactionManager(transactionManager());
        txRequired.setPropagationBehaviorName("PROPAGATION_REQUIRED");
        return txRequired;
    }

    @Bean
    public DefaultKafkaProducerFactory<byte[], byte[]> producerFactory() {
        DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory =
                new DefaultKafkaProducerFactory<>(kafkaConfigs());
        // enable transaction manager
        defaultKafkaProducerFactory.setTransactionIdPrefix(transactionIdPrefix);
        return defaultKafkaProducerFactory;
    }

    @Bean
    @Primary
    public ChainedKafkaTransactionManager<byte[], byte[]> transactionManager() throws Exception {
        return new ChainedKafkaTransactionManager<>(kafkaTransactionManager());
    }

    @Bean
    public PlatformTransactionManager kafkaTransactionManager() {
        // call the producerFactory() bean method rather than referencing an undeclared field
        KafkaTransactionManager<byte[], byte[]> kafkaTransactionManager =
                new KafkaTransactionManager<>(producerFactory());
        kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        kafkaTransactionManager.setRollbackOnCommitFailure(true);
        return kafkaTransactionManager;
    }
}

But now I get compilation errors because the following classes cannot be found:

  1. SpringTransactionPolicy
  2. DefaultKafkaProducerFactory
  3. ChainedKafkaTransactionManager
  4. KafkaTransactionManager

I am not sure which dependencies I need to add to pom.xml in order to configure a KafkaTransactionManager in my Camel Spring Boot project.

My pom.xml currently looks like this (I have commented out some parts of the file):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.5</version>
<!-- <version>2.3.3.RELEASE</version> -->
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.surajit.camel</groupId>
<artifactId>camel-microservice-a</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>CamelProject</name>
<description>Camel project for Spring Boot</description>
<properties>
<java.version>8</java.version>
<camel.version>3.7.0</camel.version>
<spring-boot.version>2.3.3.RELEASE</spring-boot.version>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-activemq-starter</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-kafka-starter</artifactId>
<version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel.springboot</groupId>
<artifactId>camel-jackson-starter</artifactId>
<version>${camel.version}</version>
</dependency>

<!--    <dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.0.2</version>
</dependency>

<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.2.1</version>
</dependency> -->
<!--    <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency> -->

<!--     <dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency> -->

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

I was able to resolve the issue by adding the following to pom.xml:

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<!--  <version>2.2.14.RELEASE</version> -->
<version>2.7.0</version>
</dependency>

Then I removed the custom CommonBean transaction configuration class.

I added the following to application.properties:
spring.kafka.producer.transaction-id-prefix="producer"
spring.kafka.producer.bootstrap-servers=xxx.xxx.xxx.xxx:9092
spring.kafka.jaas.enabled=false
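
One detail worth checking in the properties above is the quoted value of transaction-id-prefix. The sketch below uses only the JDK's java.util.Properties (not Spring Boot's own loader, which to my knowledge treats quotes the same way) to show that quotes in a .properties value are kept as literal characters. The class name QuotedPropertyDemo and the helper load are made up for this illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo class: shows how a .properties line with quotes is parsed.
public class QuotedPropertyDemo {

    // Parses a single properties line and returns the value stored under key.
    static String load(String line, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String key = "spring.kafka.producer.transaction-id-prefix";
        // The quotes are part of the value: prints "producer" (with quotes)
        System.out.println(load(key + "=\"producer\"", key));
        // Without quotes: prints producer
        System.out.println(load(key + "=producer", key));
    }
}
```

If the same holds for your setup, the transactional ID prefix would contain the quote characters. That does not stop the route from starting, but writing the value without quotes is probably what was intended.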

Now the route starts correctly.
