kafkatemplate正在修改发送的消息,需要按原样发送原始消息



我已经实现了自定义kafkalitererrorhandler。如果消息处理失败,我想将消息发送到重试主题。为此,我添加了一些标题。为此,我使用了springmeesage。

问题是当我使用kafkatemplate发送消息时,它会在字符串消息中添加"\"。

下面是我正在做的代码。

public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
logger.info("Enter handleError message");
int numberOfRetries = messageRetryCount(message);
MessageBuilder<?> messageBuilder = MessageBuilder.fromMessage(message).removeHeader(KafkaHeaders.TOPIC)
.removeHeader(KafkaHeaders.PARTITION_ID).removeHeader(KafkaHeaders.MESSAGE_KEY)
.setHeader(KafkaHeaders.TOPIC, numberOfRetries > 0 ? retryTopic : dlqTopic);
template.send(messageBuilder.build());

内部弹簧kafka将消息转换为生产者记录。其在输出中将\添加到字符串中。

2020-03-20 12:25:28.804  INFO 10936 --- [_consumer-0-C-1] c.h.kafkaretry.consumer.SimpleConsumer   : in rety :: ""testfail""

有人面临同样的问题吗?有什么替代方案或解决方案吗?

看起来您使用的是JsonSerializer,而您的数据只是一个普通字符串。考虑在消费者端使用StringSerializerJsonDeserializer

最新更新