Spring 集成消息处理按标头信息分区



我希望能够并行处理带有Spring Integration的消息。消息来自多个设备,我们需要按顺序处理来自同一设备的消息,但设备可以在多个线程中处理。可能有成千上万的设备,所以我试图弄清楚如何尽可能多地使用 Spring 集成的语义根据设备 ID 的 mod 分配处理器。我应该考虑什么方法?

在不知道其他要求(事务语义等(的情况下很难概括,但最简单的方法可能是路由器在设备 ID 上使用某种哈希算法向多个QueueChannel发送消息(因此特定设备的所有消息都转到同一通道(。

然后,让单线程轮询器从每个队列中提取消息。

编辑:(回应评论(

同样,很难一概而论,但是...

请参阅AbstractMessageRouter.determineTargetChannels() - 路由器实际上返回一个物理通道对象(实际上是一个列表,但在大多数情况下是列表 1(。因此,是的,您可以通过编程方式创建QueueChannel,并让路由器根据消息返回相应的。

假设您希望所有消息都由同一下游流处理,则还需要为每个队列通道创建一个<bridge/>,以将其桥接到流中下一个组件的输入通道。

  • 创建QueueChannel
  • 创建BridgeHandler(将outputChannel设置为下一个组件的输入通道(
  • 创建一个PollingConsumer(构造函数采用通道和处理程序;设置trigger等(

start(( 消费者。

所有这些都可以在自定义路由器初始化中完成,并实现determineTargetChannels()以选择队列。

根据事件的处理时间,我通常建议在轮询器线程上运行下游流,而不是设置taskExecutor以避免下一次轮询尝试在此任务完成之前安排另一个任务的问题。您可能需要增加默认taskScheduler 的池大小。

最新更新