在GitHub https://github.com/spring-cloud/spring-cloud-gcp/tree/master/spring-cloud-gcp-samples/spring-cloud-gcp-pubsub-polling-binder-sample中发现的关于从PubSub订阅轮询消息的示例,我想知道…
是否有可能使PollableMessageSource
检索List<Message<?>>
而不是每次轮询单个消息?
我看到@Poller
符号只在Source
类型对象中使用,从未在Processor
或Sink
中使用。是否有可能在这样的上下文中使用例如使用@StreamListener
或使用功能方法?
PollableMessageSource
绑定和Source
流应用程序完全基于Spring Integration中的Poller
和MessageSource
抽象,其契约是为配置的通道生成单个消息。消息传递的重点实际上是处理单个消息而不影响其他消息。一条消息的失败并不意味着流中的其他消息也会失败。
另一方面,您可能意味着GCP Pub/Sub消息将作为Spring消息有效负载中的列表生成。这真的是可能的,但通过一些自定义代码从Pub/Sub消费者和MessageSource
impl。尽管我会三思而后行,期望从源代码中获得一些批处理。如果您的进一步逻辑是作为列表处理,那么您可能会使用聚合器来构建一些小窗口。但同样,它将是一条Spring消息。
最好开始考虑一个响应式函数的实现,你可以期待一个Flux<Message<?>>
作为输入,Spring Cloud Stream框架会照顾你如何从Pub/Sub发送数据到你期望的响应式流。
更多信息见文档:https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#_reactive_functions_support