我正在尝试从Azure服务总线排队项目,以便我可以批量处理它们。我知道Azure服务总线有一个ReceiveBatch(),但它似乎有问题,原因如下:
- 我一次最多只能收到256条消息,即使这样,也可以根据消息大小随机发送。
- 即使我偷看看有多少消息在等待,我也不知道有多少RequestBatch调用要做,因为我不知道每次调用会给我多少消息。由于消息将继续进来,我不能只是继续发出请求,直到它是空的,因为它永远不会是空的。
我决定只使用消息侦听器,这比浪费时间更便宜,而且会给我更多的控制。
基本上,我试图让一定数量的消息建立和然后立即处理它们。我用计时器强制延迟,但我需要能够在项目进入时对其进行排队。
根据我的计时器要求,似乎阻塞集合不是一个好的选择,所以我试图使用ConcurrentBag。
var batchingQueue = new ConcurrentBag<BrokeredMessage>();
myQueueClient.OnMessage((m) =>
{
Console.WriteLine("Queueing message");
batchingQueue.Add(m);
});
while (true)
{
var sw = WaitableStopwatch.StartNew();
BrokeredMessage msg;
while (batchingQueue.TryTake(out msg)) // <== Object is already disposed
{
...do this until I have a thousand ready to be written to DB in batch
Console.WriteLine("Completing message");
msg.Complete(); // <== ERRORS HERE
}
sw.Wait(MINIMUM_DELAY);
}
然而,一旦我访问OnMessage之外的消息管道显示BrokeredMessage已经被处理。
我认为这一定是OnMessage的一些自动行为,我看不到任何方法可以用消息做任何事情,而不是立即处理它,这是我不想做的。
这在BlockingCollection
中非常容易做到。
var batchingQueue = new BlockingCollection<BrokeredMessage>();
myQueueClient.OnMessage((m) =>
{
Console.WriteLine("Queueing message");
batchingQueue.Add(m);
});
和你的消费者线程:
foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
Console.WriteLine("Completing message");
msg.Complete();
}
GetConsumingEnumerable返回一个迭代器,该迭代器消耗队列中的项,直到设置了IsCompleted
属性且队列为空为止。如果队列为空,但IsCompleted
为False
,则非忙等待下一项。
要取消消费者线程(即关闭程序),您停止向队列添加东西并让主线程调用batchingQueue.CompleteAdding
。消费者将清空队列,看到IsCompleted
属性为True
,然后退出。
在这里使用BlockingCollection
比ConcurrentBag
或ConcurrentQueue
更好,因为BlockingCollection
接口更容易使用。特别是,使用GetConsumingEnumerable
使您不必担心检查计数或执行繁忙等待(轮询循环)。
还要注意ConcurrentBag
有一些相当奇怪的移除行为。特别是,项被删除的顺序取决于哪个线程删除项。创建袋子的线程以不同于其他线程的顺序移除物品。详情请参见使用ConcurrentBag Collection。
你没有说为什么要批处理输入项。除非有压倒一切的性能原因,否则用批处理逻辑使代码复杂化似乎不是一个特别好的主意。
如果你想批量写入数据库,那么我建议使用一个简单的List<T>
来缓冲项目。如果必须在将条目写入数据库之前处理它们,那么可以使用我上面展示的技术来处理它们。然后,将项目添加到列表中,而不是直接写入数据库。当列表有1000个项目时,或者给定的时间已经过去了,分配一个新列表并启动一个任务将旧列表写入数据库。这样的:
// at class scope
// Flush every 5 minutes.
private readonly TimeSpan FlushDelay = TimeSpan.FromMinutes(5);
private const int MaxBufferItems = 1000;
// Create a timer for the buffer flush.
System.Threading.Timer _flushTimer = new System.Threading.Timer(TimedFlush, FlushDelay.TotalMilliseconds, Timeout.Infinite);
// A lock for the list. Unless you're getting hundreds of thousands
// of items per second, this will not be a performance problem.
object _listLock = new Object();
List<BrokeredMessage> _recordBuffer = new List<BrokeredMessage>();
然后,在你的消费者中:
foreach (var msg in batchingQueue.GetConsumingEnumerable())
{
// process the message
Console.WriteLine("Completing message");
msg.Complete();
lock (_listLock)
{
_recordBuffer.Add(msg);
if (_recordBuffer.Count >= MaxBufferItems)
{
// Stop the timer
_flushTimer.Change(Timeout.Infinite, Timeout.Infinite);
// Save the old list and allocate a new one
var myList = _recordBuffer;
_recordBuffer = new List<BrokeredMessage>();
// Start a task to write to the database
Task.Factory.StartNew(() => FlushBuffer(myList));
// Restart the timer
_flushTimer.Change(FlushDelay.TotalMilliseconds, Timeout.Infinite);
}
}
}
private void TimedFlush()
{
bool lockTaken = false;
List<BrokeredMessage> myList = null;
try
{
if (Monitor.TryEnter(_listLock, 0, out lockTaken))
{
// Save the old list and allocate a new one
myList = _recordBuffer;
_recordBuffer = new List<BrokeredMessage>();
}
}
finally
{
if (lockTaken)
{
Monitor.Exit(_listLock);
}
}
if (myList != null)
{
FlushBuffer(myList);
}
// Restart the timer
_flushTimer.Change(FlushDelay.TotalMilliseconds, Timeout.Infinite);
}
这里的想法是,您将旧列表清除,分配一个新列表以便处理可以继续,然后将旧列表的项写入数据库。锁的存在是为了防止计时器和记录计数器相互踩踏。如果没有锁,事情可能会在一段时间内正常工作,然后你会在不可预测的时间出现奇怪的崩溃。
我喜欢这种设计,因为它消除了消费者的轮询。我唯一不喜欢的是,消费者必须知道计时器(即它必须停止,然后重新启动计时器)。再多想一点,我就可以消除这个要求。但它的写作方式很好。
切换到OnMessageAsync为我解决了这个问题
_queueClient.OnMessageAsync(async receivedMessage =>
我联系了Microsoft关于BrokeredMessage在MSDN上被处理的问题,这是响应:
非常基本的规则,我不确定是否有文档说明。接收到的消息需要在回调函数的生命周期内处理。在您的情况下,消息将在异步回调完成时被处理,这就是为什么您的完整尝试在另一个线程中使用ObjectDisposedException失败的原因。
我真的不明白为进一步处理排队消息对吞吐量有什么帮助。这肯定会增加客户的负担。尝试在异步回调中处理消息,这应该是足够的性能。
在我的例子中,这意味着我不能以我想要的方式使用ServiceBus,我必须重新考虑我想要的东西是如何工作的。同性恋者。
我在开始使用Azure服务总线服务时遇到了同样的问题。
我发现方法OnMessage总是处置BrokedMessage对象。Jim Mischel提出的方法对我没有帮助(但是读起来很有趣——谢谢!)。
经过一番调查,我发现整个方法都是错误的。让我来告诉你正确的做法。- 只在OnMessage方法处理程序中使用BrokedMessage.Complete()方法。
- 如果你需要在这个方法之外处理消息,你应该使用QueueClient方法。完成(Guid lockToken)。LockToken是BrokeredMessage对象的属性。
的例子:
var messageOptions = new OnMessageOptions {
AutoComplete = false,
AutoRenewTimeout = TimeSpan.FromMinutes( 5 ),
MaxConcurrentCalls = 1
};
var buffer = new Dictionary<string, Guid>();
// get message from queue
myQueueClient.OnMessage(
m => buffer.Add(key: m.GetBody<string>(), value: m.LockToken),
messageOptions // this option says to ServiceBus to "froze" message in he queue until we process it
);
foreach(var item in buffer){
try {
Console.WriteLine($"Process item: {item.Key}");
myQueueClient.Complete(item.Value);// you can also use method CompleteBatch(...) to improve performance
}
catch{
// "unfroze" message in ServiceBus. Message would be delivered to other listener
myQueueClient.Defer(item.Value);
}
}
我的解决方案是获取消息SequenceNumber,然后延迟消息并将SequenceNumber添加到BlockingCollection中。一旦BlockingCollection拾取到一个新项目,它就可以通过SequenceNumber接收延迟的消息,并将消息标记为完成。如果由于某种原因BlockingCollection没有处理SequenceNumber,它将作为延迟保留在队列中,以便稍后在进程重新启动时可以拾取它。如果进程异常终止,而BlockingCollection中仍有项目,则可以防止丢失消息。
BlockingCollection<long> queueSequenceNumbers = new BlockingCollection<long>();
//This finds any deferred/unfinished messages on startup.
BrokeredMessage existingMessage = client.Peek();
while (existingMessage != null)
{
if (existingMessage.State == MessageState.Deferred)
{
queueSequenceNumbers.Add(existingMessage.SequenceNumber);
}
existingMessage = client.Peek();
}
//setup the message handler
Action<BrokeredMessage> processMessage = new Action<BrokeredMessage>((message) =>
{
try
{
//skip deferred messages if they are already in the queueSequenceNumbers collection.
if (message.State != MessageState.Deferred || (message.State == MessageState.Deferred && !queueSequenceNumbers.Any(x => x == message.SequenceNumber)))
{
message.Defer();
queueSequenceNumbers.Add(message.SequenceNumber);
}
}
catch (Exception ex)
{
// Indicates a problem, unlock message in queue
message.Abandon();
}
});
// Callback to handle newly received messages
client.OnMessage(processMessage, new OnMessageOptions() { AutoComplete = false, MaxConcurrentCalls = 1 });
//start the blocking loop to process messages as they are added to the collection
foreach (var queueSequenceNumber in queueSequenceNumbers.GetConsumingEnumerable())
{
var message = client.Receive(queueSequenceNumber);
//mark the message as complete so it's removed from the queue
message.Complete();
//do something with the message
}