无法过滤使用春云流@StreamListener注释中的条件属性接收的消息



我正在尝试创建一个基于事件的系统,用于使用Apache Kafka作为消息传递系统和Spring Cloud Stream Kafka的服务之间的通信。

我已经编写了我的接收器类方法如下,

@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeCreatedEvent'")
public void handleEmployeeCreatedEvent(@Payload String payload) {
logger.info("Received EmployeeCreatedEvent: " + payload);
}

此方法专门用于捕获与 EmployeeCreatedEvent 相关的消息或事件。

@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'")
public void handleEmployeeTransferredEvent(@Payload String payload) {
logger.info("Received EmployeeTransferredEvent: " + payload);
}

此方法专门用于捕获与 EmployeeTransferredEvent 相关的消息或事件。

@StreamListener(target = Sink.INPUT)
public void handleDefaultEvent(@Payload String payload) {
logger.info("Received payload: " + payload);
}

这是默认方法。

当我运行应用程序时,我无法看到使用正在调用的条件属性注释的方法。我只看到正在调用的句柄默认事件方法。

我使用以下自定义消息源类从发送/源应用程序向此接收器应用程序发送消息,如下所示,

@Component
@EnableBinding(Source.class)
public class CustomMessageSource {
@Autowired
private Source source;                

public void  sendMessage(String payload,String eventType) {
Message<String> myMessage = MessageBuilder.withPayload(payload)
.setHeader("eventType", eventType)
.build();
source.output().send(myMessage);
}
}

我正在从源应用程序中的控制器调用该方法,如下所示,

customMessageSource.sendMessage("Hello","EmployeeCreatedEvent");

自定义消息源实例自动连线,如下所示,

@Autowired
CustomMessageSource customMessageSource;

基本上,我想过滤接收器/接收器应用程序收到的消息并相应地处理它们。

为此,我使用带有条件属性的@StreamListener注释来模拟处理不同事件的行为。

我正在使用Spring Cloud Stream Chelsea.SR2版本。

有人可以帮助我解决这个问题吗?

似乎标头没有传播。确保在spring.cloud.stream.kafka.binder.headershttp://docs.spring.io/autorepo/docs/spring-cloud-stream-docs/Chelsea.SR2/reference/htmlsingle/#_kafka_binder_properties 中包含自定义标头。

最新更新