BrokeredMessage在调用OnMessage()后自动处理



我正在尝试从Azure服务总线排队项目,以便我可以批量处理它们。我知道Azure服务总线有一个ReceiveBatch(),但它似乎有问题,原因如下:

  • 我一次最多只能收到256条消息,即使这样,也可以根据消息大小随机发送。
  • 即使我偷看看有多少消息在等待,我也不知道有多少RequestBatch调用要做,因为我不知道每次调用会给我多少消息。由于消息将继续进来,我不能只是继续发出请求,直到它是空的,因为它永远不会是空的。

我决定只使用消息侦听器,这比浪费时间更便宜,而且会给我更多的控制。

基本上,我试图让一定数量的消息建立和然后立即处理它们。我用计时器强制延迟,但我需要能够在项目进入时对其进行排队。

根据我的计时器要求,似乎阻塞集合不是一个好的选择,所以我试图使用ConcurrentBag。

var batchingQueue = new ConcurrentBag<BrokeredMessage>();
myQueueClient.OnMessage((m) =>
{
    Console.WriteLine("Queueing message");
    batchingQueue.Add(m);
});
while (true)
{
    var sw = WaitableStopwatch.StartNew();
    BrokeredMessage msg;
    while (batchingQueue.TryTake(out msg)) // <== Object is already disposed
    {
        ...do this until I have a thousand ready to be written to DB in batch
        Console.WriteLine("Completing message");
        msg.Complete(); // <== ERRORS HERE
    }
    sw.Wait(MINIMUM_DELAY);
}

然而,一旦我访问OnMessage之外的消息管道显示BrokeredMessage已经被处理。

我认为这一定是OnMessage的一些自动行为,我看不到任何方法可以用消息做任何事情,而不是立即处理它,这是我不想做的。

这在BlockingCollection中非常容易做到。

var batchingQueue = new BlockingCollection<BrokeredMessage>();
myQueueClient.OnMessage((m) =>
{
    Console.WriteLine("Queueing message");
    batchingQueue.Add(m);
});

和你的消费者线程:

foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
    Console.WriteLine("Completing message");
    msg.Complete();
}

GetConsumingEnumerable返回一个迭代器,该迭代器消耗队列中的项,直到设置了IsCompleted属性且队列为空为止。如果队列为空,但IsCompletedFalse,则非忙等待下一项。

要取消消费者线程(即关闭程序),您停止向队列添加东西并让主线程调用batchingQueue.CompleteAdding。消费者将清空队列,看到IsCompleted属性为True,然后退出。

在这里使用BlockingCollectionConcurrentBagConcurrentQueue更好,因为BlockingCollection接口更容易使用。特别是,使用GetConsumingEnumerable使您不必担心检查计数或执行繁忙等待(轮询循环)。

还要注意ConcurrentBag有一些相当奇怪的移除行为。特别是,项被删除的顺序取决于哪个线程删除项。创建袋子的线程以不同于其他线程的顺序移除物品。详情请参见使用ConcurrentBag Collection。

你没有说为什么要批处理输入项。除非有压倒一切的性能原因,否则用批处理逻辑使代码复杂化似乎不是一个特别好的主意。


如果你想批量写入数据库,那么我建议使用一个简单的List<T>来缓冲项目。如果必须在将条目写入数据库之前处理它们,那么可以使用我上面展示的技术来处理它们。然后,将项目添加到列表中,而不是直接写入数据库。当列表有1000个项目时,或者给定的时间已经过去了,分配一个新列表并启动一个任务将旧列表写入数据库。这样的:

// at class scope
// Flush every 5 minutes.
private readonly TimeSpan FlushDelay = TimeSpan.FromMinutes(5);
private const int MaxBufferItems = 1000;
// Create a timer for the buffer flush.
System.Threading.Timer _flushTimer = new System.Threading.Timer(TimedFlush, FlushDelay.TotalMilliseconds, Timeout.Infinite);  
// A lock for the list. Unless you're getting hundreds of thousands
// of items per second, this will not be a performance problem.
object _listLock = new Object();
List<BrokeredMessage> _recordBuffer = new List<BrokeredMessage>();

然后,在你的消费者中:

foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
    // process the message
    Console.WriteLine("Completing message");
    msg.Complete();
    lock (_listLock)
    {
        _recordBuffer.Add(msg);
        if (_recordBuffer.Count >= MaxBufferItems)
        {
            // Stop the timer
            _flushTimer.Change(Timeout.Infinite, Timeout.Infinite);
            // Save the old list and allocate a new one
            var myList = _recordBuffer;
            _recordBuffer = new List<BrokeredMessage>();
            // Start a task to write to the database
            Task.Factory.StartNew(() => FlushBuffer(myList));
            // Restart the timer
            _flushTimer.Change(FlushDelay.TotalMilliseconds, Timeout.Infinite);
        }
    }
}
private void TimedFlush()
{
    bool lockTaken = false;
    List<BrokeredMessage> myList = null;
    try
    {
        if (Monitor.TryEnter(_listLock, 0, out lockTaken))
        {
            // Save the old list and allocate a new one
            myList = _recordBuffer;
            _recordBuffer = new List<BrokeredMessage>();
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(_listLock);
        }
    }
    if (myList != null)
    {
        FlushBuffer(myList);
    }
    // Restart the timer
    _flushTimer.Change(FlushDelay.TotalMilliseconds, Timeout.Infinite);
}

