我有一个简单的Spring Cloud Stream应用程序,其中流接受一个原始JSON字符串并生成一个对象列表。
使用的版本:
<spring.cloud.version>Hoxton.SR2</spring.cloud.version>
所用流的签名:
@Bean
public Function<String, List<EmailEvent>> normalizeStream() {
return value -> {
return getNormalizedEvents(value);
};
}
期望的是,尽管流的输出签名是List,但它应该序列化为队列中的单个项。此更改在这个版本的Cloud流中非常有效。
但是,当我升级到Hoxton.SR4或更高版本时,这种期望正在打破,生产商只生产列表,而不是单个项目。
对于我可能遗漏的任何帮助,我们将不胜感激。
我的意思是,事情就在您正在使用的方法签名中。
使用此版本,结果与预期一致。查看以下示例。
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.0-RC1</version>
</parent>
第一个案例。一对多(封装在列表中(
在第一个示例中,每个String消息都会生成一个带有List<String>
对象有效负载的消息,该消息将发送给kafka。
@Bean
public Function<String, List<String>> oneToMany() {
return str -> {
return List.of(str, "custom");
};
}
使用kafkaat:阅读输出主题
% Reached end of topic one-to-many [0] at offset 180
["a","custom"]
2ns案例。一对多(逐个(
在另一个例子中(我认为这就是您想要的(,每个字符串都被转换为List,然后List<Message<<String>>
中的每个项都被绑定桥发送到kafka。
@Bean
public Function<String, List<Message<String>>> oneToMany() {
return str -> {
return List.of(MessageBuilder.withPayload(str).build(), MessageBuilder.withPayload("custom").build());
};
}
使用kafkaat:阅读输出主题
% Reached end of topic one-to-many [0] at offset 185
a
custom