春季云流+生产者错误后,对IN频道的订阅中断



我们使用的是弹簧云流绑定器kafka 3.1.1版本。希望处理断断续续的卡夫卡出版失败。

以下是我们现有的配置,

spring:
cloud:
function:
definition: execute;supplyPayment;
routing-expression:
stream:
kafka:
default:
producer:
errorChannelEnabled: true
binder:
brokers: localhost:9094
producer-properties:
retries: 20
max.in.flight.requests.per.connection: 10
linger.ms: 100
request.timeout.ms: 3000
batch.size: 20000
retry.backoff.ms: 1000
bindings:
execute-in-0:
destination: test-in-source
group: consumer-dedupe
concurrency: 2
execute-out-0:
destination: test-out-topic
producer:
error-channel-enabled: true

能够使用ServiceActivator获取生产者错误。

@ServiceActivator(inputChannel = "errorChannel")
public void handle(final ErrorMessage em) {
log.error("Error caught" + em.toString());
}

由于我们已经实现了ServiceActivator,我们预计不会引发异常,但我们看到抛出了TimeoutException的

org.apache.kafka.common.errors.TimeoutException: Topic test-out-topic not present in metadata after 60000 ms.
2021-12-09|06:11:25.267 ERROR o.s.c.s.f.FunctionConfiguration$FunctionToDestinationBinder - intuit_tid= provider_id= entity_type= Failure was detected during execution of the reactive function 'execute'
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@48ea8766]; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic test-out-topic not present in metadata after 60000 ms., failedMessage=GenericMessage [payload=byte[1046], headers={intuit_originatingip=192.19.0.1, entity_version=v1, entity_processing_status=SUCCESS, PIPELINE_ID=1, target-protocol=kafka, ROUTING_INFO=RoutingInfo(currentPipelineNode=PipelineNode{id=0, type=null, sourceTopic=null, pipelineNodeStatus=null, errorHandlerClassName=null, errorTopicName=null}, executedPipelineNodeList=[PipelineNode{id=1, type=PROCESSOR, sourceTopic=idx-payment-source, pipelineNodeStatus=SUCCESS, errorHandlerClassName=com.intuit.fdp.enrichment.messagemesh.errorhandler.impl.ContinueProcessingErrorHandler, errorTopicName=null}, PipelineNode{id=2, type=PROCESSOR, sourceTopic=idx-payment-source, pipelineNodeStatus=SUCCESS, errorHandlerClassName=com.intuit.fdp.enrichment.messagemesh.errorhandler.impl.ContinueProcessingErrorHandler, errorTopicName=null}]), id=d9be7b62-711d-2200-2da7-775bc9ed4bbd, contentType=application/json, kafka_receivedTimestamp=1639059024747, intuit_realmid=123148525896109, intuit_iddomain=test, timestamp=1639059025243, test=testValue, intuit_tid=978bd234-48cc-4914-8afc-9c84be5c467e, intuit_offeringid=Intuit.platform.qbo.dtx.ui, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, intuit_locale=en_US, kafka_receivedTopic=idx-payment-source, kafka_offset=2, entity_type=PaymentTransactionEvent, entity_namespace=com.intuit.idx.event.payment.v1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@597cf885, intuit_country=US, provider_id=1234, intuit_corpid=Jacinda, kafka_receivedPartitionId=0, kafka_groupId=consumer-dedupe}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxPeekFuseable] :
reactor.core.publisher.Flux.doOnNext
org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$bindFunctionToDestinations$9(FunctionConfiguration.java:501)
Error has been observed at the following site(s):
|_ Flux.doOnNext ⇢ at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$bindFunctionToDestinations$9(FunctionConfiguration.java:501)
Stack trace:
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1041)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$null$7(FunctionConfiguration.java:518)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:542)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:1006)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.MonoMetricsFuseable$MetricsFuseableSubscriber.onNext(MonoMetricsFuseable.java:132)
at 
---
Caused by: org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic test-out-topic not present in metadata after 60000 ms.

