我刚刚加入了一个项目,该项目使用Spring Cloud Stream作为Kafka的包装器。这个想法是,我们将抽象消息传递API,这样我们就可以自由地切换消息传递平台。我发现API令人困惑,尤其是最近一轮的抨击将我从基于注释的模型重定向到基于SpringCloudFunction的功能模型。我觉得我错过了一些东西,因为规定的编程模型似乎让产生消息的简单行为变得很痛苦。我们没有像kafkaTemplate.sendDefault("Hello")
这样的东西,而是规定了一些怪物,比如:
@Bean
public Supplier<Flux<String>> stringSupplier() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
return "Hello from Supplier";
} catch (Exception e) {
// ignore
}
}
})).subscribeOn(Schedulers.elastic()).share();
}
我应该如何用这种API做简单的消息驱动代码?
我认为将要生成的无限数据源与单个kafkaTemplate.send()
进行比较是不正确的。如果这只是您逻辑中需要的功能,请考虑使用StreamBridge
:https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources.
Supplier<Flux>
bean是框架启动源数据生成逻辑的一个特殊信号。如果你自己做,事情就不会那么简单了。我还不是说你的Flux.fromStream(Stream.generate())
可以换成一个Flux.generate()
。也不清楚为什么我会使用特殊的调度器和共享。。。
如果您不想执行复杂的Flux
逻辑,还有一种@PollableBean
方法:https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_suppliers_sources
否;用于按计划发布消息。
请参阅https://docs.spring.io/spring-cloud-stream/docs/3.2.1/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources
它只是streamBridge.send("bindingName", "something-to-send")
。
绑定可以在application.yml中预先配置,也可以在第一次发送时创建。