java.lang.ClassCastException: class model.User - when sending a JSON object as a message to a topic, springbo



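I am new to Kafka and I am facing the following issue with my model class User: [Request processing failed; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class model.User to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer] with root cause java.lang.ClassCastException: class model.User cannot be cast to class java.lang.String (model.User is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap') at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28) ~[kafka-clients-2.7.1.jar:na]

I suspect this is caused by wrong imports of StringSerializer and JsonSerializer in KafkaConfiguration.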

1-KafkaConfiguration

package config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.connect.json.JsonSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import com.fasterxml.jackson.databind.ser.std.StringSerializer;

import model.User;

@Configuration
public class KafkaConfiguration {

    @Bean
    public ProducerFactory<String, User> producerFactory() {
        Map<String, Object> config = new HashMap<>();

        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, User> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

2-UserResource class

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import model.User;

@RestController
@RequestMapping("kafka")
public class UserResource {

    @Autowired
    KafkaTemplate<String, User> kafkatemplate;

    public static final String TOPIC = "Kafka_Example";

    @GetMapping("/publish/{name}")
    public String postMessage(@PathVariable("name") final String name) {
        kafkatemplate.send(TOPIC, new User(name, "Technology", 12000L));
        return "Published successfully";
    }
}

3-User class

package model;

public class User {

    private String name;
    private String dept;
    private long salary;

    public User(String name, String dept, long salary) {
        super();
        this.name = name;
        this.dept = dept;
        this.salary = salary;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDept() {
        return dept;
    }

    public void setDept(String dept) {
        this.dept = dept;
    }

    public long getSalary() {
        return salary;
    }

    public void setSalary(long salary) {
        this.salary = salary;
    }
}

Can anyone tell me what is going wrong? Is it about the imports (and if so, which are the correct ones)?

Thanks

Solution

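  1. You need to import the string serializer and the JSON serializer from these classes:
org.apache.kafka.common.serialization.StringSerializer
org.springframework.kafka.support.serializer.JsonSerializer
  2. The package of the model class must be the same as that of the controller class and the Spring application main class

I followed these two points and the issue was resolved.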
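
The two points above can be sketched as a corrected configuration class. This is a minimal sketch rather than a definitive fix: it keeps the bean definitions from the question and swaps only the two serializer imports. The package name com.example.demo.config is an assumption. It places the class under the package used by UserResource so that component scanning from the main application class can find it, which is one reading of the second point. The error message names StringSerializer as the value.serializer, which suggests the original configuration may not have been picked up at all.

```java
// Assumption: the main application class lives in com.example.demo,
// so this sub-package is covered by its component scan.
package com.example.demo.config;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
// Kafka's own serializer for String keys (not the Jackson class of the same name).
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
// Spring Kafka's JSON serializer (not the Kafka Connect one).
import org.springframework.kafka.support.serializer.JsonSerializer;

import model.User;

@Configuration
public class KafkaConfiguration {

    @Bean
    public ProducerFactory<String, User> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // Keys are plain strings; values are User objects written as JSON.
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, User> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```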

Your code is correct, but you imported the wrong StringSerializer. Use the imports below:

org.apache.kafka.common.serialization.StringSerializer
org.springframework.kafka.support.serializer.JsonSerializer
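
To see why a String-only serializer fails on a model object, here is a small standalone sketch that uses only the JDK. The Serializer interface, StringOnlySerializer, and Person are hypothetical stand-ins invented for illustration; they are not the Kafka classes. The sketch reproduces only the mechanism: a serializer typed for String, called through its raw type with some other object, fails with a ClassCastException at the point of the call.

```java
import java.nio.charset.StandardCharsets;

class SerializerCastDemo {

    // Simplified, hypothetical stand-in for a generic serializer interface.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // A serializer that can only turn a String into bytes.
    static final class StringOnlySerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical model class used only for this illustration.
    static final class Person {
        final String name;

        Person(String name) {
            this.name = name;
        }
    }

    // Calls the serializer through its raw type, so the compiler cannot
    // check the argument; the cast to String happens only at run time.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static boolean failsWithClassCast(Object value) {
        Serializer raw = new StringOnlySerializer();
        try {
            raw.serialize(value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithClassCast("hello"));           // prints false
        System.out.println(failsWithClassCast(new Person("Ann"))); // prints true
    }
}
```

A serializer that accepts arbitrary objects, such as the JsonSerializer named above, avoids this cast because it does not assume the value is a String.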
