建议数据结构/同步方法



我有一个数据源,每秒从 15-20 个线程生成 ~100 万个事件。

事件

回调处理程序实现缓存策略,以记录事件中对对象的更改(保证单个对象的更新始终来自同一线程)

每 100 毫秒我想暂停/锁定事件处理程序并发布所有已修改对象的最新状态的快照。

我目前拥有的模拟实现如下所示:

private static void OnHandleManyEvents(FeedHandlerSource feedHandlerSource, MyObject myObject, ChangeFlags flags)
    {
        if (objectsWithChangeFlags[myObject.ID] == ChangeFlags.None)
        {
            UpdateStorage updateStorage = feedHandlerSourceToUpdateStorage[(int)feedHandlerSource];
            lock (updateStorage.MyOjectUpdateLock)
            {
                objectsWithChangeFlags[myObject.ID] = objectsWithChangeFlags[myObject.ID] | flags;
                updateStorage.MyUpdateObjects.Add(myObject);
            }
        } else
        objectsWithChangeFlags[myObject.ID] = objectsWithChangeFlags[myObject.ID] | flags;
    }
// runs on separate thread  
private static void MyObjectPump()
    {
            while (true)
            {
                foreach (UpdateStorage updateStorage in feedHandlerSourceToUpdateStorage)
                {
                    lock (updateStorage.MyOjectUpdateLock)
                    {
                        if (updateStorage.MyUpdateObjects.Count == 0)
                            continue;
                        foreach (MyObject myObject in updateStorage.MyUpdateObjects)
                        {
                            // do some stuff
                            objectsWithChangeFlags[myObject.ID] = ChangeFlags.None;
                        }
                        updateStorage.MyUpdateObjects.Clear();
                    }
                }
                Thread.Sleep(100);
            }
    }

此代码的问题虽然显示出良好的性能,但这是一个潜在的争用条件。

具体来说,可能是将 Pump 线程中的对象的 ChangeFlags 设置为 None,而事件回调将其设置回已更改状态而不锁定资源(在这种情况下,该对象永远不会添加到 MyObjectUpdates 列表中,并且将永远保持陈旧状态)。

另一种方法是锁定每个事件回调,这会导致过多的性能影响。

你会如何解决这个问题?

---更新---我相信我现在通过引入存储在objectsWithChangeFlags数组中的"CacheItem"解决了这个问题,该数组跟踪对象当前是否"排队"。我还测试了 ConcurrentQueue 的排队/出列,正如 Holger 在下面建议的那样,但它显示的吞吐量略低于仅使用锁(我猜是因为争用率不是很高,没有争用的锁的开销非常低)

        private class CacheItem
    {
        public ChangeFlags Flags;
        public bool IsEnqueued;
    }
    private static void OnHandleManyEvents(MyObject myObject, ChangeFlags flags)
    {
        Interlocked.Increment(ref _countTotalEvents);
        Interlocked.Increment(ref _countTotalEventsForInterval);
        CacheItem f = objectsWithChangeFlags[myObject.Id];
        if (!f.IsEnqueued)
        {
            Interlocked.Increment(ref _countEnqueue);
            f.Flags = f.Flags | flags;
            f.IsEnqueued = true;
            lock (updateStorage.MyObjectUpdateLock)
                updateStorage.MyObjectUpdates.Add(myObject);
        }
        else
        {
            Interlocked.Increment(ref _countCacheHits);
            f.Flags = f.Flags | flags;
        }
    }
    private static void QuotePump()
    {
        while (true)
        {
            lock (updateStorage.MyObjectUpdateLock)
            {
                foreach (var obj in updateStorage.MyObjectUpdates)
                {
                    Interlocked.Increment(ref _countDequeue);
                    CacheItem f = objectsWithChangeFlags[obj.Id];
                    f.Flags = ChangeFlags.None;
                    f.IsEnqueued = false;
                }
                updateStorage.MyObjectUpdates.Clear();
            }
            _countQuotePumpRuns++;
            Thread.Sleep(75);
        }
    }

在类似的szenarios(日志记录线程)中,我使用了以下策略:

排队到并发队列的事件。如果队列不为空,则快照线程每隔一段时间查看一次。如果不是,它会读取所有想法,直到它为空,执行更改,然后拍摄快照。之后,它可以睡一会儿,或者立即再次检查是否有更多事情要处理,并且只有在一段时间不睡觉的情况下。

使用这种方法,您的事件将分批执行,并在每批之后拍摄快照。

关于缓存:

我可以想象一个(并发)字典,您可以在其中查找事件处理程序中的对象。如果未找到,则加载(或来自何处)。AFTER 事件处理它已添加(即使它已经在那里找到)。Snapshot 方法在快照之前从字典中删除它快照的所有对象。然后,事件将位于快照中,或者对象在事件发生后仍位于字典中。

这应该适用于您的前提,即对一个对象的所有更改都来自同一线程。字典将仅包含自上次快照运行以来更改的对象。

你能有两个objectsWithChangeFlags集合,每 100 毫秒切换一次引用吗?这样,您就不必锁定任何内容,因为泵线程将在"脱机"集合上工作。

最新更新