我有一个数据源,每秒从 15-20 个线程生成 ~100 万个事件。
事件回调处理程序实现缓存策略,以记录事件中对对象的更改(保证单个对象的更新始终来自同一线程)
每 100 毫秒我想暂停/锁定事件处理程序并发布所有已修改对象的最新状态的快照。
我目前拥有的模拟实现如下所示:
private static void OnHandleManyEvents(FeedHandlerSource feedHandlerSource, MyObject myObject, ChangeFlags flags)
{
if (objectsWithChangeFlags[myObject.ID] == ChangeFlags.None)
{
UpdateStorage updateStorage = feedHandlerSourceToUpdateStorage[(int)feedHandlerSource];
lock (updateStorage.MyOjectUpdateLock)
{
objectsWithChangeFlags[myObject.ID] = objectsWithChangeFlags[myObject.ID] | flags;
updateStorage.MyUpdateObjects.Add(myObject);
}
} else
objectsWithChangeFlags[myObject.ID] = objectsWithChangeFlags[myObject.ID] | flags;
}
// runs on separate thread
private static void MyObjectPump()
{
while (true)
{
foreach (UpdateStorage updateStorage in feedHandlerSourceToUpdateStorage)
{
lock (updateStorage.MyOjectUpdateLock)
{
if (updateStorage.MyUpdateObjects.Count == 0)
continue;
foreach (MyObject myObject in updateStorage.MyUpdateObjects)
{
// do some stuff
objectsWithChangeFlags[myObject.ID] = ChangeFlags.None;
}
updateStorage.MyUpdateObjects.Clear();
}
}
Thread.Sleep(100);
}
}
此代码的问题虽然显示出良好的性能,但这是一个潜在的争用条件。
具体来说,可能是将 Pump 线程中的对象的 ChangeFlags 设置为 None,而事件回调将其设置回已更改状态而不锁定资源(在这种情况下,该对象永远不会添加到 MyObjectUpdates 列表中,并且将永远保持陈旧状态)。
另一种方法是锁定每个事件回调,这会导致过多的性能影响。
你会如何解决这个问题?
---更新---我相信我现在通过引入存储在objectsWithChangeFlags数组中的"CacheItem"解决了这个问题,该数组跟踪对象当前是否"排队"。我还测试了 ConcurrentQueue 的排队/出列,正如 Holger 在下面建议的那样,但它显示的吞吐量略低于仅使用锁(我猜是因为争用率不是很高,没有争用的锁的开销非常低)
private class CacheItem
{
public ChangeFlags Flags;
public bool IsEnqueued;
}
private static void OnHandleManyEvents(MyObject myObject, ChangeFlags flags)
{
Interlocked.Increment(ref _countTotalEvents);
Interlocked.Increment(ref _countTotalEventsForInterval);
CacheItem f = objectsWithChangeFlags[myObject.Id];
if (!f.IsEnqueued)
{
Interlocked.Increment(ref _countEnqueue);
f.Flags = f.Flags | flags;
f.IsEnqueued = true;
lock (updateStorage.MyObjectUpdateLock)
updateStorage.MyObjectUpdates.Add(myObject);
}
else
{
Interlocked.Increment(ref _countCacheHits);
f.Flags = f.Flags | flags;
}
}
private static void QuotePump()
{
while (true)
{
lock (updateStorage.MyObjectUpdateLock)
{
foreach (var obj in updateStorage.MyObjectUpdates)
{
Interlocked.Increment(ref _countDequeue);
CacheItem f = objectsWithChangeFlags[obj.Id];
f.Flags = ChangeFlags.None;
f.IsEnqueued = false;
}
updateStorage.MyObjectUpdates.Clear();
}
_countQuotePumpRuns++;
Thread.Sleep(75);
}
}
在类似的szenarios(日志记录线程)中,我使用了以下策略:
排队到并发队列的事件。如果队列不为空,则快照线程每隔一段时间查看一次。如果不是,它会读取所有想法,直到它为空,执行更改,然后拍摄快照。之后,它可以睡一会儿,或者立即再次检查是否有更多事情要处理,并且只有在一段时间不睡觉的情况下。
使用这种方法,您的事件将分批执行,并在每批之后拍摄快照。
关于缓存:
我可以想象一个(并发)字典,您可以在其中查找事件处理程序中的对象。如果未找到,则加载(或来自何处)。AFTER 事件处理它已添加(即使它已经在那里找到)。Snapshot 方法在快照之前从字典中删除它快照的所有对象。然后,事件将位于快照中,或者对象在事件发生后仍位于字典中。
这应该适用于您的前提,即对一个对象的所有更改都来自同一线程。字典将仅包含自上次快照运行以来更改的对象。
你能有两个objectsWithChangeFlags
集合,每 100 毫秒切换一次引用吗?这样,您就不必锁定任何内容,因为泵线程将在"脱机"集合上工作。