Spring Cloud Stream+Kafka Dynamic Destination+错误处理



我们在Spring cloud Stream上有一个与Project Reactor集成的应用程序。我们通过在Message头中设置spring.cloud.stream.sendto.destination来动态设置目标主题,并发布消息。我们正在寻求处理错误场景,如kafka服务器间歇性关闭或发布时主题不可用。我们已经实现了@ServiceActivator来处理所有错误。当动态设置主题时,ServiceActivator不会捕捉到生产者错误,只有消费者错误会影响ServiceActivator。如果我们提前设置out.destination主题,那么生产者错误也会被捕获。有没有办法为动态目的地启用errorChannel?下面的示例代码,application.yml

spring:
cloud:
function:
definition: generate_flux;process
stream:
kafka:
binder:
brokers: localhost:9094
required-acks: all
producer-properties:
retries: 2
bindings:
generate_flux-out-0:
destination: source
process-in-0:
destination: source
group: processors
process-out-0:
producer:
error-channel-enabled: true

在应用程序类中,具有动态目的地主题集的函数,

@Bean
public Function<Flux<Message<String>>, Flux<Message<String>>> process() {
return (msg) -> {
return msg.flatMap(
message -> {
log.info("individual msg " + message);
Map<String, Object> headers = new HashMap<>();
headers.put("spring.cloud.stream.sendto.destination", "processed");
MessageHeaders mh = new MessageHeaders(headers);
return Mono.just(MessageBuilder.createMessage("vvvv processed ", mh));
});
};
}

服务激活器:

@ServiceActivator(inputChannel = "errorChannel")
public void handle(final ErrorMessage em) {
log.error("Error caught" + em.toString());
}

启动spring-boot应用程序,确保消息被消费和发布,然后删除主题";处理过的";以创建失败场景。

通过以上,生产者出现超时异常,但Service Activator错误未捕捉到通道:

Caused by: org.apache.kafka.common.errors.TimeoutException: Topic processed not present in metadata after 60000 ms.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@f258f76]; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic processed not present in metadata after 60000 ms., failedMessage=GenericMessage [payload=byte[15], headers={id=13dc2799-45c5-0c10-2e62-4e296f62d1dc, spring.cloud.stream.sendto.destination=processed, contentType=application/json, timestamp=1620856125106}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxPeekFuseable] :
reactor.core.publisher.Flux.doOnNext
org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$bindFunctionToDestinations$9(FunctionConfiguration.java:501)
Error has been observed at the following site(s):
|_ Flux.doOnNext ⇢ at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$bindFunctionToDestinations$9(FunctionConfiguration.java:501)
Stack trace:
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1041)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$null$7(FunctionConfiguration.java:512)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:487)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:420)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
at reactor.core.publisher.UnicastManySinkNoBackpressure.tryEmitNext(UnicastManySinkNoBackpressure.java:119)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:141)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:396)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:78)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:455)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:429)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2069)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2051)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1988)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1928)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1814)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic processed not present in metadata after 60000 ms.
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:574)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:389)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:517)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1041)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.lambda$null$7(FunctionConfiguration.java:512)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:487)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:420)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:387)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
at reactor.core.publisher.UnicastManySinkNoBackpressure.tryEmitNext(UnicastManySinkNoBackpressure.java:119)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:97)
at org.springframework.integration.util.IntegrationReactiveUtils.lambda$null$8(IntegrationReactiveUtils.java:141)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:396)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:78)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:455)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:429)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2069)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2051)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1988)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1928)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1814)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1531)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1178)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:829)

但服务激活器只显示与消费者相关的错误,

ERROR c.i.f.e.d.DedupeProcessorApplication - intuit_tid= Error caughtErrorMessage [payload=org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.process-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[43], headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7da2cc8f, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=source, kafka_receivedTimestamp=1620856185102, kafka_groupId=processors}], failedMessage=GenericMessage [payload=byte[43], headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7da2cc8f, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=source, kafka_receivedTimestamp=1620856185102, kafka_groupId=processors}], headers={kafka_data=ConsumerRecord(topic = source, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1620856185102, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [RecordHeader(key = contentType, value = [97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110]), RecordHeader(key = spring_json_header_types, value = [123, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = [B@4cb51739), id=a2fef876-a0ab-c825-9ee6-50b355044e51, sourceData=ConsumerRecord(topic = source, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1620856185102, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [RecordHeader(key = contentType, value = [97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110]), RecordHeader(key = spring_json_header_types, value = [123, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = false), key = null, value = [B@4cb51739), timestamp=1620856206192}] for original GenericMessage [payload=byte[43], headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@7da2cc8f, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=source, kafka_receivedTimestamp=1620856185102, kafka_groupId=processors}]

只需将主题更改为非动态,即从Funtion 中删除以下行

// headers.put("spring.cloud.stream.sendto.destination", "processed");

并将目的地主题添加到out配置

process-out-0:
destination: processed
producer:
error-channel-enabled: true

能够在ServiceActivator处获取生产者错误。在下方登录

Error caughtErrorMessage [payload=org.springframework.integration.kafka.support.KafkaSendFailureException: nested exception is org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for processed-0:120000 ms has passed since batch creation, failedMessage=GenericMessage [payload=byte[15], headers={contentType=application/json, id=0ba150e5-8e6f-81f1-b9c4-cda115171ce0, timestamp=1620856522352}] [record=ProducerRecord(topic=processed, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = contentType, value = [97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 106, 115, 111, 110]), RecordHeader(key = spring_json_header_types, value = [123, 34, 99, 111, 110, 116, 101, 110, 116, 84, 121, 112, 101, 34, 58, 34, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 83, 116, 114, 105, 110, 103, 34, 125])], isReadOnly = true), key=null, value=[B@339a63df, timestamp=null)], headers={id=1acaece7-c596-c9ff-440b-37df824bd564, timestamp=1620856642357}]

我的问题是,当目标主题在消息的标头中动态设置而未配置时,我如何处理错误?

我可能应该写一个我可以参考的博客,但简单的答案是:我们在s-c-stream端无法处理由于反应函数而产生的错误,原因很简单——我们完全无法控制流中发生了什么。

反应函数和命令函数之间的根本区别在于,命令函数充当消息监听器,并由我们(框架(在每条消息上调用。相反,在应用程序初始化期间,反应函数只调用一次,在初始化期间,我们将源流连接到用户函数公开的发布服务器,然后将完全控制权交给用户。

所以它;这是一个工作单位的问题,在命令函数中,这种单位是消息,而在反应函数中,这样的单位是整个消息流,它可能是无限的。

所有这一切都意味着,在编写反应式函数时,您必须依赖于反应式API本身丰富的错误处理能力。

相关内容

  • 没有找到相关文章

最新更新