NServiceBus和Azure长时间运行的处理程序模式



我们正在通过NServiceBus使用Azure服务总线,我在决定处理消息导致的长时间运行任务的正确架构时遇到了问题。

按照良好的做法,我们不想通过让消息处理程序等待长时间运行的进程(从远程服务器下载大文件)来阻止其返回,而实际上这样做会导致Azure SB丢失对消息的锁定。我们的计划是通过生成一个单独的任务来响应,并允许消息处理程序立即返回。

然而,这意味着处理程序现在可以立即用于下一条消息,这将导致另一个任务被派生,以此类推,直到消息队列为空。我想要的是在处理(数量有限的)早期消息时停止接收消息的方法。NServiceBus和Azure Service Bus是否有可接受的模式?

以下是如果我直接针对Azure SB 进行编程,我会做的事情

{
     while(true)
     {
          var message = bus.Next();
          message.Complete();
          // Do long running stuff here 
     }
}

动词Next和Complete可能是错误的,但在Azure下发生的情况是Next对消息进行临时锁定,这样其他消费者就无法再看到该消息。然后,您可以决定是否真的要处理消息,如果是,请致电Complete。然后将消息完全从队列中删除,否则将导致消息在一段时间后重新出现在队列中,因为Azure认为您崩溃了。尽管这段代码看起来很脏,但它会实现我的目标(为什么不这样做呢?)因为我的消费者只会在我下次有空时(在长时间运行的任务之后)消费。如果需要,其他消费者(其他实例)可以加入。

问题是NServiceBus添加了一个抽象级别,因此现在通过处理程序类上的方法来处理消息。

void Handle(NewFileMessage message)
{
    // Do work here
}

问题是Azure没有获得对消息的调用。Complete(),直到完成您的工作并在Handle方法退出之后。这就是为什么你需要保持短时间的工作。但是,如果您退出,也表示您已准备好处理另一条消息。这是我的Catch 22

在后台线程上下载是个好主意。您不想增加锁定持续时间,因为这是症状,而不是问题所在。你的下载可以很容易地超过最长锁定持续时间(5分钟),然后你就回到了原点。

你能做的就是下载一个精心策划的传奇故事。佐贺可以监控下载过程,当下载完成时,b/g进程会向佐贺发出完成的信号。如果下载从未完成,您可以有一个超时(或多个超时)来指示,并进行补偿操作或重试,无论什么对您的业务案例有效。

关于传奇的文档应该会让你开始:http://docs.particular.net/nservicebus/sagas/

在Azure服务总线中,您可以增加消息的锁定持续时间(默认设置为30秒),以防处理需要很长时间。

但是,除了您能够增加锁定持续时间外,这通常表明您的处理程序处理了大量的工作,这些工作可以分为不同的处理程序。

如果下载文件至关重要,我会将下载操作保留在处理程序中。这样,如果下载失败,可以再次处理消息,并重试下载。然而,如果你想立即释放处理程序来处理更多的消息,我建议你扩展执行下载任务的工作人员,以便系统能够满足需求。

最新更新