(在开始问题之前,我的英语可能不足以描述清楚。如果你不明白请告诉我
我正在尝试通过Kafka将数据对象从A spring项目(生产者)发送到B spring项目(消费者)。
问题是A和B中的数据对象具有不同的类路径。因此B项目的数据类不能映射A项目的字段。
但是两个对象有相同的字段。所以我想从A项目中获得一个对象作为B项目的参数。
错误消息
Listener failed; nested exception is
org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is
org.springframework.messaging.converter.MessageConversionException: failed to resolve class name. Class not found [com.example.springboot.DTO.kafka.PostViewCountDTO]; nested exception is
java.lang.ClassNotFoundException: com.example.springboot.DTO.kafka.PostViewCountDTO
build.gradle
implementation 'org.apache.kafka:kafka-clients:2.8.0'
implementation 'org.apache.kafka:kafka_2.13:2.8.0'
implementation 'org.springframework.boot:spring-boot-starter-web:2.5.3'
数据类(使用A和B项目)
public class PostViewCountDTO implements Serializable {
private static final long serialVersionUID = 1L;
@NotNull
private long postNo;
}
生产配置
@Configuration
public class PostViewProducerConfig {
@Value("${spring.kafka.producer.bootstrap-servers}")
private String bootstrapServer;
@Bean
public Map<String,Object> postViewProducerConfigs() {
return JsonSerializer.getStringObjectMap(bootstrapServer);
}
@Bean
public ProducerFactory<String, PostViewCountDTO> postViewCountDTOProducerFactory() {
return new DefaultKafkaProducerFactory<>(postViewProducerConfigs());
}
@Bean
public KafkaTemplate<String, PostViewCountDTO> postViewDTOKafkaTemplate() {
return new KafkaTemplate<>(postViewCountDTOProducerFactory());
}
}
通用JsonSerializer类
public class JsonSerializer {
static Map<String, Object> getStringObjectMap(String bootstrapServer) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class);
return props;
}
}
消费者配置
@Configuration
@RequiredArgsConstructor
public class PostViewConsumerConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServer;
@Bean
public Map<String,Object> postViewConsumerConfigs() {
return JsonDeserializer.getStringObjectMap(bootstrapServer);
}
@Bean
public ConsumerFactory<String, PostViewCountDTO> postViewCountDTO_ConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(postViewConsumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, PostViewCountDTO> postViewCountListener() {
ConcurrentKafkaListenerContainerFactory<String, PostViewCountDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(postViewCountDTO_ConsumerFactory());
return factory;
}
@Bean
public StringJsonMessageConverter jsonConverter() {
return new StringJsonMessageConverter();
}
}
@Async
public void sendPostNo(PostViewCountDTO postViewCountDTO) {
postViewKafkaTemplate.send(topic_viewCount, null, postViewCountDTO);
}
@KafkaListener(topics = topic_viewCount, groupId = groupId, containerFactory = "postViewCountListener")
public void consume(@Payload PostViewCountDTO postViewCountDTO) {
...
}
您需要向序列化器和反序列化器添加类型映射
https://docs.spring.io/spring-kafka/docs/current/reference/html/serdes-mapping-types
在生产者侧,将com.a.PostViewCountDTO
映射到PostViewCountDTO
。在消费者端,将com.b.PostViewCountDTO
映射到PostViewCountDTO
。