春云流-一对多转换



我有一个简单的Spring Cloud Stream应用程序,其中流接受一个原始JSON字符串并生成一个对象列表。

使用的版本:

<spring.cloud.version>Hoxton.SR2</spring.cloud.version>

所用流的签名:

@Bean
public Function<String, List<EmailEvent>> normalizeStream() {
return value -> {
return getNormalizedEvents(value);
};
}

期望的是,尽管流的输出签名是List,但它应该序列化为队列中的单个项。此更改在这个版本的Cloud流中非常有效。

但是,当我升级到Hoxton.SR4或更高版本时,这种期望正在打破,生产商只生产列表,而不是单个项目。

对于我可能遗漏的任何帮助,我们将不胜感激。

我的意思是,事情就在您正在使用的方法签名中。

使用此版本,结果与预期一致。查看以下示例。

<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-parent</artifactId>
<version>3.1.0-RC1</version>
</parent>

第一个案例。一对多(封装在列表中(

在第一个示例中,每个String消息都会生成一个带有List<String>对象有效负载的消息,该消息将发送给kafka。

@Bean
public Function<String, List<String>> oneToMany() {
return str -> {
return List.of(str, "custom");
};
}

使用kafkaat:阅读输出主题

% Reached end of topic one-to-many [0] at offset 180
["a","custom"]

2ns案例。一对多(逐个(

在另一个例子中(我认为这就是您想要的(,每个字符串都被转换为List,然后List<Message<<String>>中的每个项都被绑定桥发送到kafka。

@Bean
public Function<String, List<Message<String>>> oneToMany() {
return str -> {
return List.of(MessageBuilder.withPayload(str).build(), MessageBuilder.withPayload("custom").build());
};
}

使用kafkaat:阅读输出主题

% Reached end of topic one-to-many [0] at offset 185
a
custom

最新更新