操作需要时间时重试消息



我有一个使用Azure ServiceBus的消息传递系统,但我正在使用Nimbus。我有一个端点,它将命令发送到另一个端点,另一端的处理程序类在某一时刻拾取它,所以一切正常。

当操作需要时间(大约超过 20 秒左右)时,处理程序会收到具有相同消息的"另一个"调用。看起来 Nimbus 正在重试已经由处理程序的另一个(甚至是相同的)实例处理的消息,我没有看到任何异常被抛出,我可以轻松地使用以下处理程序重现这一点:

public class Synchronizer : IHandleCommand<RequestSynchronization>
{
    public async Task Handle(RequestSynchronization synchronizeInfo)
    {
        Console.WriteLine("Received Synchronization");
        await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
        Console.WriteLine("Got through first timeout");
        await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
        Console.WriteLine("Got through second timeout");
    }
}

我的问题是:如何禁用此行为?我很高兴交易需要时间,因为这是我从我的网站上卸载的繁重过程,这是首先使用这种架构的全部意义。

换句话说,我期望消息不会被另一个处理程序拾取,而另一个处理程序已经拾取并正在处理它,除非有异常并且消息返回到队列并最终被拾取重试。

任何想法如何做到这一点?我错过了什么?

默认情况下,ASB/WSB 将为您提供 30 秒的消息锁定。这个想法是,你从队列的头部弹出一个代理消息,但必须要么.完成() 或 .在锁定超时内放弃该消息。

如果不这样做,服务总线会假定你已崩溃或失败,它会将该消息返回到队列进行重新处理。

您有以下几种选择:

1) 在处理程序上实现 ILongRunningHandler。Nimbus会注意剩余的锁定时间,并自动更新您的消息锁定。警告:无论您续订多少次,ASB/WSB 支持的最大消息锁定时间为五分钟,因此如果您的处理程序花费的时间超过该时间,则可能需要选项 #2。

public class Synchronizer : IHandleCommand<RequestSynchronization>, ILongRunningTask
{
    public async Task Handle(RequestSynchronization synchronizeInfo)
    {
        Console.WriteLine("Received Synchronization");
        await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
        Console.WriteLine("Got through first timeout");
        await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
        Console.WriteLine("Got through second timeout");
    }
}

2) 在你的处理程序中,调用一个 Task.Run(() => SomeService(yourMessage)) 并返回。如果执行此操作,请注意依赖项的生存期范围(如果处理程序采用任何依赖项)。如果需要 IFoo,请依赖 Func>(或等效项,具体取决于容器),并在处理任务中解决该问题。

public class Synchronizer : IHandleCommand<RequestSynchronization>
{
    private readonly Func<Owned<IFoo>> fooFunc;
    public Synchronizer(Func<Owned<IFoo>> fooFunc)
    {
        _fooFunc = fooFunc;
    }
    public async Task Handle(RequestSynchronization synchronizeInfo)
    {
        // don't await!
        Task.Run(() => {
            using (var foo = _fooFunc())
            {
              Console.WriteLine("Received Synchronization");
              await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate long running process
              Console.WriteLine("Got through first timeout");
              await Task.Delay(TimeSpan.FromSeconds(30)); //Simulate another long running process
              Console.WriteLine("Got through second timeout");
            }
        });
    }
}

我认为您正在这里寻找代码:http://www.uglybugger.org/software/post/support_for_long_running_handlers_in_nimbus

最新更新