errorHandler也在记录生产者的错误,

ErrorMessage [payload=org.springframework.integration.kafka.support.KafkaSendFailureException: nested exception is org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-out-topic-0:120000 ms has passed since batch creation, failedMessage=GenericMessage [payload=byte[1044], headers={intuit_originatingip=192.19.0.1, entity_version=v1, entity_processing_status=SUCCESS, PIPELINE_ID=1, target-protocol=kafka, ROUTING_INFO=RoutingInfo(currentPipelineNode=PipelineNode{id=0, type=null, sourceTopic=null, pipelineNodeStatus=null, errorHandlerClassName=null, errorTopicName=null}, executedPipelineNodeList=[PipelineNode{id=1, type=PROCESSOR, sourceTopic=idx-payment-source, pipelineNodeStatus=SUCCESS, errorHandlerClassName=com.intuit.fdp.enrichment.messagemesh.errorhandler.impl.ContinueProcessingErrorHandler, errorTopicName=null}, PipelineNode{id=2, type=PROCESSOR, sourceTopic=idx-payment-source, pipelineNodeStatus=SUCCESS, errorHandlerClassName=com.intuit.fdp.enrichment.messagemesh.errorhandler.impl.ContinueProcessingErrorHandler, errorTopicName=null}]), id=5ea18c46-1074-41bd-8ac6-adfc1aba7811, contentType=application/json, kafka_receivedTimestamp=1639058994738, intuit_realmid=123148525896109, intuit_iddomain=test, timestamp=1639058995246, test=testValue, intuit_tid=0a19feb2-e90c-4a3d-b9ef-94294bbd5ebd, intuit_offeringid=Intuit.platform.qbo.dtx.ui, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, intuit_locale=en_US, kafka_receivedTopic=idx-payment-source, kafka_offset=1, entity_type=PaymentTransactionEvent, entity_namespace=com.intuit.idx.event.payment.v1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@597cf885, intuit_country=US, provider_id=1234, intuit_corpid=Angela, kafka_receivedPartitionId=0, kafka_groupId=consumer-dedupe}] [record=ProducerRecord(topic=test-out-topic, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = intuit_originatingip, value = [49, 57, 50, 46, 49, 57, 46, 48, 46, 49]), RecordHeader(key = entity_version, value = [118, 49]), RecordHeader(key = entity_processing_status, value = [83, 85, 67, 67, 69, 83, 83]), RecordHeader(key = PIPELINE_ID, value = [49]), RecordHeader(key = target-protocol, value = [107, 97, 102, 107, 97]), RecordHeader(key = ROUTING_INFO, value = [123, 34, 99, 117, 114, 114, 101, 110, 116, 80, 105, 112, 101, 108, 105, 110, 101, 78, 111, 100, 101, 34, 58, 123, 34, 105, 100, 34, 58, 48, 44, 34, 103, 114, 111, 117, 112, 73, 100, 34, 58, 110, 117, 108, 108, 44, 34, 97, 114, 116, 105, 102, 97, 99, 116, 73, 100, 34, 58, 110, 117, 108, 108, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 110, 117, 108, 108, 44, 34, 116, 121, 112, 101, 34, 58, 110, 117, 108, 108, 44, 34, 114, 117, 110, 116, 105, 109, 101, 73, 100, 34, 58, 110, 117, 108, 108, 44, 34, 101, 114, 114, 111, 114, 72, 97, 110, 100, 108, 101, 114, 115, 34, 58, 91, 93, 44, 34, 115, 111, 117, 114, 99, 101, 84, 111, 112, 105, 99, 34, 58, 110, 117, 108, 108, 44, 34, 112, 105, 112, 101, 108, 105, 110, 101, 78, 111, 100, 101, 83, 116, 97, 116, 117, 115, 34, 58, 110, 117, 108, 108, 44, 34, 101, 114, 114, 111, 114, 72, 97, 110, 100, 108, 101, 114, 67, 108, 97, 115, 115, 78, 97, 109, 101, 34, 58, 110, 117, 108, 108, 44, 34, 101, 114, 114, 111, 114, 84, 111, 112, 105, 99, 78, 97, 109, 101, 34, 58, 110, 117, 108, 108, 125, 44, 34, 101, 120, 101, 99, 117, 116, 101, 100, 80, 105, 112, 101, 108, 105, 110, 101, 78, 111, 100, 101, 76, 105, 115, 116, 34, 58, 91, 123, 34, 105, 100, 34, 58, 49, 44, 34, 103, 114, 111, 117, 112, 73, 100, 34, 58, 34, 99, 111, 109, 46, 105, 110, 116, 117, 105, 116, 46, 109, 101, 115, 115, 97, 103, 101, 109, 101, 115, 104, 34, 44, 34, 97, 114, 116, 105, 102, 97, 99, 116, 73, 100, 34, 58, 34, 100, 101, 100, 117, 112, 101, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 34, 48, 46, 49, 46, 49, 57, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 80, 82, 79, 67, 69, 83, 83, 79, 82, 34, 44, 34, 114, 117, 110, 116, 105, 109, 101, 73, 100, 34, 58, 34, 105, 100, 120, 45, 100, 101, 100, 117, 112, 101, 34, 44, 34, 101, 114, 114, 111, 114, 72, 97, 110, 100, 108, 101, 114, 115, 34, 58, 91, 93, 44, 34, 115, 111, 117, 114, 99, 101, 84, 111, 112, 105, 99, 34, 58, 34, 105, 100, 120, 45, 112, 97, 121, 109, 101, 110, 116, 45, 115, 111, 117, 114, 99, 101, 34, 44, 34, 112, 105, 112, 101, 108, 105, 110, 101, 78, 111, 100, 101, 83, 116, 97, 116, 117, 115, 34, 58, 34, 83, 85, 67, 67, 69, 83, 83, 34, 44, 34, 101, 114, 114, 111, 114, 72, 97, 110, 100, 108, 101, 114, 67, 108, 97, 115, 115, 78, 97, 109, 101, 34, 58, 34, 99, 111, 109, 46, 105, 110, 116, 117, 105, 116, 46, 102, 100, 112, 46, 101, 110, 114, 105, 99, 104, 109, 101, 110, 116, 46, 109, 101, 115, 115, 97, 103, 101, 109, 101, 115, 104, 46, 101, 114, 114, 111, 114, 104, 97, 110, 100, 108, 101, 114, 46, 105, 109, 112, 108, 46, 67, 111, 110, 116, 105, 110, 117, 101, 80, 114, 111, 99, 101, 115, 115, 105, 110, 103, 69, 114, 114, 111, 114, 72, 97, 110, 100, 108, 101, 114, 34, 44, 34, 101, 114, 114, 111, 114, 84, 111, 112, 105, 99, 78, 97, 109, 101, 34, 58, 110, 117, 108, 108, 125, 44, 123, 34, 105, 100, 34, 58, 50, 44, 34, 103, 114, 111, 117, 112, 73, 100, 34, 58, 34, 99, 111, 109, 46, 105, 110, 116, 117, 105, 116, 46, 109, 101, 115, 115, 97, 103, 101, 109, 101, 115, 104, 34, 44, 34, 97, 114, 116, 105, 102, 97, 99, 116, 73, 100, 34, 58, 34, 112, 101, 114, 115, 105, 115, 116, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 34, 48, 46, 49, 46, 49, 57, 34, 44, 34, 116, 121, 112, 101, 34, 58, 34, 80, 82, 79, 67, 69, 83, 83, 79, 82, 34, 44, 34, 114, 117, 110, 116, 105, 109, 101, 73, 100, 34, 58, 34, 105, 100, 120, 45, 100, 101, 100, 117, 112, 101, 34, 44, 34, 101, 114, 114, 111, 114, 72, 97, 110, 100, 108, 101, 114, 115, 34, 58, 91, 123, 34, 101, 114, 114, 111, 114, 67, 111, 100, 101, 34, 58, 34, 52, 48, 48, 34, 44, 34, 101, 114, 114, 111, 114, 72, 97, 110, 100, 108, 101, 114, 67, 108, 97, 115, 115, 78, 97, 109, 101, 34, 58, 34, 99, 111, 109, 46, 105, 110, 116, 117, 105, 116, 46, 102, 100, 112, 46, 101, 110, 114, 105, 99, 104, 109, 101, 110, 116, 46, 109, 101, 115, 115, 97, 103, 101, 109, 101, 115, 104, 46, 101, 114, 114, 111, 114, 104, 97, 110, 100, 108, 101, 114, 46, 105, 109, 112, 108, 46, 80, 117, 98, 108, 105, 115, 104, 84, 111, 69, 114, 114, 111, 114, 84, 111, 112, 105, 99, 69, 114, 114, 111, 114, 72, 97, 110, 100, 108, 101, 114, 34, 44, 34, 101, 114, 114, 111, 114, 84, 111, 112, 105, 99, 78, 97, 109, 101, 34, 58, 34, 105, 100, 120, 45, 101, 114, 114, 111, 114, 45, 116, 111, 112, 105, 99, 34, 125, 44, 123, 34, 101, 114, 114, 111, 114, 67, 111, 100, 101, 34, 58, 34, 42, 34, 44, 34, 101, 114, 114, 111, 114, 72, 97, 110, 100, 108, 101, 114, 67, 108, 97, 115, 115, 78, 97, 109, 101, 34, 58, 34, 99, 111, 109, 46, 105, 110, 116, 117, 105, 116, 46, 102, 100, 112, 46, 101, 110, 114, 105, 99, 104, 109, 101, 110, 116, 46, 109, 101, 115, 115, 97, 103, 101, 109, 101, 115, 104, 46, 101, 114, 114, 111, 114, 104, 97, 110, 100, 108, 101, 114, 46, 105, 109, 112, 108, 46, 83, 116, 111, 112, 80, 114, 111, 99, 101, 115, 115, 105, 110, 103, 69, 114, 114, 111, 114, 72, 97, 110, 100, 108, 101, 114, 34, 44, 34, 101, 114, 114, 111, 114, 84, 111, 112, 105, 99, 78, 97, 109, 101, 34, 58, 110, 117, 108, 108, 125, 93, 44, 34, 115, 111, 117, 114, 99, 101, 84, 111, 112, 105, 99, 34, 58, 34, 105, 100, 120, 45, 112, 97, 121, 109, 101, 110, 116, 45, 115, 111, 117, 114, 99, 101, 34, 44, 34, 112, 105, 112, 101, 108, 105, 110, 101, 78, 111, 100, 101, 83, 116, 97, 116, 117, 115, 34, 58, 34, 83, 85, 67, 67, 69, 83, 83, 34, 44, 34, 101, 114, 114, 111, 114, 72, 97, 110, 100, 108, 101, 114, 67, 108, 97, 115, 115, 78, 97, 109, 101, 34, 58, 34, 99, 111, 109, 46, 105, 110, 116, 117, 105, 116, 46, 102, 100, 112, 46, 101, 110, 114, 105, 99, 104, 109, 101, 110, 116, 46, 109, 101, 115, 115, 97, 103, 101, 109, 101, 115, 104, 46, 101, 114, 114, 111, 114, 104, 97, 110, 100, 108, 101, 114, 46, 105, 109, 112, 108, 46, 67, 111, 110, 116, 105, 110, 117, 101, 80, 114, 111, 99, 101, 115, 115, 105, 110, 103, 69, 114, 114, 111, 114, 72, 97, 110, 100, 108, 101, 114, 34, 44, 34, 101, 114, 114, 111, 114, 84, 111, 112, 105, 99, 78, 97, 109, 101, 34, 58, 110, 117, 108, 108, 125, 93, 125]), RecordHeader(key = contentType, value = [97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110]), RecordHeader(key = intuit_realmid, value = [49, 50, 51, 49, 52, 56, 53, 50, 53, 56, 57, 54, 49, 48, 57]), RecordHeader(key = intuit_iddomain, value = [116, 101, 115, 116]), RecordHeader(key = test, value = [116, 101, 115, 116, 86, 97, 108, 117, 101]), RecordHeader(key = intuit_tid, value = [48, 97, 49, 57, 102, 101, 98, 50, 45, 101, 57, 48, 99, 45, 52, 97, 51, 100, 45, 98, 57, 101, 102, 45, 57, 52, 50, 57, 52, 98, 98, 100, 53, 101, 98, 100]), RecordHeader(key = intuit_offeringid, value = [73, 110, 116, 117, 105, 116, 46, 112, 108, 97, 116, 102, 111, 114, 109, 46, 113, 98, 111, 46, 100, 116, 120, 46, 117, 105]), RecordHeader(key = intuit_locale, value = [101, 110, 95, 85, 83]), RecordHeader(key = entity_type, value = [80, 97, 121, 109, 101, 110, 116, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 69, 118, 101, 110, 116]), RecordHeader(key = entity_namespace, value = [99, 111, 109, 46, 105, 110, 116, 117, 105, 116, 46, 105, 100, 120, 46, 101, 118, 101, 110, 116, 46, 112, 97, 121, 109, 101, 110, 116, 46, 118, 49]), RecordHeader(key = intuit_country, value = [85, 83]), RecordHeader(key = provider_id, value = [49, 50, 51, 52]), RecordHeader(key = intuit_corpid, value = [65, 110, 103, 101, 108, 97]), RecordHeader(key = spring_json_header_types, value = [123, 34, 105, 110, 116, 117, 105, 116, 95, 111, 114, 105, 103, 105, 110, 97, 116, 105, 110, 103, 105, 112, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 101, 110, 116, 105, 116, 121, 95, 118, 101, 114, 115, 105, 111, 110, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 116, 101, 115, 116, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 116, 105, 100, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 111, 102, 102, 101, 114, 105, 110, 103, 105, 100, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 100, 101, 108, 105, 118, 101, 114, 121, 65, 116, 116, 101, 109, 112, 116, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 101, 110, 116, 105, 116, 121, 95, 112, 114, 111, 99, 101, 115, 115, 105, 110, 103, 95, 115, 116, 97, 116, 117, 115, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 80, 73, 80, 69, 76, 73, 78, 69, 95, 73, 68, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 108, 111, 99, 97, 108, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 116, 97, 114, 103, 101, 116, 45, 112, 114, 111, 116, 111, 99, 111, 108, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 101, 110, 116, 105, 116, 121, 95, 116, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 101, 110, 116, 105, 116, 121, 95, 110, 97, 109, 101, 115, 112, 97, 99, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 82, 79, 85, 84, 73, 78, 71, 95, 73, 78, 70, 79, 34, 58, 34, 99, 111, 109, 46, 105, 110, 116, 117, 105, 116, 46, 102, 100, 112, 46, 101, 110, 114, 105, 99, 104, 109, 101, 110, 116, 46, 112, 105, 112, 101, 108, 105, 110, 101, 46, 112, 114, 111, 99, 101, 115, 115, 111, 114, 115, 46, 82, 111, 117, 116, 105, 110, 103, 73, 110, 102, 111, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 99, 111, 117, 110, 116, 114, 121, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 112, 114, 111, 118, 105, 100, 101, 114, 95, 105, 100, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 99, 111, 114, 112, 105, 100, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 114, 101, 97, 108, 109, 105, 100, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 105, 100, 100, 111, 109, 97, 105, 110, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = true), key=null, value=[B@dfd024f, timestamp=null)], headers={id=1823c929-83f7-4b15-358f-821b3ae843b4, timestamp=1639059115250}]

看起来,由于引发了Timeout异常,通道内的订阅者丢失,并且看到下面由errorChannel、记录的

ErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.execute-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[761], headers={intuit_originatingip=192.19.0.1, entity_version=[B@24abc8d8, test=[B@677453b9, intuit_tid=[B@20a8288, intuit_offeringid=[B@3feabc33, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, intuit_locale=en_US, kafka_receivedTopic=idx-payment-source, kafka_offset=5, entity_type=[B@45fd5e1e, entity_namespace=[B@513483, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@597cf885, intuit_country=US, provider_id=[B@5619f8c3, intuit_corpid=[B@3c78cd17, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1639059114780, kafka_groupId=consumer-dedupe, intuit_realmid=[B@135a7928, intuit_iddomain=test}], failedMessage=GenericMessage [payload=byte[761], headers={intuit_originatingip=192.19.0.1, entity_version=[B@24abc8d8, test=[B@677453b9, intuit_tid=[B@20a8288, intuit_offeringid=[B@3feabc33, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, intuit_locale=en_US, kafka_receivedTopic=idx-payment-source, kafka_offset=5, entity_type=[B@45fd5e1e, entity_namespace=[B@513483, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@597cf885, intuit_country=US, provider_id=[B@5619f8c3, intuit_corpid=[B@3c78cd17, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1639059114780, kafka_groupId=consumer-dedupe, intuit_realmid=[B@135a7928, intuit_iddomain=test}], headers={kafka_data=ConsumerRecord(topic = idx-payment-source, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1639059114780, serialized key size = -1, serialized value size = 1186, headers = RecordHeaders(headers = [RecordHeader(key = intuit_originatingip, value = [49, 57, 50, 46, 49, 57, 46, 48, 46, 49]), RecordHeader(key = entity_version, value = [118, 49]), RecordHeader(key = test, value = [116, 101, 115, 116, 86, 97, 108, 117, 101]), RecordHeader(key = intuit_tid, value = [56, 49, 55, 53, 52, 55, 51, 54, 45, 101, 102, 97, 55, 45, 52, 102, 53, 48, 45, 98, 101, 101, 57, 45, 102, 50, 54, 48, 48, 56, 101, 49, 53, 48, 101, 48]), RecordHeader(key = intuit_offeringid, value = [73, 110, 116, 117, 105, 116, 46, 112, 108, 97, 116, 102, 111, 114, 109, 46, 113, 98, 111, 46, 100, 116, 120, 46, 117, 105]), RecordHeader(key = intuit_locale, value = [101, 110, 95, 85, 83]), RecordHeader(key = entity_type, value = [80, 97, 121, 109, 101, 110, 116, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 69, 118, 101, 110, 116]), RecordHeader(key = entity_namespace, value = [99, 111, 109, 46, 105, 110, 116, 117, 105, 116, 46, 105, 100, 120, 46, 101, 118, 101, 110, 116, 46, 112, 97, 121, 109, 101, 110, 116, 46, 118, 49]), RecordHeader(key = intuit_country, value = [85, 83]), RecordHeader(key = provider_id, value = [84, 101, 115, 116, 80, 114, 111, 118, 49, 50, 51, 52]), RecordHeader(key = intuit_corpid, value = [74, 97, 99, 105, 110, 100, 97]), RecordHeader(key = contentType, value = [97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110]), RecordHeader(key = intuit_realmid, value = [49, 50, 51, 49, 52, 56, 53, 50, 53, 56, 57, 54, 49, 48, 57]), RecordHeader(key = intuit_iddomain, value = [116, 101, 115, 116]), RecordHeader(key = spring_json_header_types, value = [123, 34, 105, 110, 116, 117, 105, 116, 95, 111, 114, 105, 103, 105, 110, 97, 116, 105, 110, 103, 105, 112, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 99, 111, 117, 110, 116, 114, 121, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 108, 111, 99, 97, 108, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 105, 100, 100, 111, 109, 97, 105, 110, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = [B@3609e9b), id=99a23404-032b-363c-ae7a-2e5f6cd83ef8, sourceData=ConsumerRecord(topic = idx-payment-source, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1639059114780, serialized key size = -1, serialized value size = 1186, headers = RecordHeaders(headers = [RecordHeader(key = intuit_originatingip, value = [49, 57, 50, 46, 49, 57, 46, 48, 46, 49]), RecordHeader(key = entity_version, value = [118, 49]), RecordHeader(key = test, value = [116, 101, 115, 116, 86, 97, 108, 117, 101]), RecordHeader(key = intuit_tid, value = [56, 49, 55, 53, 52, 55, 51, 54, 45, 101, 102, 97, 55, 45, 52, 102, 53, 48, 45, 98, 101, 101, 57, 45, 102, 50, 54, 48, 48, 56, 101, 49, 53, 48, 101, 48]), RecordHeader(key = intuit_offeringid, value = [73, 110, 116, 117, 105, 116, 46, 112, 108, 97, 116, 102, 111, 114, 109, 46, 113, 98, 111, 46, 100, 116, 120, 46, 117, 105]), RecordHeader(key = intuit_locale, value = [101, 110, 95, 85, 83]), RecordHeader(key = entity_type, value = [80, 97, 121, 109, 101, 110, 116, 84, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 69, 118, 101, 110, 116]), RecordHeader(key = entity_namespace, value = [99, 111, 109, 46, 105, 110, 116, 117, 105, 116, 46, 105, 100, 120, 46, 101, 118, 101, 110, 116, 46, 112, 97, 121, 109, 101, 110, 116, 46, 118, 49]), RecordHeader(key = intuit_country, value = [85, 83]), RecordHeader(key = provider_id, value = [84, 101, 115, 116, 80, 114, 111, 118, 49, 50, 51, 52]), RecordHeader(key = intuit_corpid, value = [74, 97, 99, 105, 110, 100, 97]), RecordHeader(key = contentType, value = [97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110]), RecordHeader(key = intuit_realmid, value = [49, 50, 51, 49, 52, 56, 53, 50, 53, 56, 57, 54, 49, 48, 57]), RecordHeader(key = intuit_iddomain, value = [116, 101, 115, 116]), RecordHeader(key = spring_json_header_types, value = [123, 34, 105, 110, 116, 117, 105, 116, 95, 111, 114, 105, 103, 105, 110, 97, 116, 105, 110, 103, 105, 112, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 99, 111, 117, 110, 116, 114, 121, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 108, 111, 99, 97, 108, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 44, 34, 105, 110, 116, 117, 105, 116, 95, 105, 100, 100, 111, 109, 97, 105, 110, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = [B@3609e9b), timestamp=1639059117896}] for original GenericMessage [payload=byte[761], headers={intuit_originatingip=192.19.0.1, entity_version=[B@24abc8d8, test=[B@677453b9, intuit_tid=[B@20a8288, intuit_offeringid=[B@3feabc33, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, intuit_locale=en_US, kafka_receivedTopic=idx-payment-source, kafka_offset=5, entity_type=[B@45fd5e1e, entity_namespace=[B@513483, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@597cf885, intuit_country=US, provider_id=[B@5619f8c3, intuit_corpid=[B@3c78cd17, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1639059114780, kafka_groupId=consumer-dedupe, intuit_realmid=[B@135a7928, intuit_iddomain=test}]

一旦发生这种情况,即使kafka问题得到解决,应用程序也无法恢复,除非重新启动。

以下是示例项目,https://github.com/ssruthisree/reactive-processor-kafka聚合是我正在发布的函数。要复制已完成的错误步骤:1(启动应用程序2(验证聚合使用并发布到转换主题的消息。3( 删除转换主题4(Producer引发的异常未被Flux的onError捕获。

我们如何避免生产者错误导致订阅失败?在没有重新启动应用程序的情况下解决kafka问题后继续消费?

这意味着您在尝试使用为命令式函数设计的错误处理机制时,有一个

反应式您的错误Dispatcher has no subscribers . . .只是意味着您的反应订阅失败(由于某些处理错误(并取消订阅频道(流不再起作用(。

最新更新