这里的想法是,您将旧列表清除,分配一个新列表以便处理可以继续,然后将旧列表的项写入数据库。锁的存在是为了防止计时器和记录计数器相互踩踏。如果没有锁,事情可能会在一段时间内正常工作,然后你会在不可预测的时间出现奇怪的崩溃。

我喜欢这种设计,因为它消除了消费者的轮询。我唯一不喜欢的是,消费者必须知道计时器(即它必须停止,然后重新启动计时器)。再多想一点,我就可以消除这个要求。但它的写作方式很好。

切换到OnMessageAsync为我解决了这个问题

_queueClient.OnMessageAsync(async receivedMessage =>

我联系了Microsoft关于BrokeredMessage在MSDN上被处理的问题,这是响应:

非常基本的规则,我不确定是否有文档说明。接收到的消息需要在回调函数的生命周期内处理。在您的情况下,消息将在异步回调完成时被处理,这就是为什么您的完整尝试在另一个线程中使用ObjectDisposedException失败的原因。

我真的不明白为进一步处理排队消息对吞吐量有什么帮助。这肯定会增加客户的负担。尝试在异步回调中处理消息,这应该是足够的性能。

在我的例子中,这意味着我不能以我想要的方式使用ServiceBus,我必须重新考虑我想要的东西是如何工作的。同性恋者。

我在开始使用Azure服务总线服务时遇到了同样的问题。

我发现方法OnMessage总是处置BrokedMessage对象。Jim Mischel提出的方法对我没有帮助(但是读起来很有趣——谢谢!)。

经过一番调查,我发现整个方法都是错误的。让我来告诉你正确的做法。
  1. 只在OnMessage方法处理程序中使用BrokedMessage.Complete()方法。
  2. 如果你需要在这个方法之外处理消息,你应该使用QueueClient方法。完成(Guid lockToken)。LockToken是BrokeredMessage对象的属性。

的例子:

 var messageOptions = new OnMessageOptions {
      AutoComplete       = false,
      AutoRenewTimeout   = TimeSpan.FromMinutes( 5 ),
     MaxConcurrentCalls = 1
 };
 var buffer = new Dictionary<string, Guid>();
 // get message from queue 
 myQueueClient.OnMessage(
      m => buffer.Add(key: m.GetBody<string>(), value: m.LockToken), 
      messageOptions // this option says to ServiceBus to "froze" message in he queue until we process it
 );         
 foreach(var item in buffer){
    try {
        Console.WriteLine($"Process item: {item.Key}");
        myQueueClient.Complete(item.Value);// you can also use method CompleteBatch(...) to improve performance
    } 
    catch{
        // "unfroze" message in ServiceBus. Message would be delivered to other listener 
        myQueueClient.Defer(item.Value);
    }
 }

我的解决方案是获取消息SequenceNumber,然后延迟消息并将SequenceNumber添加到BlockingCollection中。一旦BlockingCollection拾取到一个新项目,它就可以通过SequenceNumber接收延迟的消息,并将消息标记为完成。如果由于某种原因BlockingCollection没有处理SequenceNumber,它将作为延迟保留在队列中,以便稍后在进程重新启动时可以拾取它。如果进程异常终止,而BlockingCollection中仍有项目,则可以防止丢失消息。

BlockingCollection<long> queueSequenceNumbers = new BlockingCollection<long>();
//This finds any deferred/unfinished messages on startup. 
BrokeredMessage existingMessage = client.Peek();
while (existingMessage != null)
{
    if (existingMessage.State == MessageState.Deferred)
    {
        queueSequenceNumbers.Add(existingMessage.SequenceNumber);
    }
    existingMessage = client.Peek();
}

//setup the message handler
Action<BrokeredMessage> processMessage = new Action<BrokeredMessage>((message) =>
{
    try
    {
        //skip deferred messages if they are already in the queueSequenceNumbers collection.
        if (message.State != MessageState.Deferred || (message.State == MessageState.Deferred && !queueSequenceNumbers.Any(x => x == message.SequenceNumber)))
        {
            message.Defer();
            queueSequenceNumbers.Add(message.SequenceNumber);
        }
    }
    catch (Exception ex)
    {
         // Indicates a problem, unlock message in queue
         message.Abandon();
    }
});

// Callback to handle newly received messages
client.OnMessage(processMessage, new OnMessageOptions() { AutoComplete = false, MaxConcurrentCalls = 1 });            
//start the blocking loop to process messages as they are added to the collection
foreach (var queueSequenceNumber in queueSequenceNumbers.GetConsumingEnumerable())
{
     var message = client.Receive(queueSequenceNumber);
     //mark the message as complete so it's removed from the queue
     message.Complete();                 
     //do something with the message       
}

最新更新