Spring Cloud Stream中基于Kafka头的条件(基于内容)路由



我在Spring Boot 2.x应用程序中使用Spring Cloud Stream 3.x来使用Kafka主题中的消息。

我想要一个监听器,它根据doc:有条件地使用一些自定义头值的消息

@StreamListener(value = "someTopic", condition = "headers['SomeHeader']=='SomeHeaderValue'")
public void onMessage(Message<?> message) {
LOGGER.info("Received: {}", message);
}

然而,监听器永远不会得到通知,如果条件被删除,我会在日志中看到以下内容:

Received: ... SomeHeader: [B@1055e4af ...

事实证明,自定义头是以Kafka字节数组原始格式留下的,这使得它们不符合条件评估的条件。

是否需要一些额外的配置,或者我遗漏了什么?

经过对源代码和stackoverflow的一些挖掘,我发现了以下内容:

  • Spring Cloud Stream委托Spring Kafka消息和头的转换(KafkaMessageChannelBinder~getHeaderMapper(
  • 默认的标头转换实现(BinderHeaderMapper(将标头保留为原始格式
  • Spring Cloud Stream允许自定义头映射,特别是将头从字节数组转换为字符串(在我的Spring Cloud Stream项目中,我如何将传入的头映射为String而不是byte[]?(

所以我添加了我的自定义标头映射器bean(bean名称很重要,它允许省略额外的配置属性(,它将我的自定义标题映射到String:

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
SimpleKafkaHeaderMapper headerMapper = new SimpleKafkaHeaderMapper();
headerMapper.setRawMappedHeaders(Map.of(
"SomeHeader", true
));
return headerMapper;
}

这解决了问题:

Received: ... SomeHeader: SomeHeaderValue ...

第页。S.这似乎是Spring Cloud Stream中的一个错误:

  1. 它引入了自己的头映射器实现(BinderHeaderMapper(,但后者不尊重条件路由特性
  2. 标头映射器在KafkaMessageChannelBinder中是子类的,这种添加的行为是不明显的,如果提供自定义标头映射器,则会丢失

相关内容

  • 没有找到相关文章

最新更新