Getting ClassCastException while sending error messages to a Kafka DLQ



I am using Kafka with Spring Cloud Stream. My listener code looks like this:

    @Bean
    public Consumer<List<Map<String, Object>>> receive() throws MyException {
        return message -> {
            try {
                messageService.processMessage(message);
            } catch (MyException e) {
                throw new MyException(e.getMessage());
            }
        };
    }

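I recently changed it to consume messages in batch mode. If processing fails, I need the messages to go to a DLQ topic. Right now they go to the error channel, whose handler is just: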

    @StreamListener("errorChannel")
    public void handleError(ErrorMessage errorMessage) {
        log.error("exception occurred. errorMessage = {}", errorMessage);
    }

But I get a ClassCastException with the following stack trace:

2020-04-02 15:34:21.841 ERROR 22222 --- [container-0-C-1] o.s.k.listener.BatchLoggingErrorHandler  : Error while processing:
ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 36, CreateTime = 1585856056860, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@1701c9b1)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.group-1.errors'; nested exception is java.lang.ClassCastException: class java.util.LinkedList cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (java.util.LinkedList is in module java.base of loader 'bootstrap'; org.apache.kafka.clients.consumer.ConsumerRecord is in unnamed module of loader 'app'), failedMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Exception thrown while invoking MessageListener#receive[1 args]; nested exception is com.corelogic.idap.idappipelinegenericelasticsink.exception.ElasticClientException: forced, failedMessage=GenericMessage [payload=[[B@1701c9b1], headers={kafka_offset=[36], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@ce0748, kafka_timestampType=[CREATE_TIME], kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[0], kafka_batchConvertedHeaders=[{}], kafka_receivedTopic=[test], kafka_receivedTimestamp=[1585856056860], contentType=application/json, kafka_groupId=group-1}], headers={kafka_data=[ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 36, CreateTime = 1585856056860, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@1701c9b1)], id=0cb01a1e-fd22-f409-bcbc-1f86880efeec, sourceData=[ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 36, CreateTime = 1585856056860, serialized key size = -1, serialized value size = 22, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@1701c9b1)], timestamp=1585856060836}] for original GenericMessage [payload=[[B@1701c9b1], headers={kafka_offset=[36], kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@ce0748, kafka_timestampType=[CREATE_TIME], 
kafka_receivedMessageKey=[null], kafka_receivedPartitionId=[0], kafka_batchConvertedHeaders=[{}], kafka_receivedTopic=[test], kafka_receivedTimestamp=[1585856056860], contentType=application/json, kafka_groupId=group-1}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1777) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:1501) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1383) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:1271) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1254) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1007) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:927) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'test.group-1.errors'; nested exception is java.lang.ClassCastException: class java.util.LinkedList cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (java.util.LinkedList is in module java.base of loader 'bootstrap'; org.apache.kafka.clients.consumer.ConsumerRecord is in unnamed module of loader 'app')
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:483) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:217) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:201) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:75) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:527) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:497) ~[spring-integration-kafka-3.2.1.RELEASE.jar:3.2.1.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:1475) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:1438) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:1370) ~[spring-kafka-2.3.6.RELEASE.jar:2.3.6.RELEASE]
... 8 common frames omitted
Caused by: java.lang.ClassCastException: class java.util.LinkedList cannot be cast to class org.apache.kafka.clients.consumer.ConsumerRecord (java.util.LinkedList is in module java.base of loader 'bootstrap'; org.apache.kafka.clients.consumer.ConsumerRecord is in unnamed module of loader 'app')
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getErrorMessageHandler$8(KafkaMessageChannelBinder.java:1057) ~[spring-cloud-stream-binder-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]

My configuration looks like this:

    spring.cloud.stream.bindings:
      input:
        destination: test
        group: ${application.group}
        consumer:
          max-attempts: 5
          batch-mode: true
      output:
        contentType: application/json
    spring.cloud.stream.kafka.bindings:
      input:
        consumer:
          enableDlq: true
          dlqName: dead-topic
          autoCommitOnError: true
          autoCommitOffset: true
          republish-to-dlq: true
          dlqProducerProperties:
            configuration:
              key.serializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
              value.serializer: org.apache.kafka.common.serialization.ByteArrayDeserializer

There is a similar question here, "Kafka Consumer - ClassCastException - java", but it did not help much.

Do I need to write my own ListSerde as in the question "Issue with ArrayList Serde in Kafka Streams API", or should I wait for the standard ListSerde PR to be merged?

DLQ is not currently supported for batch listeners; please open an issue against spring-cloud-stream-binder-kafka on GitHub.

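Bear in mind, though, that the framework does not know which record in the batch failed, so every record in the batch would go to the DLQ, even those that were processed successfully.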

For batch mode, it is generally better to handle errors in the listener itself.
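As a rough illustration of handling errors inside the listener, the sketch below processes each record of a batch on its own and hands only the failing records to a dead-letter sink, so one bad record does not drag the whole batch with it. It uses plain Java only; `BatchErrorHandlingSketch`, `processBatch`, and `deadLetterSink` are hypothetical names invented for this example, not part of Spring Cloud Stream or the Kafka binder. In a real application the sink would be whatever producer you already use to publish to `dead-topic`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class BatchErrorHandlingSketch {

    /**
     * Processes each record on its own. A record whose processing throws is
     * passed to deadLetterSink together with the exception, and the loop
     * continues with the remaining records. Returns the number of failures.
     */
    public static <T> int processBatch(List<T> batch,
                                       Consumer<T> processor,
                                       BiConsumer<T, Exception> deadLetterSink) {
        int failures = 0;
        for (T record : batch) {
            try {
                processor.accept(record);
            } catch (Exception e) {
                // Assumption: the sink publishes the record to the DLQ topic.
                deadLetterSink.accept(record, e);
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> batch = List.of(
                Map.of("id", 1), Map.of("id", 2), Map.of("id", 3));
        // Stand-in for the DLQ: collect failed records in memory.
        List<Map<String, Object>> deadLetters = new ArrayList<>();

        int failed = processBatch(
                batch,
                record -> {
                    // Simulate a processing failure for one record only.
                    if (record.get("id").equals(2)) {
                        throw new IllegalStateException("forced");
                    }
                },
                (record, e) -> deadLetters.add(record));

        System.out.println(failed + " failed: " + deadLetters);
    }
}
```

Inside the `Consumer<List<Map<String, Object>>>` bean from the question, the lambda body could call a helper like this instead of rethrowing, so the batch as a whole still completes and only the records that actually failed are republished.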
