与RabbitMq共享worker池



我正在构建一个多租户应用程序,其中用户可以提交由工人处理的批量任务(工人的数量是动态的),我想实现的是:

  • 如果我们有来自一个用户的单个批处理,则让所有worker处理来自该单个用户的消息
  • 如果另一个用户提交批处理作业,则每个用户获得一半的工人(因此第一个用户现在以较慢的速度工作,而后一个用户不必等到第一个用户完成所有冗长的作业)

这样的事情是可能的工作队列?(出于某种原因,感觉像是对FIFO和队列的想法的扭曲,但这是我的用法:D)

您可以查看优先级队列实现:https://www.rabbitmq.com/priority.html

如果这对你不起作用,你可以尝试一些其他的技巧来达到你想要的:

您可以将100个队列绑定到主题交换,并将路由键设置为用户ID % 100的散列,即每个任务将具有1到100之间的键,并且同一用户的任务将具有相同的键。每个队列都使用1到100之间的唯一模式绑定。现在,您有一组工人,它们从随机队列号开始,然后在每个作业之后增加该队列号,再次增加% 100,以便在队列100之后循环回到队列1。

现在,您的worker舰队可以并行处理多达100个唯一的用户,或者如果没有其他工作要做,所有的worker可以专注于单个用户。如果工人需要在每个作业之间循环遍历所有100个队列,那么在只有单个用户在单个队列上有大量作业的场景中,每个作业之间自然会有一些开销。减少队列数量是解决这个问题的一种方法。您还可以让每个worker保持到每个队列的连接,并从每个队列中消费最多一条未确认的消息。如果未确认的消息超时设置得足够高,则工作线程可以更快地循环内存中的挂起消息。

或者您可以创建两个交换器,每个交换器都有一个绑定队列。所有工作都进入第一个交换和队列,由一群工作人员使用。如果一个工作单元花费的时间太长,工作者可以取消它并将其推到第二个队列。当第一个队列上没有任何内容时,工作线程只处理第二个队列。您可能还需要几个具有相反队列优先级的工作线程,以确保在有永无止境的短任务流到达时仍然处理长时间运行的任务,以便最终始终处理用户批处理。这并不能真正地将您的worker队列分布到所有任务中,但它将阻止来自一个用户的长时间运行的任务,从而阻止您的worker为同一用户或另一个用户执行短时间运行的任务。它还假定您可以取消作业并在以后重新运行它而不会出现任何问题。这也意味着超时且需要以低优先级重新运行的任务会浪费资源。除非你能提前识别快慢任务

如果单个用户有100个慢任务,那么另一个用户发布一批任务,那么第一个100个队列的建议也可能有问题。这些任务将不会被查看,直到其中一个慢任务完成。如果这是一个合理的问题,您可以将这两个解决方案结合起来。

最新更新