是否有一种方法可以过滤功能春云流粘合剂上的消息头?



在@StreamListener接收消息的方式(非功能)中,有这样一个条件:

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")

我现在在做什么:

spring.cloud.stream.function.definition=processspring.cloud.stream.bindings.process-in-0.destination=my-topic

@Bean
public Consumer<Message<SomeDTO>> process() {
return message -> Mono.just(message)
.doOnNext(msg -> LOGGER.debug("Message received: {}", msg))
.filter(this::headerFilter)
.map(this::extractEvent)
.filter(this::payloadAttributeFilter)
.block(blockTimeout);
}

但是我在方法内部过滤,当我传递其他头部时,使用另一个有效载荷,spring不知道如何反实现此消息,并在处理消息之前带来com.fasterxml.jackson.core.JsonParseException,因为无法创建SomeDTO"例如:

我想在spring反序列化它的有效载荷和崩溃之前找到一种过滤消息头的方法。

我使用函数的事件舍入来解决这个问题:https://spring.io/blog/2019/10/31/spring-cloud-stream-event-routing

最新更新