如何在不锁定队列的情况下通过NServiceBus发送大量消息



所以我对NServiceBus进行了一些性能评估,我意识到如果你试图同时发送1000条消息,它的行为会非常奇怪。。。它实际上以异步方式发送它们(这很好),但它会从处理程序锁定队列。结果是,在发送方完成所有消息的发送之前,处理程序无法处理任何消息。

这种行为表现为两种略有不同的方式。

  • 在处理程序内部,如果你进行了大量的发送,那么在处理程序完成之前,接收队列看起来是被锁定的(比如说,你在每次发送之间添加了一个线程睡眠,接收器在处理程序结束之前不会开始处理消息

  • 如果我只是从一辆新开的巴士上发送消息,那么一个小小的睡眠就会破坏这种关系,但如果我只是"一次"发送1000条消息,那么处理程序直到写完最后一条消息后才会收到第一条消息,即使每一条消息(此时)都应该是一个单独的调用。

这里是否有一个未记录的批量发送策略或其他正在发生的事情……我知道你通常不会"想"这样做,但了解处理程序发送或普通总线批量发送过程中发生的事情非常重要;-)。

NServiceBus消息处理程序默认情况下在TransactionScope中运行。消息的处理、您对业务数据的任何更新以及任何新消息的发送都将完成或回滚。这就是事务性消息传递的全部内容。

如果在消息处理程序中发送1000条消息,那么在底层消息传递基础结构成功接收所有消息之前,它不会完成。这可能需要一些时间,具体取决于您的硬件。

如果你想在默认情况下选择退出这种安全的方法,你可以做几件事。你可以禁用NServiceBus端点的事务处理,也可以在发送消息时取消环境事务范围。但是,请注意,您不再有任何事务性保证,因此,如果您在发送了1000条消息中的500条之后出现异常,则会发送500条,而不会发送500条。

我的团队策略之一是尝试将大批量分解为小批量,然后有一个处理程序来接收这些小批量,并为每个小批量推出单独的事件。

场景:我们有一个端点,它读取数据库日志文件,并为日志文件的每一行推送一个"TransactionOccurred"事件。然后,我们在10秒超时后再次读取日志文件,并推出另一批消息。

因此,我们没有在一个处理程序中推出5K条消息,而是将其分解为每条1K的5条消息,并发送一个命令。然后,我们有一个处理程序,它接收1K批处理消息,循环并为每条消息发布一个单独的事件。

这个问题是在对5K消息进行"发布"时出现的,因为有几个事件正在发布,每个事件都有一组不同的订阅者,他们在同一服务器和远程服务器上排队,这降低了系统的速度。

通过这种策略,我们还能够将MaximumConcurrentLevel调高一点,以便一次处理多条消息,并能够获得更高的吞吐量。

我们已经在少数几个端点上完成了这项工作,每个端点在批处理大小和MaximumConcurrentLevel值方面都有点不同。我建议获得一组50-100K消息的控制集,并将这些值稍微移动到最适合您情况的值。

最新更新