Listener method could not be invoked with the incoming message, and Backoff none exhausted for ConsumerRecord



Below is the method I defined for my Kafka listener. I think I get the following error when the payload received is null or an empty string. Can you help?

@KafkaListener(topics = "${kafka.consumer-topic-name.reservation}", groupId = "${kafka.consumer-group-id.test}",
containerFactory = "kafkaListenerContainerFactory",autoStartup = "${kafka.auto-start.consumer.tets}")
public void consumeReservation(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String kafkaKey) {}
[org.springframework.kafka.KafkaListenerEndpointContainer #0-0-C-1] ERROR o.s.k.l.SeekToCurrentErrorHandler - Backoff none exhausted for ConsumerRecord(topic = test_topic, partition = 0, leaderEpoch = 2, offset = 453473, CreateTime = 1601962346576, serialized key size = 41, serialized value size = -1, headers = RecordHeaders(headers = [RecordHeader(key = OPERATION, value = [68, 69, 76, 69, 84, 69]), RecordHeader(key = __Key_TypeId__, value = [99, 108, 75, 101, 121])], isReadOnly = false), key = {
"orgId": "1",
"orderId": "U4000024004"}, value = null)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.demo.test.analytics.testanalytics.consumer.FLCreservationKafkaConsumer.consumeReservation(java.lang.String,java.lang.String,java.lang.String)]
Bean [com.demo.test.analytics.testanalytics.consumer.FLCreservationKafkaConsumer@731702d1]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.consumer.FLCreservationKafkaConsumer.consumeReservation(java.lang.String,java.lang.String,java.lang.String): 1 error(s): [Error in object 'payload': codes []; arguments []; default message [Payload value must not be empty]], failedMessage=GenericMessage [payload=org.springframework.kafka.support.KafkaNull@2d99561c, headers={__Key_TypeId__=[B@19a2dc5f, kafka_offset=453473, OPERATION=[B@7d75c01a, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@363f44ef, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey={"orgId": "1", "orderId": "U4000024004"}, kafka_receivedTopic=test_1order, kafka_receivedTimestamp=1601962346576, kafka_groupId=reservation_group_id}]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.consumer.FLCreservationKafkaConsumer.consumeReservation(java.lang.String,java.lang.String,java.lang.String): 1 error(s): [Error in object 'payload': codes []; arguments []; default message [Payload value must not be empty]], failedMessage=GenericMessage [payload=org.springframework.kafka.support.KafkaNull@2d99561c, headers={__Key_TypeId__=[B@19a2dc5f, kafka_offset=453473, OPERATION=[B@7d75c01a, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@363f44ef, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey={"orgId": "1", "orderId": "U4000024004"}, kafka_receivedTopic=test_1order, kafka_receivedTimestamp=1601962346576, kafka_groupId=reservation_group_id}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1925)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1913)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1812)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1739)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1636)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1366)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1082)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:990)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.consumer.FLCreservationKafkaConsumer.consumeReservation(java.lang.String,java.lang.String,java.lang.String): 1 error(s): [Error in object 'payload': codes []; arguments []; default message [Payload value must not be empty]]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:122)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:901)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1880)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1862)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1799)
... 8 common frames omitted

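The record has a null value (the trace shows the payload as KafkaNull), so you need to specify that the payload is not required.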

@Payload(required = false) String payload, ...
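Once the payload is optional, the listener body has to cope with `payload == null`. The following is a minimal plain-Java sketch of that branch, not the asker's actual code: the class `TombstoneDemo`, the `Action` enum and the `decide` method are hypothetical names introduced for illustration, and treating a null payload as a delete is an assumption based on the `OPERATION` header in the log from the question, whose bytes decode to `DELETE`.

```java
import java.nio.charset.StandardCharsets;

public class TombstoneDemo {

    /** Hypothetical outcome type, used only for this illustration. */
    public enum Action { DELETE, UPSERT }

    // In the real listener the signature would look roughly like:
    //   public void consumeReservation(@Payload(required = false) String payload,
    //           @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    //           @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String kafkaKey)
    // and its body could delegate to a method like this one.
    public static Action decide(String payload) {
        // Assumption: a null or empty payload is a tombstone, so delete by key.
        if (payload == null || payload.isEmpty()) {
            return Action.DELETE;
        }
        return Action.UPSERT;
    }

    public static void main(String[] args) {
        // OPERATION header bytes copied from the log line in the question.
        byte[] operation = {68, 69, 76, 69, 84, 69};
        System.out.println(new String(operation, StandardCharsets.UTF_8)); // prints "DELETE"

        System.out.println(decide(null));            // prints "DELETE"
        System.out.println(decide("{\"qty\": 1}")); // prints "UPSERT"
    }
}
```

Keeping the null check in one small method makes it easy to unit test without starting a Kafka container.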

I ran into the same problem; here are some solutions I found.

  1. In the YAML configuration, set spring.kafka.listener.ack-mode to manual or manual_immediate.

  2. If that does not work, try adding a containerFactory attribute to the @KafkaListener annotation, for example @KafkaListener(topics = {"xxxx"}, groupId = "xxxx", containerFactory = "batchFactory"), and create a bean with the same name.

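    @Bean
    public KafkaListenerContainerFactory<?> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // batchConsumerConfigs() is assumed to be defined elsewhere in this configuration class
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(batchConsumerConfigs()));
        // Deliver records to the listener in batches rather than one at a time
        factory.setBatchListener(true);
        // Offsets are committed only after the listener acknowledges them
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }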
    
