我有一个使用Azure ServiceBus的消息传递系统,但我正在使用Nimbus。我有一个端点,它将命令发送到另一个端点,另一端的处理程序类在某一时刻拾取它,所以一切正常。
当操作需要时间(大约超过 20 秒左右)时,处理程序会收到具有相同消息的"另一个"调用。看起来 Nimbus 正在重试已经由处理程序的另一个(甚至是相同的)实例处理的消息,我没有看到任何异常被抛出,我可以轻松地使用以下处理程序重现这一点:
public class Synchronizer : IHandleCommand<RequestSynchronization>
{
public async Task Handle(RequestSynchronization synchronizeInfo)
{
Console.WriteLine("Received Synchronization");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
Console.WriteLine("Got through first timeout");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
Console.WriteLine("Got through second timeout");
}
}
我的问题是:如何禁用此行为?我很高兴交易需要时间,因为这是我从我的网站上卸载的繁重过程,这是首先使用这种架构的全部意义。
换句话说,我期望消息不会被另一个处理程序拾取,而另一个处理程序已经拾取并正在处理它,除非有异常并且消息返回到队列并最终被拾取重试。
任何想法如何做到这一点?我错过了什么?
默认情况下,ASB/WSB 将为您提供 30 秒的消息锁定。这个想法是,你从队列的头部弹出一个代理消息,但必须要么.完成() 或 .在锁定超时内放弃该消息。
如果不这样做,服务总线会假定你已崩溃或失败,它会将该消息返回到队列进行重新处理。
您有以下几种选择:
1) 在处理程序上实现 ILongRunningHandler。Nimbus会注意剩余的锁定时间,并自动更新您的消息锁定。警告:无论您续订多少次,ASB/WSB 支持的最大消息锁定时间为五分钟,因此如果您的处理程序花费的时间超过该时间,则可能需要选项 #2。
public class Synchronizer : IHandleCommand<RequestSynchronization>, ILongRunningTask
{
public async Task Handle(RequestSynchronization synchronizeInfo)
{
Console.WriteLine("Received Synchronization");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
Console.WriteLine("Got through first timeout");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
Console.WriteLine("Got through second timeout");
}
}
2) 在你的处理程序中,调用一个 Task.Run(() => SomeService(yourMessage)) 并返回。如果执行此操作,请注意依赖项的生存期范围(如果处理程序采用任何依赖项)。如果需要 IFoo,请依赖 Func>(或等效项,具体取决于容器),并在处理任务中解决该问题。
public class Synchronizer : IHandleCommand<RequestSynchronization>
{
private readonly Func<Owned<IFoo>> fooFunc;
public Synchronizer(Func<Owned<IFoo>> fooFunc)
{
_fooFunc = fooFunc;
}
public async Task Handle(RequestSynchronization synchronizeInfo)
{
// don't await!
Task.Run(() => {
using (var foo = _fooFunc())
{
Console.WriteLine("Received Synchronization");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
Console.WriteLine("Got through first timeout");
await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
Console.WriteLine("Got through second timeout");
}
});
}
}
我认为您正在这里寻找代码:http://www.uglybugger.org/software/post/support_for_long_running_handlers_in_nimbus