Error when sending a JSON object as a message on Kafka



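I am a student trying to learn Apache Kafka. I am trying to send a JSON object as a message through Kafka in a Spring Boot application written in Java. When I try to send it, an exception is thrown saying that my model class cannot be cast to String, even though I specified the JSON serializer in my application.properties file. The exception is: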

java.lang.ClassCastException: class com.example.demo.model.BookES cannot be cast to class java.lang.String (com.example.demo.model.BookES is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

My application.properties file:

server.port=8081
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

My controller method, where I try to send the message:

@PostMapping("/publish")
public ResponseEntity<String> publish(@RequestBody BookES bookES) {
    logger.info("in publish method");
    kafkaProducer.sendMessage(bookES);
    return ResponseEntity.ok("Json message sent to kafka topic");
}

The Kafka producer class that contains the sendMessage method:

package com.example.demo.kafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import com.example.demo.controller.BookController;
import com.example.demo.model.Book;
import com.example.demo.model.BookES;

@Service
public class KafkaProducer {

    @Autowired
    private NewTopic topic;

    Logger logger = LoggerFactory.getLogger(KafkaProducer.class);

    private String topicName = "bookmanagement";

    @Autowired
    private KafkaTemplate<String, BookES> kafkaTemplate;

    public void sendMessage(BookES bookES) {
        logger.info("in sendMessage method");
        logger.info(String.format("Message sent -> %s", bookES.toString()));

        Message<BookES> message = MessageBuilder.withPayload(bookES).setHeader(KafkaHeaders.TOPIC, topic.name()).build();
        kafkaTemplate.send(message);
    }
}

My model class:


package com.example.demo.model;

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Document(indexName="my-application")
@Data
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public class BookES {

    @Override
    public String toString() {
        return "BookES [bookId=" + bookId + ", bookName=" + bookName + ", description=" + description + "]";
    }

    @Id
    private String bookId;
    private String bookName;
    private String description;

    public String getBookId() {
        // TODO Auto-generated method stub
        return this.bookId;
    }

    public String getBookName() {
        return bookName;
    }

    public void setBookName(String bookName) {
        this.bookName = bookName;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public void setBookId(String bookId) {
        this.bookId = bookId;
    }
}

My project is on GitHub: GitHub link

I tried annotating my model class with different annotations such as @JsonSerializer, but without success. The response I get in Postman is:


{
    "timestamp": "2022-11-22T11:24:30.738+00:00",
    "status": 500,
    "error": "Internal Server Error",
    "message": "Can't convert value of class com.example.demo.model.BookES to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer",
    "path": "/books/publish"
}
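
The message in that response suggests the likely cause: at runtime the producer is using StringSerializer as its value serializer, whatever application.properties says, and a String serializer can only accept String values. The sketch below reproduces the same kind of ClassCastException without Kafka. Everything in it (the Serializer interface, StringOnlySerializer, the nested BookES, trySend) is a hypothetical stand-in written for illustration, not Kafka's actual classes:

```java
import java.nio.charset.StandardCharsets;

public class SerializerMismatchDemo {

    // Hypothetical stand-in for Kafka's Serializer<T> interface (not the real one).
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for a String-only serializer: it can only turn Strings into bytes.
    static class StringOnlySerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stand-in for the model class from the question.
    static class BookES {
        final String bookId;

        BookES(String bookId) {
            this.bookId = bookId;
        }
    }

    // Mimics a producer holding the wrong value serializer. Generics are erased
    // at runtime, so the mismatch only surfaces when serialize() is invoked and
    // the compiler-generated bridge method casts the value to String.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySend(Object value) {
        Serializer raw = new StringOnlySerializer();
        try {
            raw.serialize("bookmanagement", value);
            return "serialized";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(trySend("plain text"));    // a String goes through
        System.out.println(trySend(new BookES("1"))); // a BookES cannot be cast to String
    }
}
```

Running main prints `serialized` for the String and `ClassCastException` for the BookES. So the thing to check in the real project is which serializer the KafkaTemplate actually ends up with.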

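You need to provide your own serializer that can serialize your BookES into a byte array. Here is a class that I use as a generic JSON serializer. It should handle your class just fine: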

package ***;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.mgnt.utils.JsonUtils;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;

@Component
public class GenericJsonSerializer implements Serializer<Object> {

    @Override
    public byte[] serialize(String s, Object obj) {
        byte[] result;
        try {
            result = JsonUtils.writeObjectToJsonString(obj).getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error occurred while serializing " + obj.getClass().getCanonicalName() + " to byte[]", e);
        }
        return result;
    }
}
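
In your properties you will need to register this class as your serializer. Note that when you read your message back, if you want it to be read as a BookES instance, you will need to provide a deserializer class that takes the byte array and converts it back into BookES. (It must implement the org.apache.kafka.common.serialization.Deserializer interface.)

In my example I used the JsonUtils class, which comes from the open-source Java library MgntUtils (written and maintained by me). You can easily replace it with the ObjectMapper class from the Jackson JSON library, or use the GSON library, but JsonUtils may make things a little simpler. If you want to use it, here is the Javadoc for the JsonUtils class. The MgntUtils library is available as a Maven artifact or on GitHub.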
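
As a rough illustration of the registration step, the producer and consumer entries in application.properties might look like the following. The package name com.example.demo.kafka and the GenericJsonDeserializer class are assumptions made for this example: the package should be wherever you actually put the serializer, and the deserializer is a class you would have to write yourself.

```properties
# Assumed package: use the package where GenericJsonSerializer actually lives
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.example.demo.kafka.GenericJsonSerializer

# Hypothetical class: a Deserializer implementation you would write yourself
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.example.demo.kafka.GenericJsonDeserializer
```

These keys only take effect when the KafkaTemplate is built from Spring Boot's auto-configuration. If the project defines its own ProducerFactory bean with a hand-built configuration map, the serializer has to be set there instead.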
