Spring Cloud Stream Reactive-如何在反应流管道的情况下进行错误处理



如何对反应流管道执行错误处理。像

  • 应用程序错误处理(例如:errorChannel(
  • 系统错误处理(使用DLQ、再处理等(

当前文档仅描述非反应管道的错误处理。https://docs.spring.io/spring-cloud-stream/docs/Fishtown.BUILD-SNAPSHOT/reference/htmlsingle/#_application_error_handling

Spring云流通过为错误处理场景提供简单的配置,为用户做得非常简单。如果相同的错误处理用例(具有相同的配置(也适用于反应流管道,那就太好了。用例和相应的配置详细信息如下:

  • @StreamListner("errorChannel"(注释用于全局错误处理
  • @KafkaListener(id="bar",topics="反应流错误主题"(
  • DLQ的配置,并向错误主题生成失败消息spring.cloud.stream.kafka.bindings.input.customer.enableDlq=truespring.cloud.stream.kafka.bindings.input.consumer.dlqName=反应流错误主题

文档中的示例适用于spring-cloud-stream,但同样的情况也会给反应管道带来错误。任何这方面的指引都会对社会有很大帮助。提前感谢!

@SpringBootApplication
@EnableBinding(Sink.class)
public class ReactiveStreamSinkApplication {
public static void main(String[] args) {
SpringApplication.run(ReactiveStreamSinkApplication.class, args);
}
@StreamListener
public void receive(@Input(Sink.INPUT) Flux<String> inputFlux) {
inputFlux.subscribe(System.out::println);
throw new RuntimeException("BOOM!");
}
@StreamListener("errorChannel")
public void error(Message<?> message) {
// log the error msg
System.out.println("Handling ERROR: " + message);
}
@KafkaListener(id="bar", topics = "reactive-stream-error-topic")
public void error(String in) {
System.out.println(in + " from DLQ");
}
}

很抱歉回复太晚。

首先,您的代码和处理如何从反应流中抛出异常的方法存在问题。基本上,您处理的是一个声明性处理程序,它的处理方式截然不同。在您的代码中,receive方法在启动和初始化期间只会被调用一次。因此,从中抛出异常根本不等于流处理过程中抛出的异常,而流处理正是您所查询的错误处理机制所设计的。但是

除此之外。

随着Spring Cloud Function编程模型的引入,我们正在考虑将注意力从反应式模块转移到一起,因为Spring Cloud Function已经为反应式编程模型提供了支持。因此,请考虑以下内容:

@SpringBootApplication
@EnableBinding(Sink.class)
public class ReactiveStreamSinkApplication {
public static void main(String[] args) {
SpringApplication.run(ReactiveStreamSinkApplication.class,
"--spring.cloud.stream.function.definition=myconsumer");
}
@Bean
public Consumer<Flux<String>> myconsumer() {
return stream -> stream.subscribe(value -> {
if ("foo".equals(value)) {
throw new RuntimeException("BOOM!");
}
System.out.println("Received value: " + value);
});
}
@StreamListener("errorChannel")
public void error(Message<?> message) {
// log the error msg
System.out.println("Handling ERROR: " + message);
}
}

试试看,让我们知道。

最新更新