我正在为spring-kafka项目实现异常处理,我有自己的DeadLetterPublishingRecover,它处理kafka监听器中发生的异常,整个流程是完美的,即当我在逻辑中抛出任何异常时,我可以根据框架实现记录并将其发送到DLQ主题。消费者记录出现了问题,如果我更改了消费者记录值中的任何内容,同一消费者记录将发布到DLQ主题,但更改在我的senairo中实际上是错误的,我想记录原始消息。
@KafkaListener(topics = "${message.topic.name}",containerFactory = "kafkaListenerContainerFactory")
public void listenGroupFoo(ConsumerRecord<String, MyEvent> consumerRecord) throws InterruptedException, ExecutionException {
System.out.println("Received Message" + consumerRecord.value());
MyEvent consumedEvent=consumerRecord.value();
consumedEvent.setQuantity(1000);
throw new InvalidProductCodeException();
}
我发送给主题的实际消息只包含10个数量,我正在更改一些内容,比如将数量更改为1000并抛出一些异常(就像处理过程中发生的任何异常一样(…
Received MessageOrderObject CreatedDate=Mon Sep 14 19:38:15 IST 2020, productCode=TES, price=10, quantity=10, customerId=0002
在我抛出错误后,我的记录将是
Received MessageOrderObject [createdDate=Tue Sep 15 13:20:16 IST 2020, productCode=TES, price=10, quantity=10, customerId=0002]
Error Record OrderObject [createdDate=Tue Sep 15 13:20:16 IST 2020, productCode=TES, price=10, quantity=1000, customerId=0002]
这是我的DLQ子类
@Component
public class DeadLetterSubclass extends DeadLetterPublishingRecoverer {
@Value(value="${error.topic.name}")
static
String errorTopicName;
BusinessUtils utils=new BusinessUtils();
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
MY_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition("stock-order-error-topic", cr.partition());
public DeadLetterSubclass(KafkaOperations<? extends Object, ? extends Object> template) {
super(template,MY_DESTINATION_RESOLVER);
this.setHeadersFunction((consumerRecord,exception)->{
System.out.println("Error Record "+consumerRecord.value());
return consumerRecord.headers();
});
}
我想记录异常发生时要发布的原始事件对象(ConsumerRecord(。在我的情况下,我的实际订单数量是10,但1000数量的订单正在记录,这不是实际订单。
由于您有对原始ConsumerRecord
的硬引用,如果您需要在错误处理程序中保持不变,则不应对其进行更改。
框架无法预测您可能会使侦听器中的值发生变化。制作一份";以防万一";。
您不应该对原始值进行突变——而是对其进行克隆,然后对克隆进行突变。