我希望能够并行处理带有Spring Integration的消息。消息来自多个设备,我们需要按顺序处理来自同一设备的消息,但设备可以在多个线程中处理。可能有成千上万的设备,所以我试图弄清楚如何尽可能多地使用 Spring 集成的语义根据设备 ID 的 mod 分配处理器。我应该考虑什么方法?
在不知道其他要求(事务语义等(的情况下很难概括,但最简单的方法可能是路由器在设备 ID 上使用某种哈希算法向多个QueueChannel
发送消息(因此特定设备的所有消息都转到同一通道(。
然后,让单线程轮询器从每个队列中提取消息。
编辑:(回应评论(
同样,很难一概而论,但是...
请参阅AbstractMessageRouter.determineTargetChannels()
- 路由器实际上返回一个物理通道对象(实际上是一个列表,但在大多数情况下是列表 1(。因此,是的,您可以通过编程方式创建QueueChannel
,并让路由器根据消息返回相应的。
假设您希望所有消息都由同一下游流处理,则还需要为每个队列通道创建一个<bridge/>
,以将其桥接到流中下一个组件的输入通道。
- 创建
QueueChannel
- 创建
BridgeHandler
(将outputChannel
设置为下一个组件的输入通道( - 创建一个
PollingConsumer
(构造函数采用通道和处理程序;设置trigger
等(
start(( 消费者。
所有这些都可以在自定义路由器初始化中完成,并实现determineTargetChannels()
以选择队列。
根据事件的处理时间,我通常建议在轮询器线程上运行下游流,而不是设置taskExecutor
以避免下一次轮询尝试在此任务完成之前安排另一个任务的问题。您可能需要增加默认taskScheduler
的池大小。